API Reference
Complete API documentation for Agent OS v0.1.0
KernelSpace
The KernelSpace class is the core orchestrator of Agent OS. It manages agent registration,
execution, policy enforcement, and signal dispatch. All agent interactions flow through the kernel.
Constructor
from agent_os import KernelSpace
kernel = KernelSpace(
policy: str | Policy = "strict",
flight_recorder: FlightRecorder | None = None,
signal_dispatcher: SignalDispatcher | None = None,
max_concurrent_agents: int = 100,
timeout: float = 300.0,
debug: bool = False
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `policy` | `str \| Policy` | `"strict"` | Policy name ("strict", "permissive", "audit", "custom") or a Policy instance |
| `flight_recorder` | `FlightRecorder \| None` | `None` | Custom flight recorder for audit logging |
| `signal_dispatcher` | `SignalDispatcher \| None` | `None` | Custom signal dispatcher instance |
| `max_concurrent_agents` | `int` | `100` | Maximum number of agents running simultaneously |
| `timeout` | `float` | `300.0` | Default execution timeout in seconds |
| `debug` | `bool` | `False` | Enable debug logging |
kernel.register()
Decorator that registers an agent function with the kernel. The kernel wraps the agent to intercept all actions and enforce policies.
@kernel.register(
name: str | None = None,
permissions: list[str] | None = None,
rate_limit: int | None = None,
priority: int = 0
)
async def my_agent(task: str) -> str:
"""Your agent implementation."""
return f"Completed: {task}"
| Parameter | Type | Description |
|---|---|---|
| `name` | `str \| None` | Custom agent name (defaults to the function name) |
| `permissions` | `list[str] \| None` | List of granted permissions: `["read", "write", "execute", "network"]` |
| `rate_limit` | `int \| None` | Maximum executions per minute |
| `priority` | `int` | Execution priority (default `0`; a higher value means a higher priority) |
Example: Register with permissions
from agent_os import KernelSpace
kernel = KernelSpace(policy="strict")
@kernel.register(
name="data_processor",
permissions=["read", "write"],
rate_limit=60
)
async def data_processor(input_data: dict) -> dict:
"""Process data with read/write permissions."""
# Kernel enforces: only read/write actions allowed
# Network calls would be blocked
result = await transform(input_data)
return {"status": "success", "data": result}
kernel.execute()
Executes a registered agent with full kernel supervision. All actions are intercepted, validated against policy, and logged to the flight recorder.
result = await kernel.execute(
agent: Callable,
*args,
timeout: float | None = None,
context: dict | None = None,
**kwargs
) -> Any
| Parameter | Type | Description |
|---|---|---|
| `agent` | `Callable` | The registered agent function to execute |
| `*args` | `Any` | Positional arguments passed to the agent |
| `timeout` | `float \| None` | Overrides the default timeout for this execution |
| `context` | `dict \| None` | Additional context (`user_id`, `session_id`, etc.) |
| `**kwargs` | `Any` | Keyword arguments passed to the agent |
Returns: The agent's return value. Raises `PolicyViolationError` if the kernel blocks the execution.
import asyncio
from agent_os import KernelSpace
kernel = KernelSpace(policy="strict")
@kernel.register
async def analyzer(data: str) -> dict:
return {"analyzed": True, "length": len(data)}
# Execute with context
result = asyncio.run(
kernel.execute(
analyzer,
"sample data",
context={"user_id": "user_123", "session_id": "sess_456"}
)
)
print(result) # {"analyzed": True, "length": 11}
kernel.spawn()
Spawns a new child agent process. Child agents inherit parent policies but can have additional restrictions. Useful for parallel task execution.
child_id = await kernel.spawn(
agent: Callable,
*args,
parent_id: str | None = None,
inherit_permissions: bool = True,
additional_restrictions: list[str] | None = None,
**kwargs
) -> str
| Parameter | Type | Description |
|---|---|---|
| `agent` | `Callable` | The agent function to spawn |
| `parent_id` | `str \| None` | Parent agent ID for hierarchical tracking |
| `inherit_permissions` | `bool` | Whether the child inherits the parent's permissions (default `True`) |
| `additional_restrictions` | `list[str] \| None` | Extra restrictions beyond the parent's policy |
Returns: Unique child agent ID (UUID string)
@kernel.register
async def coordinator(tasks: list[str]) -> list[dict]:
"""Spawn child agents for parallel processing."""
child_ids = []
for task in tasks:
child_id = await kernel.spawn(
worker_agent,
task,
parent_id=kernel.current_agent_id,
additional_restrictions=["no_network"]
)
child_ids.append(child_id)
# Wait for all children to complete
results = await kernel.wait_all(child_ids)
return results
@kernel.register
async def worker_agent(task: str) -> dict:
"""Process individual task."""
return {"task": task, "status": "complete"}
kernel.send_signal()
Send a POSIX-style signal to a running agent.
await kernel.send_signal(
agent_id: str,
signal: AgentSignal
) -> bool
kernel.get_agent_status()
Retrieve the current status of a registered agent.
status = kernel.get_agent_status(agent_id: str) -> AgentStatus
# AgentStatus fields:
# - id: str
# - name: str
# - state: "running" | "stopped" | "terminated" | "waiting"
# - started_at: datetime
# - actions_count: int
# - violations_count: int
SignalDispatcher
The SignalDispatcher handles POSIX-style signals for agent lifecycle control.
Signals provide deterministic control over agent execution.
Constructor
from agent_os import SignalDispatcher
dispatcher = SignalDispatcher(
kernel: KernelSpace,
queue_size: int = 1000,
async_delivery: bool = True
)
dispatcher.send()
Send a signal to an agent. Non-catchable signals (SIGKILL, SIGSTOP) take effect immediately and cannot be intercepted by handlers.
from agent_os import SignalDispatcher, AgentSignal
# Send SIGSTOP to pause agent
await dispatcher.send(
agent_id="agent_123",
signal=AgentSignal.SIGSTOP
)
# Send SIGCONT to resume
await dispatcher.send(
agent_id="agent_123",
signal=AgentSignal.SIGCONT
)
# Terminate with SIGKILL (non-catchable)
await dispatcher.send(
agent_id="agent_123",
signal=AgentSignal.SIGKILL
)
dispatcher.register_handler()
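Register a custom handler for a catchable signal (for example SIGTERM, SIGUSR1, or SIGUSR2; see the AgentSignal table for the full list). Non-catchable signals (SIGKILL, SIGSTOP) cannot have handlers.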
@dispatcher.register_handler(AgentSignal.SIGTERM)
async def graceful_shutdown(agent_id: str, signal: AgentSignal):
"""Custom handler for graceful shutdown."""
agent = kernel.get_agent(agent_id)
await agent.cleanup()
await agent.save_state()
return True # Allow termination
@dispatcher.register_handler(AgentSignal.SIGUSR1)
async def dump_state(agent_id: str, signal: AgentSignal):
"""Custom handler for state dumping."""
agent = kernel.get_agent(agent_id)
state = await agent.get_state()
await flight_recorder.log_state_dump(agent_id, state)
AgentSignal Enum
POSIX-inspired signals for agent control. Some signals are catchable (handlers can intercept them), while others are non-catchable (the kernel enforces them immediately).
from agent_os import AgentSignal
class AgentSignal(Enum):
# Non-catchable signals (kernel enforced)
SIGKILL = 9 # Immediate termination, no cleanup
SIGSTOP = 19 # Immediate pause, cannot be caught
# Catchable signals (handlers can intercept)
SIGCONT = 18 # Resume from SIGSTOP
SIGTERM = 15 # Graceful termination request
SIGUSR1 = 10 # User-defined signal 1
SIGUSR2 = 12 # User-defined signal 2
SIGALRM = 14 # Timeout alarm
| Signal | Value | Catchable | Description |
|---|---|---|---|
| `SIGKILL` | 9 | ❌ No | Immediate termination. The agent cannot catch or ignore it. |
| `SIGSTOP` | 19 | ❌ No | Immediate pause. The agent freezes until SIGCONT. |
| `SIGCONT` | 18 | ✅ Yes | Resumes execution after SIGSTOP. |
| `SIGTERM` | 15 | ✅ Yes | Graceful termination. The agent can clean up first. |
| `SIGUSR1` | 10 | ✅ Yes | User-defined. Common use: state dump. |
| `SIGUSR2` | 12 | ✅ Yes | User-defined. Common use: config reload. |
| `SIGALRM` | 14 | ✅ Yes | Timer expired. Used for timeout enforcement. |
Example: Signal-based agent control
from agent_os import KernelSpace, AgentSignal
import asyncio
kernel = KernelSpace(policy="strict")
@kernel.register
async def long_running_agent(iterations: int):
"""Agent that can be paused and resumed."""
for i in range(iterations):
# Check for pause signal
if kernel.is_stopped():
await kernel.wait_for_continue()
await process_iteration(i)
await asyncio.sleep(1)
return {"completed": iterations}
async def main():
# Start agent
task = asyncio.create_task(
kernel.execute(long_running_agent, 100)
)
# Pause after 5 seconds
await asyncio.sleep(5)
await kernel.send_signal(kernel.current_agent_id, AgentSignal.SIGSTOP)
print("Agent paused")
# Resume after 3 seconds
await asyncio.sleep(3)
await kernel.send_signal(kernel.current_agent_id, AgentSignal.SIGCONT)
print("Agent resumed")
result = await task
print(result)
Policy
The Policy class defines rules that govern agent behavior. Policies are enforced
at the kernel level before any action executes.
Constructor
from agent_os import Policy, Pattern
policy = Policy(
name: str,
version: str = "1.0.0",
rules: list[Rule] | None = None,
default_action: str = "deny", # "allow" | "deny"
inheritance: str = "strict" # "strict" | "permissive" | "none"
)
policy.add_rule()
Add a rule to the policy. Rules are evaluated in order; first match wins.
from agent_os import Policy, Pattern
policy = Policy(name="healthcare_policy", default_action="deny")
# Allow read operations on non-sensitive data
policy.add_rule(
action="allow",
pattern=Pattern(
operation="read",
resource="data/*",
exclude=["data/phi/*", "data/pii/*"]
),
reason="Allow reading non-sensitive data"
)
# Deny all write operations to PHI
policy.add_rule(
action="deny",
pattern=Pattern(
operation="write",
resource="data/phi/*"
),
reason="PHI write requires explicit approval",
on_violation="SIGKILL" # Immediate termination
)
# Allow network calls to approved endpoints only
policy.add_rule(
action="allow",
pattern=Pattern(
operation="network",
resource="https://api.approved-service.com/*"
),
rate_limit=100 # Max 100 calls per minute
)
Built-in Policies
| Policy Name | Default Action | Description |
|---|---|---|
| `"strict"` | deny | Deny everything not explicitly allowed. Recommended for production. |
| `"permissive"` | allow | Allow everything not explicitly denied. Development only. |
| `"audit"` | allow | Allow everything but log violations. Useful during policy development. |
Pattern
Pattern defines matching criteria for policy rules. Supports glob patterns,
regex, and structured matching.
from agent_os import Pattern
# Simple glob pattern
pattern = Pattern(
operation="read",
resource="files/*.txt"
)
# Regex pattern
pattern = Pattern(
operation="write",
resource=r"db/users/\d+",
match_type="regex"
)
# Structured pattern with conditions
pattern = Pattern(
operation="execute",
resource="tools/*",
conditions={
"user_role": ["admin", "operator"],
"time_window": {"start": "09:00", "end": "17:00"},
"max_cost": 100.0
}
)
# Exclude specific resources
pattern = Pattern(
operation="*",
resource="api/*",
exclude=["api/admin/*", "api/internal/*"]
)
| Parameter | Type | Description |
|---|---|---|
| `operation` | `str` | Operation type: `"read"`, `"write"`, `"execute"`, `"network"`, or `"*"` |
| `resource` | `str` | Resource path pattern (glob or regex, depending on `match_type`) |
| `match_type` | `str` | `"glob"` (default) or `"regex"` |
| `conditions` | `dict \| None` | Additional matching conditions |
| `exclude` | `list[str] \| None` | Patterns to exclude from the match |
FlightRecorder
The FlightRecorder provides comprehensive audit logging for all agent activities.
Like an airplane's black box, it captures everything for debugging and compliance.
Constructor
from agent_os import FlightRecorder
recorder = FlightRecorder(
storage: str = "memory", # "memory" | "sqlite" | "postgresql" | "s3"
connection_string: str | None = None,
retention_days: int = 90,
compression: bool = True,
encryption_key: bytes | None = None
)
recorder.log()
Log an event. Called automatically by the kernel for all actions.
await recorder.log(
event_type: str,
agent_id: str,
data: dict,
severity: str = "info" # "debug" | "info" | "warning" | "error" | "critical"
)
# Example
await recorder.log(
event_type="action_blocked",
agent_id="agent_123",
data={
"action": "write",
"resource": "data/phi/patient_001.json",
"policy_rule": "deny_phi_write",
"reason": "PHI access denied without explicit approval"
},
severity="warning"
)
recorder.query()
Query recorded events with filters.
from datetime import datetime, timedelta
# Query all violations in the last hour
violations = await recorder.query(
event_type="policy_violation",
start_time=datetime.now() - timedelta(hours=1),
end_time=datetime.now(),
limit=100
)
# Query specific agent activity
activity = await recorder.query(
agent_id="agent_123",
severity=["warning", "error", "critical"],
limit=50
)
# Query with custom filters
events = await recorder.query(
filters={
"data.resource": {"$regex": "data/phi/*"},
"data.action": "write"
}
)
recorder.replay()
Replay events for debugging (time-travel debugging).
# Get execution timeline
timeline = await recorder.get_timeline(
agent_id="agent_123",
execution_id="exec_456"
)
# Replay to specific point
state = await recorder.replay(
agent_id="agent_123",
execution_id="exec_456",
to_event_id="evt_789" # Replay up to this event
)
print(f"Agent state at event: {state}")
recorder.export()
Export audit logs for compliance reporting.
# Export to JSON
await recorder.export(
format="json",
output_path="audit_report.json",
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 31),
include_metadata=True
)
# Export to CSV for analysis
await recorder.export(
format="csv",
output_path="audit_report.csv",
fields=["timestamp", "agent_id", "event_type", "severity"]
)
CMVK (Cross-Model Verification Kernel)
The CMVK module detects hallucinations by comparing outputs across multiple LLMs. If models disagree beyond a threshold, the output is flagged or blocked.
from agent_os.modules import CMVK
cmvk = CMVK(
models=["gpt-4", "claude-3", "gemini-pro"],
consensus_threshold=0.8, # 80% agreement required
voting_strategy="majority", # "majority" | "unanimous" | "weighted"
timeout=30.0
)
# Verify a response
result = await cmvk.verify(
prompt="What is the capital of France?",
response="Paris is the capital of France.",
context={"domain": "geography"}
)
print(result)
# {
# "verified": True,
# "confidence": 0.95,
# "model_votes": {"gpt-4": True, "claude-3": True, "gemini-pro": True},
# "consensus": "unanimous"
# }
# Integrate with kernel
kernel = KernelSpace(policy="strict")
kernel.add_module(cmvk)
@kernel.register
async def verified_agent(query: str):
response = await llm.generate(query)
# CMVK automatically verifies before returning
return response # Blocked if verification fails
EMK (Episodic Memory Kernel)
The EMK provides immutable, searchable memory for agents. Supports semantic search, temporal queries, and memory isolation between agents.
from agent_os.modules import EMK
emk = EMK(
storage="postgresql",
connection_string="postgresql://localhost/agent_memory",
embedding_model="text-embedding-3-small",
isolation="agent" # "agent" | "session" | "global"
)
# Store a memory
await emk.store(
agent_id="agent_123",
content="User prefers dark mode and metric units.",
metadata={"type": "preference", "confidence": 0.9},
tags=["user_preference", "ui"]
)
# Semantic search
memories = await emk.search(
agent_id="agent_123",
query="What display settings does the user prefer?",
limit=5,
min_relevance=0.7
)
# Temporal query
recent = await emk.query(
agent_id="agent_123",
since=datetime.now() - timedelta(hours=24),
tags=["user_preference"]
)
# Memory is immutable - create new version instead of updating
await emk.store(
agent_id="agent_123",
content="User now prefers light mode.",
supersedes="memory_456", # Reference to previous memory
metadata={"type": "preference", "confidence": 0.95}
)
IATP (Inter-Agent Trust Protocol)
The IATP module provides cryptographic identity and message signing for multi-agent systems, so that agents can verify each other's identity and the integrity of the messages they exchange.
from agent_os.modules import IATP
iatp = IATP(
key_storage="vault", # "memory" | "file" | "vault" | "hsm"
algorithm="ed25519",
trust_model="web" # "hierarchical" | "web" | "pki"
)
# Register agent identity
identity = await iatp.register_agent(
agent_id="agent_123",
metadata={"role": "data_processor", "department": "analytics"}
)
print(identity.public_key) # Base64 encoded public key
# Sign a message
signed_message = await iatp.sign(
sender_id="agent_123",
message={"action": "process", "data_id": "dataset_456"},
recipient_id="agent_789"
)
# Verify a received message
verification = await iatp.verify(signed_message)
print(verification)
# {
# "valid": True,
# "sender": "agent_123",
# "timestamp": "2024-01-15T10:30:00Z",
# "trust_level": 0.95
# }
# Establish trust relationship
await iatp.establish_trust(
from_agent="agent_123",
to_agent="agent_789",
trust_level=0.9,
permissions=["send_data", "request_action"]
)
AMB (Agent Message Bus)
The AMB module provides decoupled communication between agents. Supports multiple backends and message patterns (pub/sub, request/reply, streaming).
from agent_os.modules import AMB
amb = AMB(
backend="redis", # "memory" | "redis" | "kafka" | "nats" | "sqs"
connection_string="redis://localhost:6379",
serializer="msgpack" # "json" | "msgpack" | "protobuf"
)
# Publish a message
await amb.publish(
topic="tasks.process",
message={"task_id": "task_123", "priority": "high"},
sender_id="coordinator"
)
# Subscribe to messages
@amb.subscribe("tasks.process")
async def handle_task(message, metadata):
"""Process incoming tasks."""
task_id = message["task_id"]
result = await process_task(task_id)
# Reply to sender
await amb.reply(
metadata.reply_to,
{"task_id": task_id, "status": "completed", "result": result}
)
# Request/Reply pattern
response = await amb.request(
topic="agents.status",
message={"agent_id": "agent_123"},
timeout=5.0
)
# Streaming
async for message in amb.stream("events.realtime"):
await process_event(message)
# Message routing with filters
@amb.subscribe(
"tasks.*",
filter={"priority": "high", "department": "analytics"}
)
async def handle_priority_tasks(message, metadata):
await fast_track_process(message)
Additional Resources
- Quickstart Guide - Get running in 2 minutes
- Policy Reference - Complete policy configuration
- Module Deep Dives - Detailed module documentation
- Framework Integrations - LangChain, CrewAI, AutoGen
- Examples - Production-ready code samples
- GitHub Repository - Source code and issues