================================================================================ File: hikigai/agentsdk/__init__.py ================================================================================ """ hikigai-agentsdk: Python SDK for deploying AI agents. Deploy and manage AI agents on the Hikigai platform. """ __version__ = "0.0.1" from hikigai.agentsdk.client import AgentClient from hikigai.agentsdk.models import ( AgentConfig, ToolConfig, ConnectorConfig, InputSchema, OutputSchema, StringField, IntegerField, BooleanField, ArrayField, ObjectField, DeployedAgent, DeploymentResult, RuntimeConfig, HIPAAConfig, SubAgentConfig, PlannerConfig, GenerationConfig, ) from hikigai.agentsdk.tools import tool, FunctionTool, OpenAPITool, MCPTool # Re-export common exceptions from core from hikigai.core.exceptions import ( HikigaiError, AuthenticationError, RateLimitError, ValidationError, DeploymentError, ConfigurationError, ) __all__ = [ "__version__", # Client "AgentClient", # Configuration "AgentConfig", "ToolConfig", "ConnectorConfig", "RuntimeConfig", "HIPAAConfig", "SubAgentConfig", "PlannerConfig", "GenerationConfig", # Schemas "InputSchema", "OutputSchema", "StringField", "IntegerField", "BooleanField", "ArrayField", "ObjectField", # Models "DeployedAgent", "DeploymentResult", # Tools "tool", "FunctionTool", "OpenAPITool", "MCPTool", # Exceptions "HikigaiError", "AuthenticationError", "RateLimitError", "ValidationError", "DeploymentError", "ConfigurationError", ] ================================================================================ File: hikigai/agentsdk/client.py ================================================================================ """ AgentClient: Deploy and manage AI agents on the Hikigai platform. This is the main client for agent developers to deploy, update, and manage agents. 
""" import os import time import logging from typing import List, Optional, Dict, Any from pathlib import Path from hikigai.core.api.client import APIClient from hikigai.core.exceptions import ( ConfigurationError, DeploymentError, AgentNotFoundError, ) from hikigai.agentsdk.models.config import AgentConfig from hikigai.agentsdk.models.agent import DeployedAgent, DeploymentResult from hikigai.agentsdk.models.schemas import InputSchema, OutputSchema from hikigai.agentsdk.tools import normalize_tools logger = logging.getLogger(__name__) class AgentClient: """ Client for deploying and managing AI agents. Example: client = AgentClient( api_key=os.environ["HIKIGAI_API_KEY"], project_id=os.environ["HIKIGAI_PROJECT_ID"] ) # Deploy an agent agent = client.deploy(AgentConfig(...)) # List agents agents = client.list_agents() # Delete an agent client.delete_agent(agent.id) """ def __init__( self, api_key: Optional[str] = None, project_id: Optional[str] = None, base_url: Optional[str] = None, timeout: float = 30.0, ): """ Initialize AgentClient. Args: api_key: Hikigai API key (defaults to HIKIGAI_API_KEY env var) project_id: Project ID (defaults to HIKIGAI_PROJECT_ID env var) base_url: API base URL (defaults to production) timeout: Default request timeout in seconds Raises: ConfigurationError: If required credentials are missing """ # Get credentials self.api_key = api_key or os.environ.get("HIKIGAI_API_KEY") self.project_id = project_id or os.environ.get("HIKIGAI_PROJECT_ID") if not self.api_key: raise ConfigurationError( "API key is required. Provide it via api_key parameter or HIKIGAI_API_KEY env var." ) if not self.project_id: raise ConfigurationError( "Project ID is required. Provide it via project_id parameter or HIKIGAI_PROJECT_ID env var." 
) # Initialize HTTP client self.api = APIClient( api_key=self.api_key, project_id=self.project_id, base_url=base_url, timeout=timeout, ) logger.info(f"AgentClient initialized for project: {self.project_id}") def deploy( self, config: AgentConfig, timeout: Optional[float] = 600.0, ) -> DeployedAgent: """ Deploy an agent to the platform. Args: config: Agent configuration timeout: Deployment timeout in seconds (default: 10 minutes) Returns: DeployedAgent: Deployed agent metadata Raises: DeploymentError: If deployment fails ValidationError: If configuration is invalid """ start_time = time.time() logger.info(f"Deploying agent: {config.name}") # Prepare payload payload = self._prepare_deploy_payload(config) try: # Call deployment API response = self.api.post( "/api/v1/agents/deploy/full", json=payload, timeout=timeout, ) # Parse response agent = DeployedAgent( id=response["agent_id"], name=response["name"], slug=response["slug"], version=response["version"], deployment_status=response["status"], deployment_type=response["deployment_type"], endpoint_url=response.get("endpoint"), resource_name=response.get("resource_name"), hipaa_compliant=response["hipaa_compliant"], hipaa_verified=response["hipaa_verified"], ) elapsed = time.time() - start_time logger.info(f"Agent deployed successfully in {elapsed:.1f}s: {agent.slug}") return agent except Exception as e: logger.error(f"Deployment failed: {e}") raise DeploymentError(f"Failed to deploy agent: {e}", agent_name=config.name) def deploy_from_file( self, file_path: str, name: Optional[str] = None, description: Optional[str] = None, ) -> DeployedAgent: """ Deploy an agent from a Python file. 
Args: file_path: Path to agent.py file name: Optional agent name override description: Optional description Returns: DeployedAgent: Deployed agent metadata Raises: DeploymentError: If deployment fails FileNotFoundError: If file doesn't exist """ path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"Agent file not found: {file_path}") # Read agent code agent_code = path.read_text() logger.info(f"Deploying agent from file: {file_path}") # Call file deployment API payload = { "agent_code": agent_code, "name": name, "description": description, } try: response = self.api.post( "/api/v1/agents/deploy/adk", json=payload, timeout=600.0, ) return DeployedAgent( id=response["agent_id"], name=response["name"], slug=response["slug"], version=response["version"], deployment_status=response["status"], deployment_type=response["deployment_type"], endpoint_url=response.get("endpoint"), resource_name=response.get("resource_name"), hipaa_compliant=response.get("hipaa_compliant", True), hipaa_verified=response.get("hipaa_verified", False), ) except Exception as e: logger.error(f"File deployment failed: {e}") raise DeploymentError(f"Failed to deploy from file: {e}") def list_agents(self) -> List[DeployedAgent]: """ List all agents in the project. 
Returns: List of deployed agents """ logger.debug("Listing agents") try: response = self.api.get("/api/v1/agents") agents = [] for agent_data in response.get("agents", []): agents.append(DeployedAgent( id=agent_data["id"], name=agent_data["name"], slug=agent_data["slug"], version=agent_data.get("version", "1.0.0"), display_name=agent_data.get("display_name"), description=agent_data.get("description"), deployment_status=agent_data["deployment_status"], deployment_type=agent_data.get("deployment_type", "unknown"), endpoint_url=agent_data.get("endpoint_url"), cloud_provider=agent_data.get("cloud_provider"), region=agent_data.get("region"), )) return agents except Exception as e: logger.error(f"Failed to list agents: {e}") raise def get_agent(self, agent_id: str) -> DeployedAgent: """ Get agent by ID or slug. Args: agent_id: Agent ID or slug Returns: DeployedAgent: Agent metadata Raises: AgentNotFoundError: If agent doesn't exist """ logger.debug(f"Getting agent: {agent_id}") try: response = self.api.get(f"/api/v1/agents/{agent_id}") return DeployedAgent( id=response["id"], name=response["name"], slug=response["slug"], version=response.get("version", "1.0.0"), display_name=response.get("display_name"), description=response.get("description"), deployment_status=response["deployment_status"], deployment_type=response.get("deployment_type", "unknown"), endpoint_url=response.get("endpoint_url"), cloud_provider=response.get("cloud_provider"), region=response.get("region"), ) except Exception as e: logger.error(f"Failed to get agent: {e}") raise def delete_agent(self, agent_id: str) -> None: """ Delete an agent. 
Args: agent_id: Agent ID or slug Raises: AgentNotFoundError: If agent doesn't exist """ logger.info(f"Deleting agent: {agent_id}") try: self.api.delete(f"/api/v1/agents/{agent_id}") logger.info(f"Agent deleted: {agent_id}") except Exception as e: logger.error(f"Failed to delete agent: {e}") raise def update_agent(self, agent_id: str, update: Dict[str, Any]) -> DeployedAgent: """ Update an existing agent's metadata/runtime config and trigger a redeploy. This mirrors the frontend behavior which performs a `PUT /api/v1/agents/{id}` with the updated fields (including `sub_agents`) and relies on a developer JWT for authentication. If `HIKIGAI_JWT` is present in the environment we will inject it into the underlying HTTP client's headers so the call matches the frontend authorization flow. """ logger.info(f"Updating agent {agent_id} with fields: {list(update.keys())}") # Prefer calling the deploy endpoint to trigger a revision (deploy endpoints # accept API-key auth and follow the same flow as initial deploy). 
try: current = self.api.get(f"/api/v1/agents/{agent_id}") payload = { "name": current.get("slug") or current.get("name"), "display_name": current.get("display_name") or current.get("name"), "description": update.get("description", current.get("description", "")), "instruction": update.get("instruction", current.get("instruction", "")), "model": update.get("model", current.get("model")), "category": update.get("category", current.get("category", "documentation")), "tags": update.get("tags", current.get("tags", [])), "agent_type": update.get("agent_type", current.get("agent_type", "llm")), "version": update.get("version", current.get("version", "1.0.0")), "project_id": self.project_id, "input_schema": update.get("input_schema", current.get("input_schema", {})), "output_schema": update.get("output_schema", current.get("output_schema", {})), "tools": update.get("tools", current.get("tools", [])), "timeout": update.get("timeout", current.get("timeout") or 60), "memory_mb": update.get("memory_mb", current.get("memory_mb", 512)), "sub_agents": update.get("sub_agents", (current.get("sub_agents") or [])), } if "runtime_config" in update: rc = update.get("runtime_config") or {} payload.update(rc) response = self.api.post("/api/v1/agents/deploy/full", json=payload, timeout=600.0) return DeployedAgent( id=response.get('agent_id'), name=response.get('name'), slug=response.get('slug'), version=response.get('version'), deployment_status=response.get('status'), deployment_type=response.get('deployment_type'), endpoint_url=response.get('endpoint'), resource_name=response.get('resource_name'), hipaa_compliant=response.get('hipaa_compliant', True), hipaa_verified=response.get('hipaa_verified', False), ) except Exception as e: logger.error(f"Failed to redeploy via deploy/full: {e}") # Fallback to PUT using developer JWT injection if available jwt = os.environ.get("HIKIGAI_JWT") if jwt: try: if hasattr(self.api, '_headers'): self.api._headers['Authorization'] = f"Bearer {jwt}" if 
hasattr(self.api, '_client') and hasattr(self.api._client, 'headers'): self.api._client.headers['Authorization'] = f"Bearer {jwt}" except Exception: logger.debug("Failed to inject HIKIGAI_JWT into API client headers") response = self.api.put(f"/api/v1/agents/{agent_id}", json=update) return DeployedAgent( id=response.get('id') or response.get('agent_id'), name=response.get('name'), slug=response.get('slug'), version=response.get('version', '1.0.0'), deployment_status=response.get('deployment_status', response.get('status')), deployment_type=response.get('deployment_type', 'config_based'), endpoint_url=response.get('endpoint') or response.get('endpoint_url'), ) def _prepare_deploy_payload(self, config: AgentConfig) -> Dict[str, Any]: """Prepare deployment payload from AgentConfig.""" # Convert schemas to JSON Schema format input_schema_dict = {} output_schema_dict = {} if isinstance(config.input_schema, InputSchema): input_schema_dict = config.input_schema.to_json_schema() elif isinstance(config.input_schema, dict): input_schema_dict = config.input_schema if isinstance(config.output_schema, OutputSchema): output_schema_dict = config.output_schema.to_json_schema() elif isinstance(config.output_schema, dict): output_schema_dict = config.output_schema # Normalize tools tools_list = normalize_tools(config.tools) if config.tools else [] # Serialize sub-agents recursively sub_agents_list = [] for sub_agent in config.sub_agents: sub_agents_list.append(self._serialize_sub_agent(sub_agent)) # Serialize planner config planner_dict = None if config.planner_config: planner_dict = config.planner_config.model_dump(exclude_none=True) # Serialize generation config generation_dict = None if config.generation_config: generation_dict = config.generation_config.model_dump(exclude_none=True) # Build payload matching backend API contract payload = { # Identity "name": config.name, "display_name": config.display_name, "description": config.description, "long_description": 
config.long_description, # Core config "agent_type": config.agent_type, "instruction": config.instruction, "model": config.model, # Multi-agent support "sub_agents": sub_agents_list, # Advanced features "planner_config": planner_dict, "code_execution": config.code_execution, "generation_config": generation_dict, # State management "output_key": config.output_key, "include_contents": config.include_contents, "max_iterations": config.max_iterations, # Classification "category": config.category, "tags": config.tags, # Schemas & Tools "input_schema": input_schema_dict, "output_schema": output_schema_dict, "tools": tools_list, # MCP Connectors "mcp_connectors": [ { "slug": c.slug, "tool_filter": c.tool_filter, "required": c.required, # URL is resolved from the project's connector registry at deploy time } for c in config.connectors ] if config.connectors else [], # Runtime "timeout": config.timeout, "memory_mb": config.memory_mb, "min_instances": config.min_instances, "max_instances": config.max_instances, # Versioning "version": config.version, "changelog": config.changelog, # Compliance & Visibility "hipaa_compliant": config.hipaa_compliant, "public": config.public, # Cloud deployment "cloud_provider": config.cloud_provider, "gcp_region": config.gcp_region, "aws_region": config.aws_region, # Healthcare Spec Extended (§3) "risk_tier": config.risk_tier, "clinical_domain": config.clinical_domain, "a2a_skills": config.a2a_skills, "resource_limits": config.resource_limits, "fhir_resources": config.fhir_resources, "prompt_version": config.prompt_version, } return payload def _serialize_sub_agent(self, sub_agent: "SubAgentConfig") -> Dict[str, Any]: """Recursively serialize a sub-agent configuration.""" # Serialize basic fields sub_agent_dict: Dict[str, Any] = { "name": sub_agent.name, "agent_type": sub_agent.agent_type, "model": sub_agent.model, "instruction": sub_agent.instruction, "description": sub_agent.description, "tools": normalize_tools(sub_agent.tools) if 
getattr(sub_agent, 'tools', None) else [], "max_iterations": sub_agent.max_iterations, "output_key": getattr(sub_agent, "output_key", None), "include_contents": getattr(sub_agent, "include_contents", None), } # Serialize input/output schemas if provided if getattr(sub_agent, "input_schema", None): if isinstance(sub_agent.input_schema, InputSchema): sub_agent_dict["input_schema"] = sub_agent.input_schema.to_json_schema() elif isinstance(sub_agent.input_schema, dict): sub_agent_dict["input_schema"] = sub_agent.input_schema if getattr(sub_agent, "output_schema", None): if isinstance(sub_agent.output_schema, OutputSchema): sub_agent_dict["output_schema"] = sub_agent.output_schema.to_json_schema() elif isinstance(sub_agent.output_schema, dict): sub_agent_dict["output_schema"] = sub_agent.output_schema # Recursively serialize nested sub-agents if sub_agent.sub_agents: sub_agent_dict["sub_agents"] = [ self._serialize_sub_agent(sa) for sa in sub_agent.sub_agents ] else: sub_agent_dict["sub_agents"] = [] return sub_agent_dict def close(self): """Close HTTP client and cleanup resources.""" if hasattr(self, "api"): self.api.close() def __enter__(self): """Context manager support.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Cleanup when exiting context.""" self.close() def __del__(self): """Ensure cleanup on deletion.""" try: self.close() except Exception: pass ================================================================================ File: hikigai/agentsdk/models/agent.py ================================================================================ """ Deployed agent model and deployment result. """ from datetime import datetime from typing import Optional, Dict, Any, List from pydantic import BaseModel, Field class DeployedAgent(BaseModel): """ Represents a successfully deployed agent. Contains deployment metadata and status information. 
""" id: str = Field(..., description="Unique agent ID") name: str = Field(..., description="Agent name/slug") slug: str = Field(..., description="URL-friendly slug") version: str = Field(..., description="Semantic version") display_name: Optional[str] = Field(None, description="Human-readable name") description: Optional[str] = Field(None, description="Short description") # Deployment info deployment_status: str = Field(..., description="'active', 'pending', 'error'") deployment_type: str = Field(..., description="'config_based', 'adk', 'file'") endpoint_url: Optional[str] = Field(None, description="Agent invocation endpoint") cloud_provider: Optional[str] = Field(None, description="'gcp', 'aws', etc.") region: Optional[str] = Field(None, description="Cloud region") # Resources resource_name: Optional[str] = Field(None, description="Cloud resource ID") # Compliance hipaa_compliant: bool = Field(True, description="HIPAA compliance flag") hipaa_verified: bool = Field(False, description="HIPAA verification status") # Healthcare Spec risk_tier: Optional[str] = Field("low", description="Healthcare risk tier") clinical_domain: Optional[str] = Field(None, description="Clinical domain") a2a_skills: List[Dict[str, Any]] = Field(default_factory=list, description="A2A skills") # Timestamps created_at: Optional[datetime] = Field(None, description="Creation timestamp") deployed_at: Optional[datetime] = Field(None, description="Deployment timestamp") model_config = { "json_encoders": { datetime: lambda v: v.isoformat() if v else None } } class DeploymentResult(BaseModel): """Result of a deployment operation.""" success: bool = Field(..., description="Whether deployment succeeded") agent: Optional[DeployedAgent] = Field(None, description="Deployed agent (if successful)") message: str = Field(..., description="Human-readable result message") error: Optional[str] = Field(None, description="Error details (if failed)") deployment_duration_seconds: Optional[float] = Field(None, 
description="Time taken to deploy") ================================================================================ File: hikigai/agentsdk/models/config.py ================================================================================ """ Agent configuration model for deployment. """ from typing import Any, Dict, List, Optional, Union, Literal from pydantic import BaseModel, Field, field_validator, model_validator import re from hikigai.agentsdk.models.schemas import InputSchema, OutputSchema from hikigai.agentsdk.models.runtime import RuntimeConfig, HIPAAConfig from hikigai.core.constants import DEFAULT_MODEL, CATEGORIES, VALID_MODELS # ============================================================================ # Multi-Agent Support Models # ============================================================================ class PlannerConfig(BaseModel): """ Configuration for agent planning capabilities. Planning enables agents to think through problems step-by-step before acting. Example: planner = PlannerConfig( type="BuiltInPlanner", include_thoughts=True, thinking_budget=2048 ) """ type: Literal["BuiltInPlanner", "PlanReActPlanner"] = Field( "BuiltInPlanner", description="Planner type to use" ) include_thoughts: bool = Field( True, description="Include model's reasoning in response" ) thinking_budget: int = Field( 1024, ge=128, le=8192, description="Maximum tokens allocated for planning/reasoning" ) @field_validator("thinking_budget") @classmethod def validate_thinking_budget(cls, v: int) -> int: """Ensure thinking budget is within acceptable range.""" if v < 128: raise ValueError("thinking_budget must be at least 128 tokens") if v > 8192: raise ValueError("thinking_budget cannot exceed 8192 tokens") return v class GenerationConfig(BaseModel): """ LLM generation parameters for fine-tuning agent responses. 
Example: config = GenerationConfig( temperature=0.7, max_output_tokens=4096, top_p=0.95, top_k=40 ) """ temperature: Optional[float] = Field( None, ge=0.0, le=2.0, description="Randomness in responses (0=deterministic, 2=very creative)" ) max_output_tokens: Optional[int] = Field( None, ge=1, le=100000, description="Maximum tokens in model response" ) top_p: Optional[float] = Field( None, ge=0.0, le=1.0, description="Nucleus sampling threshold" ) top_k: Optional[int] = Field( None, ge=1, le=100, description="Top-K sampling parameter" ) @field_validator("temperature") @classmethod def validate_temperature(cls, v: Optional[float]) -> Optional[float]: """Validate temperature is in acceptable range.""" if v is not None and (v < 0.0 or v > 2.0): raise ValueError("temperature must be between 0.0 and 2.0") return v class SubAgentConfig(BaseModel): """ Configuration for a sub-agent in a workflow. Sub-agents are individual agents that compose larger workflows (Sequential, Parallel, Loop). Example: sub_agent = SubAgentConfig( name="entity-extractor", agent_type="llm", model="gemini-2.0-flash", instruction="Extract medical entities from clinical notes", tools=["google_search"] ) """ name: str = Field( ..., min_length=1, max_length=64, description="Sub-agent name (will be sanitized for Python)" ) agent_type: Literal["llm", "sequential", "parallel", "loop"] = Field( "llm", description="Type of sub-agent orchestration" ) model: str = Field( ..., description="Model to use for this sub-agent" ) instruction: str = Field( ..., min_length=10, max_length=50000, description="System prompt for this sub-agent" ) description: Optional[str] = Field( None, max_length=500, description="Brief description of sub-agent's purpose" ) tools: List[str] = Field( default_factory=list, description="Tool names available to this sub-agent" ) # Optional schemas for this sub-agent (leaf or workflow agents can declare their own) input_schema: Optional[Union["InputSchema", Dict[str, Any]]] = Field( 
default=None, description="Optional input schema for this sub-agent" ) output_schema: Optional[Union["OutputSchema", Dict[str, Any]]] = Field( default=None, description="Optional output schema for this sub-agent" ) # For nested workflow agents sub_agents: List["SubAgentConfig"] = Field( default_factory=list, description="Nested sub-agents (for workflow agents like Sequential/Parallel)" ) # For LoopAgent max_iterations: Optional[int] = Field( None, ge=1, le=10, description="Maximum loop iterations (LoopAgent only)" ) @field_validator("name") @classmethod def validate_name(cls, v: str) -> str: """Ensure name can be converted to valid Python identifier.""" if not v: raise ValueError("Sub-agent name cannot be empty") # Allow letters, numbers, hyphens, underscores if not re.match(r"^[a-zA-Z0-9_-]+$", v): raise ValueError( "Sub-agent name must contain only letters, numbers, hyphens, and underscores" ) return v @field_validator("model") @classmethod def validate_model(cls, v: str) -> str: """Ensure model is valid.""" if v not in VALID_MODELS: raise ValueError( f"Invalid model for sub-agent. 
Must be one of: {', '.join(VALID_MODELS)}" ) return v @model_validator(mode='after') def validate_workflow_agent(self) -> "SubAgentConfig": """Validate workflow agent configurations.""" # Sequential/Parallel/Loop agents must have sub_agents if self.agent_type in ["sequential", "parallel", "loop"]: if not self.sub_agents: raise ValueError( f"{self.agent_type} agents must have at least one sub-agent" ) # LoopAgent with max_iterations if self.agent_type == "loop" and self.max_iterations is None: raise ValueError("LoopAgent must specify max_iterations") # Non-loop agents shouldn't have max_iterations if self.agent_type != "loop" and self.max_iterations is not None: raise ValueError( f"max_iterations only applies to LoopAgent, not {self.agent_type}" ) return self class ToolConfig(BaseModel): """Configuration for an agent tool.""" name: str = Field(..., description="Tool function name") description: str = Field(..., description="What the tool does") parameters: Optional[Dict[str, Any]] = Field(None, description="Tool parameter schema") builtin: bool = Field(False, description="Is this a built-in tool?") builtin_type: Optional[str] = Field(None, description="Built-in tool type (e.g., 'web_search')") class ConnectorConfig(BaseModel): """ Configuration for an MCP connector that this agent uses. Connectors are external data sources (EHR systems, health data APIs, etc.) accessible via the Model Context Protocol. The agent developer declares which connectors the agent needs; the app developer provides credentials at invocation. Example: connector = ConnectorConfig( slug="epic-ehr", tool_filter=["get_patient", "get_medications"], ) """ slug: str = Field( ..., min_length=1, max_length=100, description="Connector slug from the connector registry (e.g., 'epic-ehr', 'cerner')" ) tool_filter: Optional[List[str]] = Field( None, description="Whitelist of specific tools to use from this connector. None = all tools." 
) required: bool = Field( True, description="If True, agent fails if this connector is not available at invocation time" ) @field_validator("slug") @classmethod def validate_slug(cls, v: str) -> str: """Ensure slug is valid format.""" if not re.match(r"^[a-z0-9-]+$", v): raise ValueError("Connector slug must be lowercase with hyphens only") return v class AgentConfig(BaseModel): """ Complete configuration for deploying an AI agent. Example: config = AgentConfig( name="medical-coder", display_name="Medical Coding Assistant", description="Extracts ICD-10 and CPT codes from clinical notes", instruction="You are a medical coding expert...", model="claude-3.5-sonnet", category="Medical Coding", tags=["icd-10", "cpt", "medical"], input_schema=InputSchema(fields={ "clinical_note": StringField(required=True) }), output_schema=OutputSchema(fields={ "icd_codes": ArrayField(), "cpt_codes": ArrayField() }), tools=[], timeout=60, memory_mb=512, version="1.0.0", ) """ # ================== Identity ================== name: str = Field( ..., min_length=3, max_length=64, description="Agent name (slug format: lowercase, hyphens)" ) display_name: str = Field( ..., min_length=3, max_length=100, description="Human-readable display name" ) description: str = Field( ..., min_length=10, max_length=500, description="Short description for marketplace" ) long_description: Optional[str] = Field( None, max_length=5000, description="Full documentation" ) # ================== Core Configuration ================== agent_type: Literal["llm", "sequential", "parallel", "loop"] = Field( "llm", description="Agent orchestration type (llm=single, sequential/parallel/loop=workflow)" ) instruction: str = Field( ..., min_length=10, max_length=50000, description="System prompt for the agent" ) model: str = Field( default=DEFAULT_MODEL, description="AI model to use" ) # ================== Multi-Agent Support ================== sub_agents: List[SubAgentConfig] = Field( default_factory=list, 
description="Sub-agents for workflow orchestration (Sequential/Parallel/Loop)" ) # ================== Advanced Features ================== planner_config: Optional[PlannerConfig] = Field( None, description="Enable planning/reasoning capabilities" ) code_execution: bool = Field( False, description="Enable code execution capability" ) generation_config: Optional[GenerationConfig] = Field( None, description="LLM generation parameters (temperature, max_tokens, etc.)" ) # ================== State Management ================== output_key: Optional[str] = Field( None, max_length=64, description="Key to save agent output in state for downstream agents" ) include_contents: Optional[Literal["default", "none"]] = Field( None, description="Control conversation history inclusion" ) max_iterations: Optional[int] = Field( None, ge=1, le=10, description="Maximum iterations for LoopAgent" ) # ================== Classification ================== category: str = Field( default="documentation", description="Agent category (Healthcare AI Agent Ecosystem spec)" ) tags: List[str] = Field( default_factory=list, max_length=10, description="Tags for discovery" ) # ================== Schemas ================== input_schema: Union[InputSchema, Dict[str, Any]] = Field( ..., description="Input schema definition (required)" ) output_schema: Union[OutputSchema, Dict[str, Any]] = Field( ..., description="Output schema definition (required)" ) # ================== Tools ================== tools: List[Any] = Field( default_factory=list, description="Agent tools (functions, OpenAPI, MCP)" ) # ================== MCP Connectors ================== connectors: List[ConnectorConfig] = Field( default_factory=list, description="MCP connectors this agent uses (from the connector registry)" ) # ================== Runtime ================== timeout: int = Field( 60, ge=5, le=300, description="Timeout in seconds" ) memory_mb: int = Field( 512, ge=128, le=4096, description="Memory allocation in MB" ) 
min_instances: int = Field( 0, ge=0, le=10, description="Minimum instances" ) max_instances: int = Field( 10, ge=1, le=100, description="Maximum instances" ) # ================== Versioning ================== version: str = Field( ..., pattern=r"^\d+\.\d+\.\d+$", description="Semantic version (e.g., '1.0.0')" ) changelog: Optional[str] = Field( None, description="What changed in this version" ) # ================== Compliance ================== hipaa_compliant: bool = Field( True, description="Agent handles PHI per HIPAA" ) # ================== Healthcare Spec ================== risk_tier: Optional[Literal["low", "moderate", "high", "critical"]] = Field( "low", description="Healthcare risk tier" ) clinical_domain: Optional[str] = Field( None, description="Clinical domain category" ) a2a_skills: List[Dict[str, Any]] = Field( default_factory=list, description="Machine-readable capabilities for AI-to-AI discovery" ) # Healthcare Spec Manifest resource_limits: Optional[Dict[str, str]] = Field( default={"cpu": "1", "memory": "512Mi"}, description="Container resource limits (cpu, memory)" ) fhir_resources: List[Dict[str, Any]] = Field( default_factory=list, description="Required FHIR resources for agent operation" ) prompt_version: Optional[str] = Field( "1.0.0", description="Specific version of the clinical prompt" ) requires_physician_oversight: bool = Field( False, description="Does this agent require human physician sign-off? 
" ) fda_clearance_status: str = Field( "not-applicable", description="FDA clearance status (e.g., 'cleared', 'pending', 'not-applicable')" ) # ================== Visibility ================== public: bool = Field( False, description="Publicly listed in marketplace" ) # ================== Cloud Deployment ================== cloud_provider: str = Field( "gcp", description="Cloud provider: 'gcp', 'gcp-agent-engine', 'aws'" ) gcp_region: str = Field( "us-central1", description="GCP region for deployment" ) aws_region: str = Field( "us-east-1", description="AWS region for deployment" ) # ================== Validators ================== @field_validator("name") @classmethod def validate_name(cls, v: str) -> str: """Ensure name is slug-compatible.""" if not re.match(r"^[a-z0-9-]+$", v): raise ValueError("Name must be lowercase with hyphens only") return v @field_validator("model") @classmethod def validate_model(cls, v: str) -> str: """Ensure model is valid.""" if v not in VALID_MODELS: raise ValueError(f"Invalid model. Must be one of: {', '.join(VALID_MODELS)}") return v @field_validator("category") @classmethod def validate_category(cls, v: str) -> str: """Ensure category is valid.""" if v not in CATEGORIES: raise ValueError(f"Invalid category. Must be one of: {', '.join(CATEGORIES)}") return v @field_validator("tags") @classmethod def validate_tags(cls, v: List[str]) -> List[str]: """Limit and validate tags.""" if len(v) > 10: raise ValueError("Maximum 10 tags allowed") return v @model_validator(mode='after') def validate_agent_configuration(self) -> "AgentConfig": """ Comprehensive validation for agent configuration. Ensures workflow agents have proper sub-agents and configurations are consistent. """ # 1. Workflow agents must have sub-agents if self.agent_type in ["sequential", "parallel", "loop"]: if not self.sub_agents: raise ValueError( f"{self.agent_type.capitalize()}Agent requires at least one sub-agent. " f"Add sub-agents using the sub_agents field." 
) # Ensure at least 2 sub-agents for loop if self.agent_type == "loop" and len(self.sub_agents) < 2: raise ValueError( "LoopAgent requires at least 2 sub-agents to create a meaningful loop" ) # 2. LLM agents shouldn't have sub-agents (sub-agents ignored for llm type) if self.agent_type == "llm" and self.sub_agents: # Warning: we'll allow this but it will be ignored pass # 3. LoopAgent must have max_iterations if self.agent_type == "loop" and self.max_iterations is None: raise ValueError( "LoopAgent must specify max_iterations (recommended: 3-5)" ) # 4. Non-loop agents shouldn't set max_iterations if self.agent_type != "loop" and self.max_iterations is not None: raise ValueError( f"max_iterations only applies to LoopAgent, not {self.agent_type} agents" ) # 5. Validate planner is only used with supported models if self.planner_config: # Planning requires Gemini 2.0+ or specific models planning_models = [ "gemini-2.0-flash", "gemini-2.5-flash", "gemini-1.5-pro" ] if self.model not in planning_models: raise ValueError( f"Planning requires one of: {','.join(planning_models)}. " f"Current model '{self.model}' does not support planning." ) # 6. Code execution requires specific models if self.code_execution: code_exec_models = [ "gemini-2.0-flash", "gemini-2.5-flash", "gemini-1.5-pro" ] if self.model not in code_exec_models: raise ValueError( f"Code execution requires one of: {', '.join(code_exec_models)}. " f"Current model '{self.model}' does not support code execution." ) # 7. 
# ---- hikigai/agentsdk/models/schemas.py ----
# Schema definitions for agent inputs and outputs.
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Field


class FieldDefinition(BaseModel):
    """Base field definition shared by all typed field classes."""

    type: str
    description: Optional[str] = None
    required: bool = False
    default: Optional[Any] = None


class StringField(FieldDefinition):
    """String field definition."""

    type: str = "string"
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    pattern: Optional[str] = None
    enum: Optional[List[str]] = None


class IntegerField(FieldDefinition):
    """Integer field definition."""

    type: str = "integer"
    minimum: Optional[int] = None
    maximum: Optional[int] = None


class BooleanField(FieldDefinition):
    """Boolean field definition."""

    type: str = "boolean"


class ArrayField(FieldDefinition):
    """Array field definition."""

    type: str = "array"
    items: Optional[Union[FieldDefinition, Dict[str, Any]]] = None
    min_items: Optional[int] = None
    max_items: Optional[int] = None


class ObjectField(FieldDefinition):
    """Object field definition."""

    type: str = "object"
    properties: Optional[Dict[str, FieldDefinition]] = None
    required_fields: Optional[List[str]] = None


def _base_property(field_def: FieldDefinition) -> Dict[str, Any]:
    """Return the JSON Schema property shared by input and output schemas.

    The ``description`` key is only emitted when a description was provided:
    JSON Schema requires ``description`` to be a string, so ``None`` must not
    be serialised.

    NOTE(review): type-specific constraints (``min_length``, ``enum``,
    ``minimum``, ``items``, ...) are not exported, same as before.
    """
    prop: Dict[str, Any] = {"type": field_def.type}
    if field_def.description is not None:
        prop["description"] = field_def.description
    return prop


class InputSchema(BaseModel):
    """
    Input schema for agent.

    Defines the structure of data the agent expects to receive.
    """

    description: Optional[str] = Field(None, description="Schema description")
    fields: Dict[str, Union[FieldDefinition, Dict[str, Any]]] = Field(
        default_factory=dict,
        description="Field definitions"
    )
    required: List[str] = Field(default_factory=list, description="Required field names")

    def to_json_schema(self) -> Dict[str, Any]:
        """Convert to JSON Schema format.

        ``FieldDefinition`` entries are rendered as ``type`` plus optional
        ``description`` and ``default``; raw dict entries are passed through
        unchanged.

        Returns:
            A JSON Schema ``object`` definition. ``required`` lists fields
            flagged ``required=True`` (in field order) followed by the names
            in ``self.required``, without duplicates.
        """
        properties: Dict[str, Any] = {}
        flagged_required: List[str] = []

        for name, field_def in self.fields.items():
            if not isinstance(field_def, FieldDefinition):
                # Raw dict: already in JSON Schema form.
                properties[name] = field_def
                continue

            prop = _base_property(field_def)
            if field_def.default is not None:
                prop["default"] = field_def.default
            properties[name] = prop

            if field_def.required:
                flagged_required.append(name)

        schema: Dict[str, Any] = {
            "type": "object",
            "properties": properties,
        }

        if self.description:
            schema["description"] = self.description

        # dict.fromkeys de-duplicates while keeping first-seen order; a set
        # would make the order vary between processes.
        required_names = list(dict.fromkeys(flagged_required + self.required))
        if required_names:
            schema["required"] = required_names

        return schema


class OutputSchema(BaseModel):
    """
    Output schema for agent.

    Defines the structure of data the agent will return.
    """

    description: Optional[str] = Field(None, description="Schema description")
    fields: Dict[str, Union[FieldDefinition, Dict[str, Any]]] = Field(
        default_factory=dict,
        description="Field definitions"
    )

    def to_json_schema(self) -> Dict[str, Any]:
        """Convert to JSON Schema format.

        Returns:
            A JSON Schema ``object`` definition whose properties mirror
            ``self.fields``; raw dict entries are passed through unchanged.
        """
        properties: Dict[str, Any] = {
            name: _base_property(field_def)
            if isinstance(field_def, FieldDefinition)
            else field_def
            for name, field_def in self.fields.items()
        }

        schema: Dict[str, Any] = {
            "type": "object",
            "properties": properties,
        }

        if self.description:
            schema["description"] = self.description

        return schema


# ---- hikigai/agentsdk/models/runtime.py ----
# Runtime configuration models.
from typing import Optional

from pydantic import BaseModel, Field


class RuntimeConfig(BaseModel):
    """Runtime configuration for agent deployment."""

    timeout: int = Field(60, ge=5, le=300, description="Timeout in seconds")
    memory_mb: int = Field(512, ge=128, le=4096, description="Memory allocation in MB")
    # NOTE(review): the per-field bounds allow min_instances > max_instances
    # (e.g. min=5, max=1) - confirm whether that is rejected elsewhere.
    min_instances: int = Field(0, ge=0, le=10, description="Minimum instances")
    max_instances: int = Field(10, ge=1, le=100, description="Maximum instances")


class HIPAAConfig(BaseModel):
    """HIPAA compliance configuration."""

    compliant: bool = Field(True, description="Requires HIPAA compliance")
    audit_logging: bool = Field(True, description="Enable audit logging")
    encryption_at_rest: bool = Field(True, description="Encrypt data at rest")
    encryption_in_transit: bool = Field(True, description="Encrypt data in transit")
# ---- hikigai/agentsdk/tools.py ----
# Tool integration utilities: @tool-decorated Python functions, OpenAPI
# specifications and MCP (Model Context Protocol) servers.
import copy
import inspect
import logging
import types
import typing
from functools import update_wrapper, wraps
from typing import Any, Callable, Dict, List, Optional, Union

# Python annotations that map directly onto JSON Schema primitive types.
# Matching is by identity, so bool is never mistaken for int.
_PRIMITIVE_JSON_TYPES = (
    (str, "string"),
    (int, "integer"),
    (float, "number"),
    (bool, "boolean"),
)

# typing.Union plus, on Python 3.10+, the type produced by ``X | Y``.
_UNION_ORIGINS = tuple(
    origin for origin in (Union, getattr(types, "UnionType", None)) if origin is not None
)

# ``*args`` / ``**kwargs`` cannot be described as named schema properties.
_VARIADIC_KINDS = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)


def _json_type_for(annotation: Any) -> str:
    """Map a parameter annotation to a JSON Schema type name.

    ``Optional[T]`` / ``T | None`` is unwrapped to ``T``. Anything that is
    not ``str``, ``int``, ``float`` or ``bool`` (including a missing
    annotation) falls back to ``"string"``.
    """
    origin = typing.get_origin(annotation)
    if any(origin is union for union in _UNION_ORIGINS):
        members = [arg for arg in typing.get_args(annotation) if arg is not type(None)]
        if len(members) == 1:
            annotation = members[0]

    for py_type, json_type in _PRIMITIVE_JSON_TYPES:
        if annotation is py_type:
            return json_type
    return "string"


class FunctionTool:
    """Wrapper for a Python function as an agent tool."""

    def __init__(
        self,
        func: Callable,
        name: Optional[str] = None,
        description: Optional[str] = None
    ):
        """Wrap ``func`` and pre-compute its parameter schema.

        Args:
            func: The callable to expose as a tool.
            name: Tool name; defaults to the callable's ``__name__``.
            description: Tool description; defaults to the stripped docstring.

        Raises:
            TypeError: If ``func`` is not callable (e.g. ``@tool("name")``
                used instead of ``@tool(name="name")``).
        """
        if not callable(func):
            raise TypeError(
                f"FunctionTool expects a callable, got {type(func).__name__}; "
                "pass name/description as keyword arguments"
            )

        # Carry over __name__, __doc__, __module__ ... so the wrapper still
        # introspects like the original function. Done first so the
        # attributes assigned below always win.
        update_wrapper(self, func)

        self.func = func
        self.name = name or getattr(func, "__name__", type(func).__name__)
        self.description = description or (getattr(func, "__doc__", None) or "").strip()
        self._schema = self._extract_schema()

    def _extract_schema(self) -> Dict[str, Any]:
        """Extract parameter schema from function signature.

        ``self`` and variadic (``*args`` / ``**kwargs``) parameters are
        skipped. A parameter is required when it has no default value.
        """
        signature = inspect.signature(self.func)

        # Resolve string annotations (``from __future__ import annotations``).
        # Best-effort: unresolved names simply fall back to the raw annotation.
        try:
            hints = typing.get_type_hints(self.func)
        except Exception:
            hints = {}

        properties: Dict[str, Dict[str, str]] = {}
        required: List[str] = []

        for param_name, param in signature.parameters.items():
            if param_name == "self" or param.kind in _VARIADIC_KINDS:
                continue

            annotation = hints.get(param_name, param.annotation)
            properties[param_name] = {"type": _json_type_for(annotation)}

            # Mark as required if no default
            if param.default is inspect.Parameter.empty:
                required.append(param_name)

        return {
            "type": "function",
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format for API.

        Returns a copy so callers cannot corrupt the cached schema.
        """
        return copy.deepcopy(self._schema)

    def __call__(self, *args, **kwargs):
        """Make the tool callable by delegating to the wrapped function."""
        return self.func(*args, **kwargs)


def tool(
    func: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Union[FunctionTool, Callable[[Callable], FunctionTool]]:
    """
    Decorator to convert a Python function into an agent tool.

    Usage:
        @tool
        def search_web(query: str) -> str:
            '''Search the web for information.'''
            # Implementation
            return results

        # Or with custom name/description
        @tool(name="web_search", description="Search the internet")
        def my_search(query: str) -> str:
            return results

    Returns:
        A ``FunctionTool`` when used bare, otherwise a decorator that
        produces one.
    """
    def decorator(fn: Callable) -> FunctionTool:
        return FunctionTool(fn, name=name, description=description)

    if func is None:
        return decorator
    return decorator(func)


class OpenAPITool:
    """Tool from an OpenAPI specification."""

    def __init__(self, spec_url: str, operation_id: Optional[str] = None):
        self.spec_url = spec_url
        self.operation_id = operation_id

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format for API."""
        return {
            "type": "openapi",
            "spec_url": self.spec_url,
            "operation_id": self.operation_id
        }


class MCPTool:
    """Tool from an MCP (Model Context Protocol) server."""

    def __init__(self, server_name: str, tool_name: str):
        self.server_name = server_name
        self.tool_name = tool_name

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format for API."""
        return {
            "type": "mcp",
            "server_name": self.server_name,
            "tool_name": self.tool_name
        }


def normalize_tool(tool_spec: Any) -> Dict[str, Any]:
    """
    Normalize a tool specification to API format.

    Supports:
    - FunctionTool objects
    - @tool decorated functions
    - OpenAPITool objects
    - MCPTool objects
    - String references to built-in tools
    - Raw dictionaries

    Raises:
        ValueError: If ``tool_spec`` is none of the supported kinds.
    """
    if isinstance(tool_spec, (FunctionTool, OpenAPITool, MCPTool)):
        return tool_spec.to_dict()

    if isinstance(tool_spec, str):
        # Built-in tool reference
        return {
            "type": "builtin",
            "name": tool_spec
        }

    if isinstance(tool_spec, dict):
        return tool_spec

    if callable(tool_spec):
        # Wrap bare function
        return FunctionTool(tool_spec).to_dict()

    raise ValueError(f"Invalid tool specification: {type(tool_spec)}")


def normalize_tools(tools: List[Any]) -> List[Dict[str, Any]]:
    """Normalize a list of tool specifications."""
    return [normalize_tool(spec) for spec in tools]


# ---- hikigai/agentsdk/telemetry.py ----
# OpenTelemetry instrumentation for the Hikigai AgentSDK. When the
# OpenTelemetry SDK packages are absent the module degrades to no-ops so the
# SDK never fails to import.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Graceful import: work with or without opentelemetry installed
# ---------------------------------------------------------------------------
try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
    from opentelemetry.sdk.resources import Resource, SERVICE_NAME

    _HAS_OTEL = True
except ImportError:
    _HAS_OTEL = False
    trace = None  # type: ignore[assignment]

# Module-level tracer (initialised lazily via ``init_telemetry``)
_tracer: Optional[object] = None


def _build_span_exporter(endpoint: Optional[str]):
    """Return the OTLP exporter for ``endpoint``, or a console exporter.

    The console exporter is used when no endpoint is given or when the
    optional OTLP exporter package is not installed.
    """
    if not endpoint:
        logger.debug("OpenTelemetry console exporter initialised (dev mode)")
        return ConsoleSpanExporter()

    # Only the optional import is guarded, so an ImportError raised by
    # anything else is not mistaken for "package missing".
    try:
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
            OTLPSpanExporter,
        )
    except ImportError:
        logger.warning(
            "opentelemetry-exporter-otlp not installed – falling back to console exporter"
        )
        return ConsoleSpanExporter()

    exporter = OTLPSpanExporter(endpoint=endpoint)
    logger.info("OpenTelemetry OTLP exporter initialised: %s", endpoint)
    return exporter


def init_telemetry(
    service_name: str = "hikigai-agentsdk",
    endpoint: Optional[str] = None,
) -> None:
    """
    Initialise OpenTelemetry tracing for the AgentSDK.

    Does nothing when the OpenTelemetry SDK is not installed. Repeated calls
    after a successful initialisation are ignored: the global tracer provider
    can only be set once, so building another provider would only leave an
    unused batch processor behind.

    Args:
        service_name: The logical service name reported in traces.
        endpoint: Optional OTLP collector endpoint. When *None* a
            ``ConsoleSpanExporter`` is used (useful during development).
    """
    global _tracer

    if not _HAS_OTEL:
        logger.debug(
            "OpenTelemetry SDK not installed – telemetry disabled. "
            "Install with: pip install opentelemetry-api opentelemetry-sdk"
        )
        return

    if _tracer is not None:
        logger.debug("OpenTelemetry tracing already initialised - ignoring repeated call")
        return

    provider = TracerProvider(resource=Resource.create({SERVICE_NAME: service_name}))
    provider.add_span_processor(BatchSpanProcessor(_build_span_exporter(endpoint)))

    trace.set_tracer_provider(provider)
    _tracer = trace.get_tracer(__name__)
    logger.info("OpenTelemetry tracing initialised for %s", service_name)


def get_tracer():
    """Return the module-level tracer, or *None* if telemetry is not active."""
    return _tracer