================================================================================ File: hikigai/appsdk/__init__.py ================================================================================ """ hikigai-appsdk: Python SDK for invoking AI agents. Invoke deployed agents in your applications. """ __version__ = "0.0.1" from hikigai.appsdk.client import AppClient from hikigai.appsdk.models import ( RuntimeAgent, InvokeResponse, StreamChunk, ) # Re-export common exceptions from core from hikigai.core.exceptions import ( HikigaiError, AuthenticationError, RateLimitError, AgentNotFoundError, InvocationError, ConfigurationError, ) __all__ = [ "__version__", # Client "AppClient", # Models "RuntimeAgent", "InvokeResponse", "StreamChunk", # Exceptions "HikigaiError", "AuthenticationError", "RateLimitError", "AgentNotFoundError", "InvocationError", "ConfigurationError", ] ================================================================================ File: hikigai/appsdk/client.py ================================================================================ """ AppClient: Invoke AI agents in your applications. This is the main client for application developers to call deployed agents. """ import os import logging import uuid from typing import Optional, Dict, Any, List, Iterator, Union from datetime import datetime from hikigai.core.api.client import APIClient from hikigai.core.exceptions import ( ConfigurationError, AgentNotFoundError, InvocationError, ) from hikigai.appsdk.models.agent import RuntimeAgent from hikigai.appsdk.models.response import InvokeResponse, StreamChunk, InvocationMetadata from hikigai.appsdk.sona import SONAClient logger = logging.getLogger(__name__) class AppClient: """ Client for invoking AI agents. Example: client = AppClient( api_key=os.environ["HIKIGAI_API_KEY"], project_id=os.environ["HIKIGAI_PROJECT_ID"] ) # Get an agent agent = client.agent("medical-coder") # Invoke response = agent.invoke("Patient has fever...") print(response.content) # Stream for chunk in agent.stream("Tell me about diabetes"): print(chunk, end="") """ def __init__( self, api_key: Optional[str] = None, project_id: Optional[str] = None, base_url: Optional[str] = None, timeout: float = 30.0, sona_url: Optional[str] = None, sona_api_key: Optional[str] = None, ): """ Initialize AppClient. Args: api_key: Hikigai API key (defaults to HIKIGAI_API_KEY env var) project_id: Project ID (defaults to HIKIGAI_PROJECT_ID env var) base_url: API base URL (defaults to production) timeout: Default request timeout in seconds sona_url: SONA service URL (defaults to SONA_SERVICE_URL env var or http://localhost:8002) sona_api_key: SONA API key (defaults to SONA_API_KEY env var) Raises: ConfigurationError: If required credentials are missing """ # Get credentials self.api_key = api_key or os.environ.get("HIKIGAI_API_KEY") self.project_id = project_id or os.environ.get("HIKIGAI_PROJECT_ID") self._sona_url = sona_url self._sona_api_key = sona_api_key if not self.api_key: raise ConfigurationError( "API key is required. Provide it via api_key parameter or HIKIGAI_API_KEY env var." ) if not self.project_id: raise ConfigurationError( "Project ID is required. Provide it via project_id parameter or HIKIGAI_PROJECT_ID env var." ) # Initialize HTTP client self.api = APIClient( api_key=self.api_key, project_id=self.project_id, base_url=base_url, timeout=timeout, ) logger.info(f"AppClient initialized for project: {self.project_id}") def agent(self, agent_id: str) -> RuntimeAgent: """ Get an agent for invocation. Args: agent_id: Agent ID or slug Returns: RuntimeAgent: Agent ready for invocation Raises: AgentNotFoundError: If agent doesn't exist Example: agent = client.agent("medical-coder") response = agent.invoke("Patient has fever...") """ logger.debug(f"Getting agent: {agent_id}") try: response = self.api.get(f"/api/v1/agents/{agent_id}") agent = RuntimeAgent( id=response.get("id", "unknown-id"), name=response.get("name", "Unknown Agent"), slug=response.get("slug", "unknown-agent"), version=response.get("version", "1.0.0"), description=response.get("description"), status=response.get("deployment_status", "unknown"), endpoint=response.get("endpoint_url"), ) return agent.set_client(self) except Exception as e: logger.error(f"Failed to get agent: {e}") raise def list_agents( self, category: Optional[str] = None, tags: Optional[List[str]] = None, search: Optional[str] = None, ) -> List[RuntimeAgent]: """ List available agents. Args: category: Filter by category tags: Filter by tags search: Search query Returns: List of available agents """ logger.debug("Listing agents") params = {} if category: params["category"] = category if tags: params["tags"] = tags if search: params["search"] = search try: response = self.api.get("/api/v1/agents", params=params) agents = [] for agent_data in response.get("agents", []): agent = RuntimeAgent( id=agent_data["id"], name=agent_data["name"], slug=agent_data["slug"], version=agent_data.get("version", "1.0.0"), description=agent_data.get("description"), status=agent_data.get("deployment_status", "unknown"), endpoint=agent_data.get("endpoint_url"), ) agents.append(agent.set_client(self)) return agents except Exception as e: logger.error(f"Failed to list agents: {e}") raise @property def sona(self) -> SONAClient: """Access SONA personalization features (edit tracking, preferences, suggestions). The client talks directly to the SONA service. Configure via ``sona_url`` constructor arg or ``SONA_SERVICE_URL`` env var. """ if not hasattr(self, "_sona"): self._sona = SONAClient( sona_url=getattr(self, "_sona_url", None), sona_api_key=getattr(self, "_sona_api_key", None), ) return self._sona def _invoke_agent( self, agent_id: str, input: Union[str, Dict[str, Any]], session_id: Optional[str] = None, provider: Optional[str] = None, model: Optional[str] = None, timeout: Optional[float] = None, connectors: Optional[Dict[str, Dict[str, str]]] = None, plugin_context: Optional[Dict[str, Dict[str, Any]]] = None, ) -> InvokeResponse: """ Internal method to invoke an agent. Called by RuntimeAgent.invoke(). """ # Prepare payload payload: Dict[str, Any] = {"input": input} if session_id: payload["session_id"] = session_id if provider: payload["provider"] = provider if model: payload["model"] = model if connectors: payload["connectors"] = connectors if plugin_context: payload["plugin_context"] = plugin_context logger.debug(f"Invoking agent {agent_id}") try: response = self.api.post( f"/api/v1/agents/{agent_id}/invoke", json=payload, timeout=timeout, ) # Parse response # Check for flat structure (current backend) vs nested structure (legacy/other) if "content" in response: content = response["content"] metadata_data = response # Use the whole response as metadata source # If content is a dict/json string, we might want to parse it, # but for now we trust the backend returns a string as per schema else: # Legacy/Nested structure output_data = response.get("output", {}) metadata_data = response.get("metadata", {}) # Extract content if isinstance(output_data, dict): content = output_data.get("content", output_data.get("response", str(output_data))) else: content = str(output_data) # Build metadata metadata = InvocationMetadata( invocation_id=metadata_data.get("request_id", metadata_data.get("invocation_id", str(uuid.uuid4()))), latency_ms=metadata_data.get("latency_ms"), timestamp=datetime.fromisoformat(metadata_data["timestamp"]) if "timestamp" in metadata_data else datetime.utcnow(), status=metadata_data.get("status", "success"), tokens_used=metadata_data.get("tokens_used"), tools_called=metadata_data.get("tools_called", []), phi_redacted=metadata_data.get("phi_redacted", True), trace_id=metadata_data.get("trace_id"), ) return InvokeResponse( content=content, agent_id=response.get("agent_id", agent_id), agent_version=response.get("version"), session_id=session_id, status=response.get("status", "success"), output=response.get("output"), confidence=response.get("confidence"), safety_flags=response.get("safety_flags", []), citations=response.get("citations", []), metadata=metadata, message=response.get("message"), plugins=response.get("plugins"), downstream_context=response.get("downstream_context"), ) except Exception as e: logger.error(f"Invocation failed: {e}") raise InvocationError( f"Failed to invoke agent: {e}", agent_id=agent_id, session_id=session_id ) def _stream_agent( self, agent_id: str, input: Union[str, Dict[str, Any]], session_id: Optional[str] = None, provider: Optional[str] = None, model: Optional[str] = None, connectors: Optional[Dict[str, Dict[str, str]]] = None, ) -> Iterator[str]: """ Internal method to stream agent responses. Called by RuntimeAgent.stream(). """ # Prepare payload payload: Dict[str, Any] = { "input": input, "stream": True, } if session_id: payload["session_id"] = session_id if provider: payload["provider"] = provider if model: payload["model"] = model if connectors: payload["connectors"] = connectors logger.debug(f"Streaming from agent {agent_id}") try: # Use streaming endpoint with self.api.stream( "POST", f"/api/v1/agents/{agent_id}/invoke", json=payload, ) as response: for line in response.iter_lines(): if line: # Parse SSE format: "data: {content}" try: line_str = line.decode('utf-8') if isinstance(line, bytes) else line except UnicodeDecodeError: logger.warning(f"Failed to decode SSE line as UTF-8: {line!r}") continue if line_str.startswith("data: "): chunk_data = line_str[6:].strip() if chunk_data and chunk_data != "[DONE]": yield chunk_data except Exception as e: logger.error(f"Streaming failed: {e}") raise InvocationError( f"Failed to stream from agent: {e}", agent_id=agent_id, session_id=session_id ) def close(self): """Close HTTP client and cleanup resources.""" if hasattr(self, "api"): self.api.close() def __enter__(self): """Context manager support.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Cleanup when exiting context.""" self.close() def __del__(self): """Ensure cleanup on deletion.""" try: self.close() except Exception: pass ================================================================================ File: hikigai/appsdk/models/agent.py ================================================================================ """ Runtime agent model for invocation. """ from typing import Optional, Dict, Any, Iterator, TYPE_CHECKING from pydantic import BaseModel, Field import uuid from datetime import datetime if TYPE_CHECKING: from hikigai.appsdk.client import AppClient from hikigai.appsdk.models.response import InvokeResponse, StreamChunk, InvocationMetadata class RuntimeAgent(BaseModel): """ Runtime representation of an agent for invocation. This model focuses on runtime capabilities (invoke, stream, sessions). """ model_config = {"arbitrary_types_allowed": True} id: str = Field(..., description="Agent ID") name: str = Field(..., description="Agent name/slug") slug: str = Field(..., description="URL-friendly slug") version: Optional[str] = Field(None, description="Agent version") description: Optional[str] = Field(None, description="Short description") status: str = Field(..., description="Deployment status") endpoint: Optional[str] = Field(None, description="Invocation endpoint") # Internal reference to client (not serialized) _client: Optional["AppClient"] = None _session_id: Optional[str] = None def set_client(self, client: "AppClient") -> "RuntimeAgent": """Set the client reference (internal use).""" self._client = client return self def invoke( self, input: str | Dict[str, Any], session_id: Optional[str] = None, provider: Optional[str] = None, model: Optional[str] = None, timeout: Optional[float] = None, connectors: Optional[Dict[str, Dict[str, str]]] = None, plugin_context: Optional[Dict[str, Dict[str, Any]]] = None, ) -> InvokeResponse: """ Invoke the agent with input. Args: input: Input message (string or structured dict) session_id: Optional session ID for conversation context provider: Optional provider override ('gemini', 'openai', 'anthropic') model: Optional model override timeout: Optional timeout override connectors: Optional MCP connector credentials. Format: {"connector-slug": {"ENV_VAR": "value", ...}} Example: {"epic-ehr": {"EPIC_CLIENT_ID": "xxx", "EPIC_PRIVATE_KEY": "yyy"}} plugin_context: Optional plugin data passed to active plugins. Format: {"sona": {"user_id": "dr-smith-uuid", "context_type": "follow_up"}} Returns: InvokeResponse: Agent response with metadata. When SONA is active, ``response.plugins["sona"]`` contains the ``output_id``, ``pending_suggestions``, and ``status``. Example: response = agent.invoke("What is diabetes?") print(response.content) # With SONA personalization: response = agent.invoke( "Transcript: patient presents with...", plugin_context={"sona": {"user_id": "dr-smith-uuid"}} ) output_id = response.plugins["sona"]["output_id"] """ if not self._client: raise ValueError("Agent not properly initialized (missing client connection)") effective_session = session_id or self._session_id return self._client._invoke_agent( agent_id=self.id, input=input, session_id=effective_session, provider=provider, model=model, timeout=timeout, connectors=connectors, plugin_context=plugin_context, ) def stream( self, input: str | Dict[str, Any], session_id: Optional[str] = None, provider: Optional[str] = None, model: Optional[str] = None, connectors: Optional[Dict[str, Dict[str, str]]] = None, ) -> Iterator[str]: """ Stream agent response in real-time. Args: input: Input message (string or structured dict) session_id: Optional session ID provider: Optional provider override model: Optional model override connectors: Optional MCP connector credentials Yields: str: Response chunks Example: for chunk in agent.stream("Tell me a story"): print(chunk, end="", flush=True) """ if not self._client: raise ValueError("Agent not properly initialized (missing client connection)") effective_session = session_id or self._session_id yield from self._client._stream_agent( agent_id=self.id, input=input, session_id=effective_session, provider=provider, model=model, connectors=connectors, ) def with_session(self, session_id: str) -> "RuntimeAgent": """ Create a session-bound copy of this agent. All invocations will use the same session ID for context. Args: session_id: Session identifier Returns: RuntimeAgent: New agent instance with session bound Example: session_agent = agent.with_session("user-123") session_agent.invoke("What is diabetes?") session_agent.invoke("How is it treated?") # Remembers previous context """ # Create a copy with session ID agent_copy = self.model_copy() agent_copy._session_id = session_id agent_copy._client = self._client return agent_copy ================================================================================ File: hikigai/appsdk/models/response.py ================================================================================ """ Invocation response models. """ from datetime import datetime from typing import Optional, Dict, Any, List from pydantic import BaseModel, Field, model_validator class InvocationMetadata(BaseModel): """Metadata about an agent invocation.""" invocation_id: str = Field(..., description="Unique invocation ID") latency_ms: Optional[int] = Field(None, description="Latency in milliseconds") timestamp: datetime = Field(..., description="Invocation timestamp") status: str = Field(..., description="'success' or 'error'") tokens_used: Optional[int] = Field(None, description="Total tokens consumed") tools_called: List[str] = Field(default_factory=list, description="Tools invoked") phi_redacted: bool = Field(True, description="Whether PHI was redacted") trace_id: Optional[str] = Field(None, description="Trace ID for troubleshooting") class Config: json_encoders = { datetime: lambda v: v.isoformat() } class ClinicalConfidence(BaseModel): """Healthcare confidence scoring.""" score: float = Field(..., ge=0, le=1) quality_metrics: Optional[Dict[str, Any]] = None reasoning: Optional[str] = None @model_validator(mode="before") @classmethod def _normalize_score_field(cls, values: Any) -> Any: """Accept 'overall_score' as an alias for 'score' (agents may use either key).""" if isinstance(values, dict) and "score" not in values and "overall_score" in values: values = dict(values) # avoid mutating the original values["score"] = values.pop("overall_score") return values @property def overall_score(self) -> float: """Alias for score for backward compatibility with some scripts.""" return self.score class SafetyFlag(BaseModel): """Healthcare safety flag.""" category: str severity: str = Field(..., pattern="^(low|medium|high|critical)$") message: str mitigation: Optional[str] = None class ClinicalCitation(BaseModel): """Evidence-based clinical citations.""" id: str text: str source: str url: Optional[str] = None class InvokeResponse(BaseModel): """ Response from an agent invocation. Contains the agent's output and metadata about the invocation. When SONA is active, ``plugins["sona"]`` contains the note_id, preferences applied, and any pending style suggestions. """ content: str = Field(..., description="Agent response content") agent_id: str = Field(..., description="Agent ID") agent_version: Optional[str] = Field(None, description="Agent version") session_id: Optional[str] = Field(None, description="Session ID") status: str = Field("success", description="Status (success, requires_human_review, etc.)") output: Optional[Dict[str, Any]] = Field(None, description="Structured agent output") confidence: Optional[ClinicalConfidence] = Field(None, description="Healthcare clinical confidence") safety_flags: List[SafetyFlag] = Field(default_factory=list, description="Healthcare clinical safety flags") citations: List[ClinicalCitation] = Field(default_factory=list, description="Healthcare clinical citations") metadata: InvocationMetadata = Field(..., description="Invocation metadata") message: Optional[str] = Field(None, description="Optional warning or info message from the platform") plugins: Optional[Dict[str, Any]] = Field(None, description="Plugin response metadata (e.g. SONA note_id, suggestions)") downstream_context: Optional[Dict[str, Any]] = None # Convenience properties @property def raw_output(self) -> str: """Alias for content.""" return self.content @property def latency_ms(self) -> Optional[int]: """Convenience accessor for latency.""" return self.metadata.latency_ms @property def tokens_used(self) -> Optional[int]: """Convenience accessor for tokens.""" return self.metadata.tokens_used class StreamChunk(BaseModel): """A chunk from a streaming response.""" content: str = Field(..., description="Chunk content") is_final: bool = Field(False, description="Whether this is the final chunk") metadata: Optional[Dict[str, Any]] = Field(None, description="Chunk metadata") ================================================================================ File: hikigai/appsdk/sona.py ================================================================================ """ SONAClient — Convenience wrapper for SONA personalization features. Accessible via ``app_client.sona``. Provides methods for edit tracking, preference management, suggestion handling, and analytics. Calls go directly to the SONA service (default http://localhost:8002), NOT through the Hikigai backend. """ import logging import os from typing import Any, Dict, List, Optional import httpx logger = logging.getLogger(__name__) class SONAClient: """Client for SONA edit tracking, preferences, and suggestions. Connects directly to the SONA service via HTTP. """ def __init__( self, sona_url: Optional[str] = None, sona_api_key: Optional[str] = None, timeout: float = 15.0, ): self._base_url = ( sona_url or os.environ.get("SONA_SERVICE_URL") or "http://localhost:8002" ).rstrip("/") self._api_key = ( sona_api_key or os.environ.get("SONA_API_KEY") or "sona-change-this-in-production" ) self._client = httpx.Client( base_url=self._base_url, headers={"X-SONA-API-Key": self._api_key}, timeout=timeout, ) def _get(self, path: str, params: Optional[Dict] = None) -> Dict[str, Any]: resp = self._client.get(path, params=params) resp.raise_for_status() return resp.json() def _post(self, path: str, json: Optional[Dict] = None) -> Dict[str, Any]: resp = self._client.post(path, json=json or {}) resp.raise_for_status() return resp.json() # ── Edit tracking ─────────────────────────────────────────────── def submit_edit( self, output_id: str, final_text: str, edit_duration_seconds: Optional[int] = None, user_id: Optional[str] = None, ) -> Dict[str, Any]: """ Submit a user's edited version of an agent output. Args: output_id: The ``output_id`` returned in ``response.plugins["sona"]``. final_text: The text after the user edited it. edit_duration_seconds: Optional time the user spent editing. user_id: End-user identifier. Required if not inferrable. Returns: Dict with ``output_id``, ``status``, ``edit_count``, ``was_edited``, and ``diff_summary``. """ payload: Dict[str, Any] = {"final_text": final_text} if user_id: payload["user_id"] = user_id if edit_duration_seconds is not None: payload["edit_duration_seconds"] = edit_duration_seconds return self._post(f"/api/v1/edits/{output_id}/submit", json=payload) def approve_output(self, output_id: str, user_id: Optional[str] = None) -> Dict[str, Any]: """ Mark an output as approved without edits. Args: output_id: The ``output_id`` from the invoke response. user_id: End-user identifier. """ payload: Dict[str, Any] = {} if user_id: payload["user_id"] = user_id return self._post(f"/api/v1/edits/{output_id}/approve", json=payload) # ── Preferences ───────────────────────────────────────────────── def get_preferences(self, user_id: str, agent_id: Optional[str] = None) -> Dict[str, Any]: """Get a user's personalization preferences.""" params: Dict[str, str] = {} if agent_id: params["agent_id"] = agent_id return self._get(f"/api/v1/preferences/{user_id}", params=params) def update_preferences(self, user_id: str, agent_id: str, **kwargs) -> Dict[str, Any]: """ Update preferences. Keyword args can include ``output_length``, ``abbreviations``, ``custom_phrases``, ``section_order``, etc. """ payload = {"user_id": user_id, "agent_id": agent_id, **kwargs} return self._post("/api/v1/preferences", json=payload) # ── Suggestions ───────────────────────────────────────────────── def get_suggestions(self, user_id: str, agent_id: Optional[str] = None) -> Dict[str, Any]: """Get pending pattern suggestions for a user.""" params: Dict[str, str] = {} if agent_id: params["agent_id"] = agent_id return self._get(f"/api/v1/suggestions/{user_id}", params=params) def respond_to_suggestion(self, pattern_id: str, response: str, user_id: Optional[str] = None) -> Dict[str, Any]: """ Accept or reject a pattern suggestion. Args: pattern_id: The ``pattern_id`` from suggestions. response: ``"accepted"`` or ``"rejected"``. user_id: End-user identifier. """ payload: Dict[str, Any] = {"response": response} if user_id: payload["user_id"] = user_id return self._post(f"/api/v1/suggestions/{pattern_id}/respond", json=payload) # ── Analytics ─────────────────────────────────────────────────── def get_analytics(self, user_id: str, agent_id: Optional[str] = None) -> Dict[str, Any]: """Get edit analytics for a user.""" params: Dict[str, str] = {} if agent_id: params["agent_id"] = agent_id return self._get(f"/api/v1/analytics/{user_id}", params=params)