What is AMB?
AMB (Agent Message Bus) is a messaging infrastructure that enables decoupled communication between AI agents. Instead of agents calling each other directly, they communicate through a message bus, allowing for loose coupling, scalability, and fault tolerance in multi-agent systems.
Why Decoupled Communication?
Direct agent-to-agent calls create tight coupling and single points of failure. With AMB, agents can be added, removed, or scaled independently. Publishers don't need to know about subscribers, enabling truly modular agent architectures.
Key Features
- Publish/Subscribe — Broadcast messages to multiple interested agents
- Request/Reply — Synchronous-style communication over async messaging
- Multiple Backends — Redis, Kafka, NATS, SQS, or in-memory for testing
- Topic Wildcards — Flexible subscription patterns with * and # wildcards
- Message Serialization — JSON, MessagePack, or Protobuf encoding
- Automatic Retries — Configurable retry policies with exponential backoff
Installation
Install AMB as a standalone module or as part of the full Agent OS kernel:
# Install via the kernel with AMB extras
pip install agent-os-kernel[amb]
# Install with specific broker support
pip install agent-os-kernel[amb,redis] # Redis backend
pip install agent-os-kernel[amb,kafka] # Kafka backend
pip install agent-os-kernel[amb,nats] # NATS backend
pip install agent-os-kernel[amb,sqs] # AWS SQS backend
# Install with all broker backends
pip install agent-os-kernel[amb,all-brokers]
# Install with specific serialization support
pip install agent-os-kernel[amb,msgpack] # MessagePack serialization
pip install agent-os-kernel[amb,protobuf] # Protobuf serialization
Environment Variables
Configure broker connections via environment variables:
# .env file
AMB_BROKER=redis # redis, kafka, nats, sqs, memory
AMB_REDIS_URL=redis://localhost:6379
AMB_KAFKA_BROKERS=localhost:9092
AMB_NATS_URL=nats://localhost:4222
AMB_SQS_REGION=us-east-1
AMB_SERIALIZER=json # json, msgpack, protobuf
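If you prefer to wire these variables up explicitly rather than rely on auto-detection, here is a minimal sketch. The AMB_* names come from the list above, and the constructor arguments mirror the ones shown in "MessageBus Class Basics" below; the fallbacks and the url lookup table are illustrative, not part of the library.
import os

from agent_os.amb import MessageBus

# Map the configured broker to its connection URL.
# (SQS is configured via AMB_SQS_REGION rather than a URL; see the SQS examples below.)
broker = os.getenv("AMB_BROKER", "memory")
urls = {
    "redis": os.getenv("AMB_REDIS_URL", "redis://localhost:6379"),
    "kafka": os.getenv("AMB_KAFKA_BROKERS", "localhost:9092"),
    "nats": os.getenv("AMB_NATS_URL", "nats://localhost:4222"),
}

bus = MessageBus(
    broker=broker,
    broker_url=urls.get(broker),  # None for "memory", which needs no URL
    serializer=os.getenv("AMB_SERIALIZER", "json"),
)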
MessageBus Class Basics
The MessageBus is the primary interface for all messaging operations:
from agent_os.amb import MessageBus

# Create a message bus with default settings (in-memory for development)
bus = MessageBus()

# Create with explicit broker configuration
bus = MessageBus(
    broker="redis",
    broker_url="redis://localhost:6379",
    serializer="json"
)

# Async context manager for proper resource cleanup
async with MessageBus(broker="redis") as bus:
    await bus.publish("topic.name", {"data": "value"})

# Manual lifecycle management
bus = MessageBus(broker="redis")
await bus.connect()
# ... use the bus ...
await bus.disconnect()
Core Methods
| Method | Description |
|---|---|
| publish(topic, message) | Publish a message to a topic |
| subscribe(topic, handler) | Subscribe to messages on a topic |
| unsubscribe(topic) | Unsubscribe from a topic |
| request(topic, message, timeout) | Send a request and wait for a reply |
| reply(request_id, response) | Send a reply to a request |
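A compact sketch tying these methods together on the in-memory broker. The topic names and handler are illustrative; the request() call is commented out because no responder is registered in this snippet (see "Request/Reply Pattern" below).
import asyncio

from agent_os.amb import MessageBus

async def main():
    async with MessageBus(broker="memory") as bus:
        # subscribe(topic, handler): register a handler for a topic
        async def on_greeting(msg):
            print("received:", msg.data)

        await bus.subscribe("demo.greetings", on_greeting)

        # publish(topic, message): broadcast to all subscribers
        await bus.publish("demo.greetings", {"text": "hello"})
        await asyncio.sleep(0.1)  # give the handler time to run

        # request(topic, message, timeout): blocks until a responder replies
        # response = await bus.request("demo.status", {"query": "status"}, timeout=5.0)

        # unsubscribe(topic): stop receiving messages
        await bus.unsubscribe("demo.greetings")

asyncio.run(main())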
Broker Backends
AMB supports multiple message broker backends, each with different characteristics:
| Backend | Best For | Persistence | Throughput | Latency |
|---|---|---|---|---|
| Redis | General purpose, real-time | Optional (Streams) | High | Very Low |
| Kafka | Event sourcing, audit logs | Yes (durable) | Very High | Medium |
| NATS | Lightweight, edge computing | Optional (JetStream) | Very High | Ultra Low |
| SQS | AWS-native, serverless | Yes | Medium | Medium |
| InMemory | Testing, development | No | Very High (in-process) | Negligible |
Redis Backend
from agent_os.amb import MessageBus, RedisBroker
# Basic Redis configuration
bus = MessageBus(
broker="redis",
broker_url="redis://localhost:6379/0"
)
# Advanced Redis configuration with Streams for persistence
bus = MessageBus(
broker=RedisBroker(
url="redis://localhost:6379",
db=0,
use_streams=True, # Use Redis Streams for persistence
stream_max_len=10000, # Max messages per stream
consumer_group="my-agents", # Consumer group for load balancing
pool_size=10, # Connection pool size
ssl=True, # Enable SSL/TLS
password="secret"
)
)
# Redis Cluster support
bus = MessageBus(
broker=RedisBroker(
cluster_nodes=[
"redis://node1:6379",
"redis://node2:6379",
"redis://node3:6379"
]
)
)
Kafka Backend
from agent_os.amb import MessageBus, KafkaBroker
# Basic Kafka configuration
bus = MessageBus(
broker="kafka",
broker_url="localhost:9092"
)
# Advanced Kafka configuration
bus = MessageBus(
broker=KafkaBroker(
bootstrap_servers=["kafka1:9092", "kafka2:9092", "kafka3:9092"],
client_id="agent-os-producer",
group_id="agent-os-consumers",
auto_offset_reset="earliest", # "earliest" or "latest"
enable_auto_commit=True,
max_poll_records=500,
session_timeout_ms=30000,
# Security
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_username="user",
sasl_password="password"
)
)
# Kafka with Schema Registry (Avro/Protobuf)
bus = MessageBus(
broker=KafkaBroker(
bootstrap_servers=["kafka:9092"],
schema_registry_url="http://schema-registry:8081"
),
serializer="avro"
)
NATS Backend
from agent_os.amb import MessageBus, NatsBroker
# Basic NATS configuration
bus = MessageBus(
broker="nats",
broker_url="nats://localhost:4222"
)
# Advanced NATS configuration with JetStream persistence
bus = MessageBus(
broker=NatsBroker(
servers=["nats://nats1:4222", "nats://nats2:4222"],
name="agent-os-client",
# JetStream for persistence
jetstream=True,
stream_name="AGENTS",
stream_subjects=["agents.>"],
stream_retention="limits", # "limits", "interest", or "workqueue"
stream_max_msgs=1000000,
# Security
user="user",
password="password",
# Or use token
token="secret-token",
# Or use NKey
nkeys_seed="SUACSSL3..."
)
)
# NATS with TLS
bus = MessageBus(
broker=NatsBroker(
servers=["tls://nats:4222"],
tls_cert="/path/to/cert.pem",
tls_key="/path/to/key.pem",
tls_ca="/path/to/ca.pem"
)
)
AWS SQS Backend
from agent_os.amb import MessageBus, SQSBroker
# Basic SQS configuration (uses AWS credentials from environment)
bus = MessageBus(
broker="sqs",
broker_url="https://sqs.us-east-1.amazonaws.com"
)
# Advanced SQS configuration
bus = MessageBus(
broker=SQSBroker(
region_name="us-east-1",
queue_prefix="agent-os-", # Prefix for auto-created queues
visibility_timeout=30, # Seconds before message reappears
wait_time_seconds=20, # Long polling duration
max_number_of_messages=10, # Messages per receive call
# FIFO queue support
use_fifo=True,
content_based_deduplication=True,
# Dead letter queue
dlq_arn="arn:aws:sqs:us-east-1:123456789:agent-os-dlq",
max_receive_count=3,
# Explicit credentials (prefer IAM roles instead)
aws_access_key_id="AKIA...",
aws_secret_access_key="secret"
)
)
# SQS with SNS for fan-out (pub/sub pattern)
bus = MessageBus(
broker=SQSBroker(
region_name="us-east-1",
use_sns_fanout=True,
sns_topic_prefix="agent-os-"
)
)
InMemory Backend (Testing)
from agent_os.amb import MessageBus, InMemoryBroker
import asyncio
import pytest
# Default: automatically uses InMemory in test environments
bus = MessageBus() # Detects pytest and uses InMemory
# Explicit InMemory for testing
bus = MessageBus(broker="memory")
# InMemory with simulated latency for realistic testing
bus = MessageBus(
broker=InMemoryBroker(
latency_ms=10, # Simulate network latency
failure_rate=0.01, # 1% random failures for chaos testing
max_queue_size=10000 # Backpressure simulation
)
)
# Pytest fixture example
@pytest.fixture
async def message_bus():
    async with MessageBus(broker="memory") as bus:
        yield bus

async def test_publish_subscribe(message_bus):
    received = []
    await message_bus.subscribe("test.topic", lambda msg: received.append(msg))
    await message_bus.publish("test.topic", {"value": 42})
    await asyncio.sleep(0.1)  # Allow message delivery
    assert received == [{"value": 42}]
Publish/Subscribe Pattern
The pub/sub pattern allows agents to broadcast messages to multiple subscribers without knowing who is listening or how many subscribers there are.
from agent_os.amb import MessageBus, Message
async with MessageBus(broker="redis") as bus:
# Simple publish
await bus.publish("agents.notifications", {
"type": "task_completed",
"agent_id": "agent-001",
"task_id": "task-123"
})
# Publish with metadata
await bus.publish(
topic="agents.events",
message={"event": "started"},
headers={
"correlation_id": "req-456",
"timestamp": "2024-01-15T10:30:00Z",
"priority": "high"
}
)
# Subscribe with async handler
async def handle_notification(msg: Message):
print(f"Received: {msg.data}")
print(f"Topic: {msg.topic}")
print(f"Headers: {msg.headers}")
print(f"Timestamp: {msg.timestamp}")
await bus.subscribe("agents.notifications", handle_notification)
# Subscribe with decorator pattern
@bus.subscriber("agents.events")
async def on_event(msg: Message):
print(f"Event: {msg.data['event']}")
# Multiple handlers for same topic
@bus.subscriber("agents.notifications")
async def log_notification(msg: Message):
logger.info(f"Notification: {msg.data}")
@bus.subscriber("agents.notifications")
async def store_notification(msg: Message):
await database.insert("notifications", msg.data)
# Keep running to receive messages
await bus.run_forever()
Message Filtering
from agent_os.amb import MessageBus, MessageFilter
async with MessageBus(broker="redis") as bus:
# Filter messages by content
high_priority_filter = MessageFilter(
headers={"priority": "high"}
)
@bus.subscriber("agents.tasks", filter=high_priority_filter)
async def handle_high_priority(msg: Message):
print(f"Urgent task: {msg.data}")
# Filter by message data
error_filter = MessageFilter(
data_match={"status": "error"}
)
@bus.subscriber("agents.status", filter=error_filter)
async def handle_errors(msg: Message):
await alert_team(msg.data)
# Custom filter function
def is_my_agent(msg: Message) -> bool:
return msg.data.get("agent_id") == "agent-001"
@bus.subscriber("agents.commands", filter=is_my_agent)
async def handle_my_commands(msg: Message):
await execute_command(msg.data)
Request/Reply Pattern
The request/reply pattern provides synchronous-style communication over the message bus, useful for querying other agents or services.
from agent_os.amb import MessageBus, Request, Response
async with MessageBus(broker="redis") as bus:
# === Requester Side ===
# Simple request with timeout
response = await bus.request(
topic="agents.query",
message={"query": "What is the current status?"},
timeout=5.0 # seconds
)
print(f"Response: {response.data}")
# Request with correlation ID tracking
response = await bus.request(
topic="agents.calculate",
message={"operation": "sum", "values": [1, 2, 3]},
timeout=10.0,
correlation_id="calc-001"
)
# === Responder Side ===
# Handle requests and send replies
@bus.responder("agents.query")
async def handle_query(request: Request) -> dict:
# Return value is automatically sent as reply
return {"status": "active", "uptime": 3600}
# Explicit reply control
@bus.responder("agents.calculate")
async def handle_calculation(request: Request, reply: Response):
try:
values = request.data["values"]
result = sum(values)
await reply.send({"result": result})
except Exception as e:
await reply.error(str(e))
# Multiple responders with load balancing
@bus.responder("agents.process", group="workers")
async def worker_1(request: Request) -> dict:
return await process_request(request.data)
# Another instance (could be different process/machine)
@bus.responder("agents.process", group="workers")
async def worker_2(request: Request) -> dict:
return await process_request(request.data)
Request Patterns
from agent_os.amb import MessageBus
import asyncio
async with MessageBus(broker="redis") as bus:
# Scatter-gather: request from multiple responders
responses = await bus.scatter(
topic="agents.vote",
message={"proposal": "increase_budget"},
timeout=5.0,
min_responses=3,
max_responses=10
)
votes = [r.data["vote"] for r in responses]
approved = votes.count("yes") > votes.count("no")
# First-response-wins (for redundancy)
response = await bus.request_first(
topics=[
"agents.primary.query",
"agents.backup.query"
],
message={"query": "status"},
timeout=5.0
)
# Parallel requests to different services
async with asyncio.TaskGroup() as tg:
status_task = tg.create_task(
bus.request("service.status", {}, timeout=5.0)
)
config_task = tg.create_task(
bus.request("service.config", {}, timeout=5.0)
)
status = status_task.result()
config = config_task.result()
Topic Wildcards
AMB supports wildcard patterns for flexible topic subscriptions:
- * — Matches exactly one word in a topic segment
- # — Matches zero or more words
from agent_os.amb import MessageBus
async with MessageBus(broker="redis") as bus:
# Single-level wildcard (*)
# Matches: agents.agent-001.status, agents.agent-002.status
# Does NOT match: agents.team.agent-001.status
@bus.subscriber("agents.*.status")
async def on_any_agent_status(msg):
agent_id = msg.topic.split(".")[1]
print(f"Agent {agent_id} status: {msg.data}")
# Multi-level wildcard (#)
# Matches: agents.events, agents.team.events, agents.team.sub.events
@bus.subscriber("agents.#")
async def on_all_agent_messages(msg):
print(f"Agent message on {msg.topic}: {msg.data}")
# Combined patterns
# Matches: tasks.agent-001.completed, tasks.agent-002.completed
@bus.subscriber("tasks.*.completed")
async def on_task_completed(msg):
print(f"Task completed: {msg.data}")
# Matches: logs.error, logs.agent.error, logs.agent.task.error
@bus.subscriber("logs.#.error")
async def on_any_error(msg):
await alert_team(msg.data)
# Publishing to specific topics
await bus.publish("agents.agent-001.status", {"state": "active"})
await bus.publish("agents.agent-002.status", {"state": "idle"})
await bus.publish("tasks.agent-001.completed", {"task_id": "123"})
await bus.publish("logs.agent.task.error", {"message": "Failed"})
Topic Naming Conventions
| Pattern | Example | Use Case |
|---|---|---|
| domain.entity.action | agents.task.created | Standard event naming |
| domain.id.event | agents.agent-001.heartbeat | Entity-specific events |
| service.version.method | api.v1.query | Versioned APIs |
| env.region.service | prod.us-east.agents | Environment routing |
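For example, topics following these conventions might look like the snippet below; the topic strings and payloads are placeholders.
# domain.entity.action
await bus.publish("agents.task.created", {"task_id": "task-123"})
# domain.id.event
await bus.publish("agents.agent-001.heartbeat", {"uptime": 3600})
# service.version.method
await bus.publish("api.v1.query", {"query": "status"})
# env.region.service
await bus.publish("prod.us-east.agents", {"event": "scale_up"})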
Message Serialization
AMB supports multiple serialization formats for different performance and compatibility needs:
| Format | Size | Speed | Human Readable | Schema |
|---|---|---|---|---|
| JSON | Large | Fast | Yes | Optional |
| MessagePack | Small | Very Fast | No | No |
| Protobuf | Smallest | Fastest | No | Required |
JSON Serialization (Default)
from agent_os.amb import MessageBus, JsonSerializer
# Default JSON serialization
bus = MessageBus(serializer="json")
# Custom JSON configuration
bus = MessageBus(
serializer=JsonSerializer(
indent=None, # Compact output
ensure_ascii=False, # Allow unicode
date_format="iso", # ISO 8601 dates
decimal_as_string=True # Preserve decimal precision
)
)
# Publish any JSON-serializable data
await bus.publish("events", {
"timestamp": datetime.now(), # Automatically converted
"data": {"nested": [1, 2, 3]},
"agent_id": "agent-001"
})
MessagePack Serialization
from agent_os.amb import MessageBus, MsgPackSerializer
# Use MessagePack for smaller messages and faster serialization
bus = MessageBus(serializer="msgpack")
# Custom MessagePack configuration
bus = MessageBus(
serializer=MsgPackSerializer(
use_bin_type=True, # Binary vs raw strings
strict_map_key=False, # Allow non-string keys
datetime=True # Native datetime support
)
)
# Especially useful for binary data
await bus.publish("data.binary", {
"image": image_bytes, # Efficient binary encoding
"metadata": {"size": 1024}
})
Protobuf Serialization
from agent_os.amb import MessageBus, ProtobufSerializer
# Define your schema (agent_events.proto)
"""
syntax = "proto3";
message AgentEvent {
string agent_id = 1;
string event_type = 2;
int64 timestamp = 3;
map<string, string> metadata = 4;
}
"""
# Use Protobuf for maximum efficiency
bus = MessageBus(
serializer=ProtobufSerializer(
proto_path="./schemas",
default_message_type="AgentEvent"
)
)
# Publish with strong typing
from schemas.agent_events_pb2 import AgentEvent
event = AgentEvent(
agent_id="agent-001",
event_type="task_completed",
timestamp=int(time.time())
)
event.metadata["task_id"] = "task-123"
await bus.publish("agents.events", event)
# Subscribe with type hints
@bus.subscriber("agents.events")
async def handle_event(msg: AgentEvent):
print(f"Agent {msg.agent_id}: {msg.event_type}")
Error Handling and Retries
AMB provides robust error handling with configurable retry policies:
from agent_os.amb import MessageBus, RetryPolicy, DeadLetterQueue
# Configure retry policy
retry_policy = RetryPolicy(
max_retries=3,
initial_delay=1.0, # seconds
max_delay=60.0, # seconds
exponential_base=2, # delay doubles each retry
jitter=0.1, # 10% random jitter
retryable_exceptions=[ # Only retry these
ConnectionError,
TimeoutError
]
)
# Configure dead letter queue for failed messages
dlq = DeadLetterQueue(
topic="failed.messages",
max_retries=3,
retention_days=7
)
bus = MessageBus(
broker="redis",
retry_policy=retry_policy,
dead_letter_queue=dlq
)
# Handler-level retry configuration
@bus.subscriber("agents.tasks", retries=5, retry_delay=2.0)
async def process_task(msg):
# Automatic retry on failure
await risky_operation(msg.data)
# Manual error handling
@bus.subscriber("agents.critical")
async def handle_critical(msg):
try:
await process_critical(msg.data)
except ValidationError as e:
# Don't retry validation errors
msg.reject(requeue=False)
except TemporaryError as e:
# Retry later
msg.reject(requeue=True, delay=30)
except FatalError as e:
# Send to dead letter queue
await msg.dead_letter(reason=str(e))
Circuit Breaker Pattern
from agent_os.amb import MessageBus, CircuitBreaker
# Circuit breaker prevents cascading failures
circuit_breaker = CircuitBreaker(
failure_threshold=5, # Open after 5 failures
recovery_timeout=30, # Try again after 30s
half_open_requests=3 # Test with 3 requests
)
bus = MessageBus(
broker="redis",
circuit_breaker=circuit_breaker
)
# Monitor circuit breaker state
@bus.on("circuit_breaker.opened")
async def on_circuit_open(topic: str):
logger.warning(f"Circuit breaker opened for {topic}")
await alert_ops_team(f"Message delivery failing to {topic}")
@bus.on("circuit_breaker.closed")
async def on_circuit_closed(topic: str):
logger.info(f"Circuit breaker closed for {topic}")
# Per-topic circuit breakers
@bus.subscriber(
"external.api",
circuit_breaker=CircuitBreaker(failure_threshold=3)
)
async def call_external_api(msg):
await external_api.call(msg.data)
Acknowledgment Modes
from agent_os.amb import MessageBus, AckMode
# Auto-acknowledge after handler completes
@bus.subscriber("tasks", ack_mode=AckMode.AUTO)
async def auto_ack(msg):
await process(msg.data)
# Automatically acknowledged on success
# Automatically rejected on exception
# Manual acknowledgment for complex workflows
@bus.subscriber("tasks", ack_mode=AckMode.MANUAL)
async def manual_ack(msg):
try:
result = await process(msg.data)
await save_result(result)
await msg.ack() # Explicit acknowledgment
except Exception as e:
await msg.nack(requeue=True) # Requeue for retry
# Batch acknowledgment for high throughput
@bus.subscriber("events", ack_mode=AckMode.BATCH, batch_size=100)
async def batch_ack(messages: list):
await process_batch([m.data for m in messages])
# All messages acknowledged together
Integration with KernelSpace
AMB integrates seamlessly with the Agent OS KernelSpace for policy-enforced messaging:
from agent_os import KernelSpace, Policy
from agent_os.amb import MessageBus
# Create message bus
bus = MessageBus(broker="redis")
# Define messaging policy
messaging_policy = Policy(
name="agent-messaging",
rules=[
{
"topic": "agents.*.commands",
"allow": ["admin", "supervisor"],
"deny": ["guest"]
},
{
"topic": "agents.*.status",
"allow": ["*"], # Everyone can publish status
"rate_limit": "100/minute"
},
{
"topic": "system.#",
"allow": ["system"],
"audit": True
}
]
)
# Initialize kernel with message bus
kernel = KernelSpace(
policies=[messaging_policy],
message_bus=bus
)
# Register agent with messaging capabilities
@kernel.agent("worker-agent")
class WorkerAgent:
async def run(self, kernel):
# Subscribe through kernel (policy enforced)
@kernel.subscribe("agents.worker.commands")
async def on_command(msg):
await self.execute_command(msg.data)
# Publish through kernel (policy enforced)
await kernel.publish("agents.worker.status", {
"state": "ready",
"capabilities": ["task-execution"]
})
# Cross-agent communication
@kernel.agent("supervisor-agent")
class SupervisorAgent:
async def assign_task(self, agent_id: str, task: dict):
# Request/reply through kernel
response = await kernel.request(
f"agents.{agent_id}.commands",
{"type": "execute_task", "task": task},
timeout=30.0
)
return response.data
Event-Driven Policies
from agent_os import KernelSpace, Policy
from agent_os.amb import MessageBus
kernel = KernelSpace()
# React to messages with policy enforcement
@kernel.on_message("agents.*.error")
async def handle_agent_error(msg, context):
# Automatic audit logging
await context.audit_log({
"event": "agent_error",
"agent_id": msg.topic.split(".")[1],
"error": msg.data
})
# Policy-based escalation
if msg.data.get("severity") == "critical":
await kernel.publish("alerts.critical", {
"source": msg.topic,
"error": msg.data
})
# Message transformation pipeline
@kernel.message_pipeline("external.api.response")
async def transform_api_response(msg, context):
# Transform before delivery to subscribers
return {
**msg.data,
"received_at": datetime.now().isoformat(),
"processed_by": context.kernel_id
}
# Message validation
@kernel.message_validator("agents.*.commands")
async def validate_command(msg, context):
schema = CommandSchema()
errors = schema.validate(msg.data)
if errors:
raise ValidationError(errors)
return True
Distributed Agent Coordination
from agent_os import KernelSpace
from agent_os.amb import MessageBus
kernel = KernelSpace(message_bus=MessageBus(broker="redis"))
# Leader election via message bus
@kernel.agent("coordinator")
class CoordinatorAgent:
async def run(self, kernel):
# Attempt to become leader
is_leader = await kernel.elect_leader(
topic="cluster.leader",
ttl=30 # Leadership expires after 30s
)
if is_leader:
await self.run_as_leader(kernel)
else:
await self.run_as_follower(kernel)
async def run_as_leader(self, kernel):
# Broadcast work to followers
@kernel.schedule(interval=5)
async def distribute_work():
tasks = await self.get_pending_tasks()
for task in tasks:
await kernel.publish(
"cluster.tasks",
task,
headers={"assigned_by": "leader"}
)
async def run_as_follower(self, kernel):
@kernel.subscribe("cluster.tasks")
async def on_task(msg):
await self.execute_task(msg.data)
await kernel.publish("cluster.results", {
"task_id": msg.data["id"],
"result": "completed"
})
Next Steps
- API Reference — Complete AMB API documentation
- EMK Module — Store message history in episodic memory
- IATP Module — Secure inter-agent communication
- Integrations — Use AMB with LangChain, CrewAI