API Reference

Complete API documentation for Agent OS v0.1.0

KernelSpace

The KernelSpace class is the core orchestrator of Agent OS. It manages agent registration, execution, policy enforcement, and signal dispatch. All agent interactions flow through the kernel.

Constructor

from agent_os import KernelSpace

kernel = KernelSpace(
    policy: str | Policy = "strict",
    flight_recorder: FlightRecorder | None = None,
    signal_dispatcher: SignalDispatcher | None = None,
    max_concurrent_agents: int = 100,
    timeout: float = 300.0,
    debug: bool = False
)
Parameter Type Default Description
policy str | Policy "strict" Built-in policy name ("strict", "permissive", "audit") or a custom Policy instance
flight_recorder FlightRecorder | None None Custom flight recorder for audit logging
signal_dispatcher SignalDispatcher | None None Custom signal dispatcher instance
max_concurrent_agents int 100 Maximum agents running simultaneously
timeout float 300.0 Default execution timeout in seconds
debug bool False Enable debug logging

kernel.register()

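Decorator that registers an agent function with the kernel. The kernel wraps the agent to intercept all actions and enforce policies. As the examples in this reference show, it can be applied bare (@kernel.register) or called with the options below.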

@kernel.register(
    name: str | None = None,
    permissions: list[str] | None = None,
    rate_limit: int | None = None,
    priority: int = 0
)
async def my_agent(task: str) -> str:
    """Your agent implementation."""
    return f"Completed: {task}"
Parameter Type Description
name str | None Custom agent name (defaults to function name)
permissions list[str] | None List of granted permissions: ["read", "write", "execute", "network"]
rate_limit int | None Maximum executions per minute
priority int Execution priority (higher = more priority)

Example: Register with permissions

from agent_os import KernelSpace

kernel = KernelSpace(policy="strict")

@kernel.register(
    name="data_processor",
    permissions=["read", "write"],
    rate_limit=60
)
async def data_processor(input_data: dict) -> dict:
    """Process data with read/write permissions."""
    # Kernel enforces: only read/write actions allowed
    # Network calls would be blocked
    result = await transform(input_data)
    return {"status": "success", "data": result}

kernel.execute()

Executes a registered agent with full kernel supervision. All actions are intercepted, validated against policy, and logged to the flight recorder.

result = await kernel.execute(
    agent: Callable,
    *args,
    timeout: float | None = None,
    context: dict | None = None,
    **kwargs
) -> Any
Parameter Type Description
agent Callable The registered agent function to execute
*args Any Positional arguments passed to the agent
timeout float | None Override default timeout for this execution
context dict | None Additional context (user_id, session_id, etc.)
**kwargs Any Keyword arguments passed to the agent

Returns: The agent's return value. Raises PolicyViolationError if the execution is blocked by policy.

import asyncio
from agent_os import KernelSpace

kernel = KernelSpace(policy="strict")

@kernel.register
async def analyzer(data: str) -> dict:
    return {"analyzed": True, "length": len(data)}

# Execute with context
result = asyncio.run(
    kernel.execute(
        analyzer,
        "sample data",
        context={"user_id": "user_123", "session_id": "sess_456"}
    )
)
print(result)  # {"analyzed": True, "length": 11}

kernel.spawn()

Spawns a new child agent process. Child agents inherit parent policies but can have additional restrictions. Useful for parallel task execution.

child_id = await kernel.spawn(
    agent: Callable,
    *args,
    parent_id: str | None = None,
    inherit_permissions: bool = True,
    additional_restrictions: list[str] | None = None,
    **kwargs
) -> str
Parameter Type Description
agent Callable The agent function to spawn
*args Any Positional arguments passed to the agent
parent_id str | None Parent agent ID for hierarchical tracking
inherit_permissions bool Whether child inherits parent's permissions
additional_restrictions list[str] | None Extra restrictions beyond parent's policy
**kwargs Any Keyword arguments passed to the agent

Returns: Unique child agent ID (UUID string)

@kernel.register
async def coordinator(tasks: list[str]) -> list[dict]:
    """Spawn child agents for parallel processing."""
    child_ids = []
    
    for task in tasks:
        child_id = await kernel.spawn(
            worker_agent,
            task,
            parent_id=kernel.current_agent_id,
            additional_restrictions=["no_network"]
        )
        child_ids.append(child_id)
    
    # Wait for all children to complete
    results = await kernel.wait_all(child_ids)
    return results

@kernel.register
async def worker_agent(task: str) -> dict:
    """Process individual task."""
    return {"task": task, "status": "complete"}

kernel.send_signal()

Send a POSIX-style signal to a running agent.

await kernel.send_signal(
    agent_id: str,
    signal: AgentSignal
) -> bool

kernel.get_agent_status()

Retrieve the current status of a registered agent.

status = kernel.get_agent_status(agent_id: str) -> AgentStatus

# AgentStatus fields:
# - id: str
# - name: str
# - state: "running" | "stopped" | "terminated" | "waiting"
# - started_at: datetime
# - actions_count: int
# - violations_count: int

SignalDispatcher

The SignalDispatcher handles POSIX-style signals for agent lifecycle control. Signals provide deterministic control over agent execution.

Constructor

from agent_os import SignalDispatcher

dispatcher = SignalDispatcher(
    kernel: KernelSpace,
    queue_size: int = 1000,
    async_delivery: bool = True
)

dispatcher.send()

Send a signal to an agent. Non-catchable signals (SIGKILL, SIGSTOP) are executed immediately.

from agent_os import SignalDispatcher, AgentSignal

# Send SIGSTOP to pause agent
await dispatcher.send(
    agent_id="agent_123",
    signal=AgentSignal.SIGSTOP
)

# Send SIGCONT to resume
await dispatcher.send(
    agent_id="agent_123", 
    signal=AgentSignal.SIGCONT
)

# Terminate with SIGKILL (non-catchable)
await dispatcher.send(
    agent_id="agent_123",
    signal=AgentSignal.SIGKILL
)

dispatcher.register_handler()

Register a custom handler for a catchable signal (SIGCONT, SIGTERM, SIGUSR1, SIGUSR2, or SIGALRM).

@dispatcher.register_handler(AgentSignal.SIGTERM)
async def graceful_shutdown(agent_id: str, signal: AgentSignal):
    """Custom handler for graceful shutdown."""
    agent = kernel.get_agent(agent_id)
    await agent.cleanup()
    await agent.save_state()
    return True  # Allow termination

@dispatcher.register_handler(AgentSignal.SIGUSR1)
async def dump_state(agent_id: str, signal: AgentSignal):
    """Custom handler for state dumping."""
    agent = kernel.get_agent(agent_id)
    state = await agent.get_state()
    await flight_recorder.log_state_dump(agent_id, state)

AgentSignal Enum

POSIX-inspired signals for agent control. Some signals are catchable (handlers can intercept), while others are non-catchable (kernel enforces immediately).

from agent_os import AgentSignal

class AgentSignal(Enum):
    # Non-catchable signals (kernel enforced)
    SIGKILL = 9    # Immediate termination, no cleanup
    SIGSTOP = 19   # Immediate pause, cannot be caught
    
    # Catchable signals (handlers can intercept)
    SIGCONT = 18   # Resume from SIGSTOP
    SIGTERM = 15   # Graceful termination request
    SIGUSR1 = 10   # User-defined signal 1
    SIGUSR2 = 12   # User-defined signal 2
    SIGALRM = 14   # Timeout alarm
Signal Value Catchable Description
SIGKILL 9 ❌ No Immediate termination. Agent cannot catch or ignore.
SIGSTOP 19 ❌ No Immediate pause. Agent freezes until SIGCONT.
SIGCONT 18 ✅ Yes Resume execution after SIGSTOP.
SIGTERM 15 ✅ Yes Graceful termination. Agent can cleanup first.
SIGUSR1 10 ✅ Yes User-defined. Common use: state dump.
SIGUSR2 12 ✅ Yes User-defined. Common use: config reload.
SIGALRM 14 ✅ Yes Timer expired. Used for timeout enforcement.

Example: Signal-based agent control

from agent_os import KernelSpace, AgentSignal
import asyncio

kernel = KernelSpace(policy="strict")

@kernel.register
async def long_running_agent(iterations: int):
    """Agent that can be paused and resumed."""
    for i in range(iterations):
        # Check for pause signal
        if kernel.is_stopped():
            await kernel.wait_for_continue()
        
        await process_iteration(i)
        await asyncio.sleep(1)
    
    return {"completed": iterations}

async def main():
    # Start agent
    task = asyncio.create_task(
        kernel.execute(long_running_agent, 100)
    )
    
    # Pause after 5 seconds
    await asyncio.sleep(5)
    await kernel.send_signal(kernel.current_agent_id, AgentSignal.SIGSTOP)
    print("Agent paused")
    
    # Resume after 3 seconds
    await asyncio.sleep(3)
    await kernel.send_signal(kernel.current_agent_id, AgentSignal.SIGCONT)
    print("Agent resumed")
    
    result = await task
    print(result)

Policy

The Policy class defines rules that govern agent behavior. Policies are enforced at the kernel level before any action executes.

Constructor

from agent_os import Policy, Pattern

policy = Policy(
    name: str,
    version: str = "1.0.0",
    rules: list[Rule] | None = None,
    default_action: str = "deny",  # "allow" | "deny"
    inheritance: str = "strict"    # "strict" | "permissive" | "none"
)

policy.add_rule()

Add a rule to the policy. Rules are evaluated in order; first match wins.

from agent_os import Policy, Pattern

policy = Policy(name="healthcare_policy", default_action="deny")

# Allow read operations on non-sensitive data
policy.add_rule(
    action="allow",
    pattern=Pattern(
        operation="read",
        resource="data/*",
        exclude=["data/phi/*", "data/pii/*"]
    ),
    reason="Allow reading non-sensitive data"
)

# Deny all write operations to PHI
policy.add_rule(
    action="deny",
    pattern=Pattern(
        operation="write",
        resource="data/phi/*"
    ),
    reason="PHI write requires explicit approval",
    on_violation="SIGKILL"  # Immediate termination
)

# Allow network calls to approved endpoints only
policy.add_rule(
    action="allow",
    pattern=Pattern(
        operation="network",
        resource="https://api.approved-service.com/*"
    ),
    rate_limit=100  # Max 100 calls per minute
)

Built-in Policies

Policy Name Default Action Description
"strict" deny Deny everything not explicitly allowed. Production recommended.
"permissive" allow Allow everything not explicitly denied. Development only.
"audit" allow Allow all but log violations. Useful for policy development.

Pattern

Pattern defines matching criteria for policy rules. Supports glob patterns, regex, and structured matching.

from agent_os import Pattern

# Simple glob pattern
pattern = Pattern(
    operation="read",
    resource="files/*.txt"
)

# Regex pattern
pattern = Pattern(
    operation="write",
    resource=r"db/users/\d+",
    match_type="regex"
)

# Structured pattern with conditions
pattern = Pattern(
    operation="execute",
    resource="tools/*",
    conditions={
        "user_role": ["admin", "operator"],
        "time_window": {"start": "09:00", "end": "17:00"},
        "max_cost": 100.0
    }
)

# Exclude specific resources
pattern = Pattern(
    operation="*",
    resource="api/*",
    exclude=["api/admin/*", "api/internal/*"]
)
Parameter Type Description
operation str Operation type: "read", "write", "execute", "network", "*"
resource str Resource path pattern (glob or regex)
match_type str "glob" (default) or "regex"
conditions dict | None Additional matching conditions
exclude list[str] | None Patterns to exclude from match

FlightRecorder

The FlightRecorder provides comprehensive audit logging for all agent activities. Like an airplane's black box, it captures everything for debugging and compliance.

Constructor

from agent_os import FlightRecorder

recorder = FlightRecorder(
    storage: str = "memory",  # "memory" | "sqlite" | "postgresql" | "s3"
    connection_string: str | None = None,
    retention_days: int = 90,
    compression: bool = True,
    encryption_key: bytes | None = None
)

recorder.log()

Log an event. Called automatically by the kernel for all actions.

await recorder.log(
    event_type: str,
    agent_id: str,
    data: dict,
    severity: str = "info"  # "debug" | "info" | "warning" | "error" | "critical"
)

# Example
await recorder.log(
    event_type="action_blocked",
    agent_id="agent_123",
    data={
        "action": "write",
        "resource": "data/phi/patient_001.json",
        "policy_rule": "deny_phi_write",
        "reason": "PHI access denied without explicit approval"
    },
    severity="warning"
)

recorder.query()

Query recorded events with filters.

from datetime import datetime, timedelta

# Query all violations in the last hour
violations = await recorder.query(
    event_type="policy_violation",
    start_time=datetime.now() - timedelta(hours=1),
    end_time=datetime.now(),
    limit=100
)

# Query specific agent activity
activity = await recorder.query(
    agent_id="agent_123",
    severity=["warning", "error", "critical"],
    limit=50
)

# Query with custom filters
events = await recorder.query(
    filters={
        "data.resource": {"$regex": "data/phi/*"},
        "data.action": "write"
    }
)

recorder.replay()

Replay recorded events to reconstruct an agent's state at a chosen point in an execution (time-travel debugging).

# Get execution timeline
timeline = await recorder.get_timeline(
    agent_id="agent_123",
    execution_id="exec_456"
)

# Replay to specific point
state = await recorder.replay(
    agent_id="agent_123",
    execution_id="exec_456",
    to_event_id="evt_789"  # Replay up to this event
)

print(f"Agent state at event: {state}")

recorder.export()

Export audit logs for compliance reporting.

# Export to JSON
await recorder.export(
    format="json",
    output_path="audit_report.json",
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 31),
    include_metadata=True
)

# Export to CSV for analysis
await recorder.export(
    format="csv",
    output_path="audit_report.csv",
    fields=["timestamp", "agent_id", "event_type", "severity"]
)

CMVK (Cross-Model Verification Kernel)

The CMVK module detects hallucinations by comparing outputs across multiple LLMs. If agreement between the models falls below the consensus threshold, the output is flagged or blocked.

from agent_os.modules import CMVK

cmvk = CMVK(
    models=["gpt-4", "claude-3", "gemini-pro"],
    consensus_threshold=0.8,  # 80% agreement required
    voting_strategy="majority",  # "majority" | "unanimous" | "weighted"
    timeout=30.0
)

# Verify a response
result = await cmvk.verify(
    prompt="What is the capital of France?",
    response="Paris is the capital of France.",
    context={"domain": "geography"}
)

print(result)
# {
#     "verified": True,
#     "confidence": 0.95,
#     "model_votes": {"gpt-4": True, "claude-3": True, "gemini-pro": True},
#     "consensus": "unanimous"
# }

# Integrate with kernel
kernel = KernelSpace(policy="strict")
kernel.add_module(cmvk)

@kernel.register
async def verified_agent(query: str):
    response = await llm.generate(query)
    
    # CMVK automatically verifies before returning
    return response  # Blocked if verification fails

EMK (Episodic Memory Kernel)

The EMK provides immutable, searchable memory for agents. Supports semantic search, temporal queries, and memory isolation between agents.

from agent_os.modules import EMK

emk = EMK(
    storage="postgresql",
    connection_string="postgresql://localhost/agent_memory",
    embedding_model="text-embedding-3-small",
    isolation="agent"  # "agent" | "session" | "global"
)

# Store a memory
await emk.store(
    agent_id="agent_123",
    content="User prefers dark mode and metric units.",
    metadata={"type": "preference", "confidence": 0.9},
    tags=["user_preference", "ui"]
)

# Semantic search
memories = await emk.search(
    agent_id="agent_123",
    query="What display settings does the user prefer?",
    limit=5,
    min_relevance=0.7
)

# Temporal query
recent = await emk.query(
    agent_id="agent_123",
    since=datetime.now() - timedelta(hours=24),
    tags=["user_preference"]
)

# Memory is immutable - create new version instead of updating
await emk.store(
    agent_id="agent_123",
    content="User now prefers light mode.",
    supersedes="memory_456",  # Reference to previous memory
    metadata={"type": "preference", "confidence": 0.95}
)

IATP (Inter-Agent Trust Protocol)

The IATP module provides cryptographic identity and message signing for multi-agent systems. Ensures agents can verify each other's identity and message integrity.

from agent_os.modules import IATP

iatp = IATP(
    key_storage="vault",  # "memory" | "file" | "vault" | "hsm"
    algorithm="ed25519",
    trust_model="web"  # "hierarchical" | "web" | "pki"
)

# Register agent identity
identity = await iatp.register_agent(
    agent_id="agent_123",
    metadata={"role": "data_processor", "department": "analytics"}
)

print(identity.public_key)  # Base64 encoded public key

# Sign a message
signed_message = await iatp.sign(
    sender_id="agent_123",
    message={"action": "process", "data_id": "dataset_456"},
    recipient_id="agent_789"
)

# Verify a received message
verification = await iatp.verify(signed_message)
print(verification)
# {
#     "valid": True,
#     "sender": "agent_123",
#     "timestamp": "2024-01-15T10:30:00Z",
#     "trust_level": 0.95
# }

# Establish trust relationship
await iatp.establish_trust(
    from_agent="agent_123",
    to_agent="agent_789",
    trust_level=0.9,
    permissions=["send_data", "request_action"]
)

AMB (Agent Message Bus)

The AMB module provides decoupled communication between agents. Supports multiple backends and message patterns (pub/sub, request/reply, streaming).

from agent_os.modules import AMB

amb = AMB(
    backend="redis",  # "memory" | "redis" | "kafka" | "nats" | "sqs"
    connection_string="redis://localhost:6379",
    serializer="msgpack"  # "json" | "msgpack" | "protobuf"
)

# Publish a message
await amb.publish(
    topic="tasks.process",
    message={"task_id": "task_123", "priority": "high"},
    sender_id="coordinator"
)

# Subscribe to messages
@amb.subscribe("tasks.process")
async def handle_task(message, metadata):
    """Process incoming tasks."""
    task_id = message["task_id"]
    result = await process_task(task_id)
    
    # Reply to sender
    await amb.reply(
        metadata.reply_to,
        {"task_id": task_id, "status": "completed", "result": result}
    )

# Request/Reply pattern
response = await amb.request(
    topic="agents.status",
    message={"agent_id": "agent_123"},
    timeout=5.0
)

# Streaming
async for message in amb.stream("events.realtime"):
    await process_event(message)

# Message routing with filters
@amb.subscribe(
    "tasks.*",
    filter={"priority": "high", "department": "analytics"}
)
async def handle_priority_tasks(message, metadata):
    await fast_track_process(message)

Additional Resources