AMB

Agent Message Bus — Decoupled agent communication via pub/sub and request/reply

What is AMB?

AMB (Agent Message Bus) is a messaging infrastructure that enables decoupled communication between AI agents. Instead of agents calling each other directly, they communicate through a message bus, allowing for loose coupling, scalability, and fault tolerance in multi-agent systems.

Why Decoupled Communication?

Direct agent-to-agent calls create tight coupling and single points of failure. With AMB, agents can be added, removed, or scaled independently. Publishers don't need to know about subscribers, enabling truly modular agent architectures.

Key Features

  • Publish/Subscribe — Broadcast messages to multiple interested agents
  • Request/Reply — Synchronous-style communication over async messaging
  • Multiple Backends — Redis, Kafka, NATS, SQS, or in-memory for testing
  • Topic Wildcards — Flexible subscription patterns with * and # wildcards
  • Message Serialization — JSON, MessagePack, or Protobuf encoding
  • Automatic Retries — Configurable retry policies with exponential backoff

Installation

Install AMB as a standalone module or as part of the full Agent OS kernel:

# Install via the kernel with AMB extras
pip install agent-os-kernel[amb]

# Install with specific broker support
pip install agent-os-kernel[amb,redis]    # Redis backend
pip install agent-os-kernel[amb,kafka]    # Kafka backend
pip install agent-os-kernel[amb,nats]     # NATS backend
pip install agent-os-kernel[amb,sqs]      # AWS SQS backend

# Install with all broker backends
pip install agent-os-kernel[amb,all-brokers]

# Install with specific serialization support
pip install agent-os-kernel[amb,msgpack]     # MessagePack serialization
pip install agent-os-kernel[amb,protobuf]    # Protobuf serialization

Environment Variables

Configure broker connections via environment variables:

# .env file
AMB_BROKER=redis                    # redis, kafka, nats, sqs, memory
AMB_REDIS_URL=redis://localhost:6379
AMB_KAFKA_BROKERS=localhost:9092
AMB_NATS_URL=nats://localhost:4222
AMB_SQS_REGION=us-east-1
AMB_SERIALIZER=json                 # json, msgpack, protobuf
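
If you prefer explicit wiring over relying on automatic environment loading, the same settings can be passed to the constructor. A minimal sketch; the mapping from variables to constructor arguments below is an assumption based on the variables listed above:

import os

from agent_os.amb import MessageBus

# Hypothetical explicit wiring of the variables above
broker = os.environ.get("AMB_BROKER", "memory")
broker_urls = {
    "redis": os.environ.get("AMB_REDIS_URL"),
    "kafka": os.environ.get("AMB_KAFKA_BROKERS"),
    "nats": os.environ.get("AMB_NATS_URL"),
}

bus = MessageBus(
    broker=broker,
    broker_url=broker_urls.get(broker),
    serializer=os.environ.get("AMB_SERIALIZER", "json"),
)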

MessageBus Class Basics

The MessageBus is the primary interface for all messaging operations:

from agent_os.amb import MessageBus

# Create a message bus with default settings (in-memory for development)
bus = MessageBus()

# Create with explicit broker configuration
bus = MessageBus(
    broker="redis",
    broker_url="redis://localhost:6379",
    serializer="json"
)

# Async context manager for proper resource cleanup
async with MessageBus(broker="redis") as bus:
    await bus.publish("topic.name", {"data": "value"})

# Manual lifecycle management
bus = MessageBus(broker="redis")
await bus.connect()
# ... use the bus ...
await bus.disconnect()

Core Methods

Method                              Description
publish(topic, message)             Publish a message to a topic
subscribe(topic, handler)           Subscribe to messages on a topic
unsubscribe(topic)                  Unsubscribe from a topic
request(topic, message, timeout)    Send a request and wait for a reply
reply(request_id, response)         Send a reply to a request
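
The later examples mostly use decorators, so here is a short sketch that exercises unsubscribe() and reply() directly, based on the signatures in the table (the msg.request_id attribute is an assumption):

from agent_os.amb import MessageBus, Message

async with MessageBus(broker="memory") as bus:
    async def on_status(msg: Message):
        print(f"Status: {msg.data}")

    await bus.subscribe("agents.status", on_status)
    await bus.publish("agents.status", {"state": "ready"})

    # Stop receiving further messages on this topic
    await bus.unsubscribe("agents.status")

    # Low-level reply: answer a request by its id rather than through
    # the @bus.responder decorator (msg.request_id assumed)
    async def on_query(msg: Message):
        await bus.reply(msg.request_id, {"status": "ok"})

    await bus.subscribe("agents.query", on_query)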

Broker Backends

AMB supports multiple message broker backends, each with different characteristics:

Backend    Best For                      Persistence            Throughput   Latency
Redis      General purpose, real-time    Optional (Streams)     High         Very low
Kafka      Event sourcing, audit logs    Yes (durable)          Very high    Medium
NATS       Lightweight, edge computing   Optional (JetStream)   Very high    Ultra low
SQS        AWS-native, serverless        Yes                    Medium       Medium
InMemory   Testing, development          No                     Very high    Near zero

Redis Backend

from agent_os.amb import MessageBus, RedisBroker

# Basic Redis configuration
bus = MessageBus(
    broker="redis",
    broker_url="redis://localhost:6379/0"
)

# Advanced Redis configuration with Streams for persistence
bus = MessageBus(
    broker=RedisBroker(
        url="redis://localhost:6379",
        db=0,
        use_streams=True,           # Use Redis Streams for persistence
        stream_max_len=10000,       # Max messages per stream
        consumer_group="my-agents", # Consumer group for load balancing
        pool_size=10,               # Connection pool size
        ssl=True,                   # Enable SSL/TLS
        password="secret"
    )
)

# Redis Cluster support
bus = MessageBus(
    broker=RedisBroker(
        cluster_nodes=[
            "redis://node1:6379",
            "redis://node2:6379",
            "redis://node3:6379"
        ]
    )
)

Kafka Backend

from agent_os.amb import MessageBus, KafkaBroker

# Basic Kafka configuration
bus = MessageBus(
    broker="kafka",
    broker_url="localhost:9092"
)

# Advanced Kafka configuration
bus = MessageBus(
    broker=KafkaBroker(
        bootstrap_servers=["kafka1:9092", "kafka2:9092", "kafka3:9092"],
        client_id="agent-os-producer",
        group_id="agent-os-consumers",
        auto_offset_reset="earliest",  # "earliest" or "latest"
        enable_auto_commit=True,
        max_poll_records=500,
        session_timeout_ms=30000,
        # Security
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_username="user",
        sasl_password="password"
    )
)

# Kafka with Schema Registry (Avro/Protobuf)
bus = MessageBus(
    broker=KafkaBroker(
        bootstrap_servers=["kafka:9092"],
        schema_registry_url="http://schema-registry:8081"
    ),
    serializer="avro"
)

NATS Backend

from agent_os.amb import MessageBus, NatsBroker

# Basic NATS configuration
bus = MessageBus(
    broker="nats",
    broker_url="nats://localhost:4222"
)

# Advanced NATS configuration with JetStream persistence
bus = MessageBus(
    broker=NatsBroker(
        servers=["nats://nats1:4222", "nats://nats2:4222"],
        name="agent-os-client",
        # JetStream for persistence
        jetstream=True,
        stream_name="AGENTS",
        stream_subjects=["agents.>"],
        stream_retention="limits",     # "limits", "interest", or "workqueue"
        stream_max_msgs=1000000,
        # Security
        user="user",
        password="password",
        # Or use token
        token="secret-token",
        # Or use NKey
        nkeys_seed="SUACSSL3..."
    )
)

# NATS with TLS
bus = MessageBus(
    broker=NatsBroker(
        servers=["tls://nats:4222"],
        tls_cert="/path/to/cert.pem",
        tls_key="/path/to/key.pem",
        tls_ca="/path/to/ca.pem"
    )
)

AWS SQS Backend

from agent_os.amb import MessageBus, SQSBroker

# Basic SQS configuration (uses AWS credentials from environment)
bus = MessageBus(
    broker="sqs",
    broker_url="https://sqs.us-east-1.amazonaws.com"
)

# Advanced SQS configuration
bus = MessageBus(
    broker=SQSBroker(
        region_name="us-east-1",
        queue_prefix="agent-os-",      # Prefix for auto-created queues
        visibility_timeout=30,          # Seconds before message reappears
        wait_time_seconds=20,           # Long polling duration
        max_number_of_messages=10,      # Messages per receive call
        # FIFO queue support
        use_fifo=True,
        content_based_deduplication=True,
        # Dead letter queue
        dlq_arn="arn:aws:sqs:us-east-1:123456789:agent-os-dlq",
        max_receive_count=3,
        # Explicit credentials (prefer IAM roles instead)
        aws_access_key_id="AKIA...",
        aws_secret_access_key="secret"
    )
)

# SQS with SNS for fan-out (pub/sub pattern)
bus = MessageBus(
    broker=SQSBroker(
        region_name="us-east-1",
        use_sns_fanout=True,
        sns_topic_prefix="agent-os-"
    )
)

InMemory Backend (Testing)

import asyncio

import pytest

from agent_os.amb import MessageBus, InMemoryBroker

# Default: automatically uses InMemory in test environments
bus = MessageBus()  # Detects pytest and uses InMemory

# Explicit InMemory for testing
bus = MessageBus(broker="memory")

# InMemory with simulated latency for realistic testing
bus = MessageBus(
    broker=InMemoryBroker(
        latency_ms=10,              # Simulate network latency
        failure_rate=0.01,          # 1% random failures for chaos testing
        max_queue_size=10000        # Backpressure simulation
    )
)

# Pytest fixture example
@pytest.fixture
async def message_bus():
    async with MessageBus(broker="memory") as bus:
        yield bus

async def test_publish_subscribe(message_bus):
    received = []
    
    await message_bus.subscribe("test.topic", lambda msg: received.append(msg.data))
    await message_bus.publish("test.topic", {"value": 42})
    
    await asyncio.sleep(0.1)  # Allow message delivery
    assert received == [{"value": 42}]

Publish/Subscribe Pattern

The pub/sub pattern allows agents to broadcast messages to multiple subscribers without knowing who (or how many) are listening.

from agent_os.amb import MessageBus, Message

async with MessageBus(broker="redis") as bus:
    # Simple publish
    await bus.publish("agents.notifications", {
        "type": "task_completed",
        "agent_id": "agent-001",
        "task_id": "task-123"
    })
    
    # Publish with metadata
    await bus.publish(
        topic="agents.events",
        message={"event": "started"},
        headers={
            "correlation_id": "req-456",
            "timestamp": "2024-01-15T10:30:00Z",
            "priority": "high"
        }
    )
    
    # Subscribe with async handler
    async def handle_notification(msg: Message):
        print(f"Received: {msg.data}")
        print(f"Topic: {msg.topic}")
        print(f"Headers: {msg.headers}")
        print(f"Timestamp: {msg.timestamp}")
    
    await bus.subscribe("agents.notifications", handle_notification)
    
    # Subscribe with decorator pattern
    @bus.subscriber("agents.events")
    async def on_event(msg: Message):
        print(f"Event: {msg.data['event']}")
    
    # Multiple handlers for same topic
    @bus.subscriber("agents.notifications")
    async def log_notification(msg: Message):
        logger.info(f"Notification: {msg.data}")
    
    @bus.subscriber("agents.notifications")
    async def store_notification(msg: Message):
        await database.insert("notifications", msg.data)
    
    # Keep running to receive messages
    await bus.run_forever()

Message Filtering

from agent_os.amb import MessageBus, MessageFilter

async with MessageBus(broker="redis") as bus:
    # Filter messages by content
    high_priority_filter = MessageFilter(
        headers={"priority": "high"}
    )
    
    @bus.subscriber("agents.tasks", filter=high_priority_filter)
    async def handle_high_priority(msg: Message):
        print(f"Urgent task: {msg.data}")
    
    # Filter by message data
    error_filter = MessageFilter(
        data_match={"status": "error"}
    )
    
    @bus.subscriber("agents.status", filter=error_filter)
    async def handle_errors(msg: Message):
        await alert_team(msg.data)
    
    # Custom filter function
    def is_my_agent(msg: Message) -> bool:
        return msg.data.get("agent_id") == "agent-001"
    
    @bus.subscriber("agents.commands", filter=is_my_agent)
    async def handle_my_commands(msg: Message):
        await execute_command(msg.data)

Request/Reply Pattern

The request/reply pattern provides synchronous-style communication over the message bus, useful for querying other agents or services.

from agent_os.amb import MessageBus, Request, Response

async with MessageBus(broker="redis") as bus:
    # === Requester Side ===
    
    # Simple request with timeout
    response = await bus.request(
        topic="agents.query",
        message={"query": "What is the current status?"},
        timeout=5.0  # seconds
    )
    print(f"Response: {response.data}")
    
    # Request with correlation ID tracking
    response = await bus.request(
        topic="agents.calculate",
        message={"operation": "sum", "values": [1, 2, 3]},
        timeout=10.0,
        correlation_id="calc-001"
    )
    
    # === Responder Side ===
    
    # Handle requests and send replies
    @bus.responder("agents.query")
    async def handle_query(request: Request) -> dict:
        # Return value is automatically sent as reply
        return {"status": "active", "uptime": 3600}
    
    # Explicit reply control
    @bus.responder("agents.calculate")
    async def handle_calculation(request: Request, reply: Response):
        try:
            values = request.data["values"]
            result = sum(values)
            await reply.send({"result": result})
        except Exception as e:
            await reply.error(str(e))
    
    # Multiple responders with load balancing
    @bus.responder("agents.process", group="workers")
    async def worker_1(request: Request) -> dict:
        return await process_request(request.data)
    
    # Another instance (could be different process/machine)
    @bus.responder("agents.process", group="workers")
    async def worker_2(request: Request) -> dict:
        return await process_request(request.data)

Request Patterns

from agent_os.amb import MessageBus
import asyncio

async with MessageBus(broker="redis") as bus:
    # Scatter-gather: request from multiple responders
    responses = await bus.scatter(
        topic="agents.vote",
        message={"proposal": "increase_budget"},
        timeout=5.0,
        min_responses=3,
        max_responses=10
    )
    
    votes = [r.data["vote"] for r in responses]
    approved = votes.count("yes") > votes.count("no")
    
    # First-response-wins (for redundancy)
    response = await bus.request_first(
        topics=[
            "agents.primary.query",
            "agents.backup.query"
        ],
        message={"query": "status"},
        timeout=5.0
    )
    
    # Parallel requests to different services
    async with asyncio.TaskGroup() as tg:
        status_task = tg.create_task(
            bus.request("service.status", {}, timeout=5.0)
        )
        config_task = tg.create_task(
            bus.request("service.config", {}, timeout=5.0)
        )
    
    status = status_task.result()
    config = config_task.result()

Topic Wildcards

AMB supports wildcard patterns for flexible topic subscriptions:

  • * — Matches exactly one word (a single topic segment)
  • # — Matches zero or more words

from agent_os.amb import MessageBus

async with MessageBus(broker="redis") as bus:
    # Single-level wildcard (*)
    # Matches: agents.agent-001.status, agents.agent-002.status
    # Does NOT match: agents.team.agent-001.status
    @bus.subscriber("agents.*.status")
    async def on_any_agent_status(msg):
        agent_id = msg.topic.split(".")[1]
        print(f"Agent {agent_id} status: {msg.data}")
    
    # Multi-level wildcard (#)
    # Matches: agents.events, agents.team.events, agents.team.sub.events
    @bus.subscriber("agents.#")
    async def on_all_agent_messages(msg):
        print(f"Agent message on {msg.topic}: {msg.data}")
    
    # Combined patterns
    # Matches: tasks.agent-001.completed, tasks.agent-002.completed
    @bus.subscriber("tasks.*.completed")
    async def on_task_completed(msg):
        print(f"Task completed: {msg.data}")
    
    # Matches: logs.error, logs.agent.error, logs.agent.task.error
    @bus.subscriber("logs.#.error")
    async def on_any_error(msg):
        await alert_team(msg.data)
    
    # Publishing to specific topics
    await bus.publish("agents.agent-001.status", {"state": "active"})
    await bus.publish("agents.agent-002.status", {"state": "idle"})
    await bus.publish("tasks.agent-001.completed", {"task_id": "123"})
    await bus.publish("logs.agent.task.error", {"message": "Failed"})

Topic Naming Conventions

Pattern                  Example                      Use Case
domain.entity.action     agents.task.created          Standard event naming
domain.id.event          agents.agent-001.heartbeat   Entity-specific events
service.version.method   api.v1.query                 Versioned APIs
env.region.service       prod.us-east.agents          Environment routing
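
Consistent names keep wildcard subscriptions predictable. A small helper (hypothetical, not part of AMB) that builds topics following the domain.entity.action convention:

def make_topic(domain: str, entity: str, action: str) -> str:
    """Build a dot-delimited topic, e.g. make_topic("agents", "task", "created")."""
    parts = (domain, entity, action)
    # Dots are segment separators, so individual segments must not contain them
    assert all(p and "." not in p for p in parts), "segments must be non-empty and dot-free"
    return ".".join(parts)

# make_topic("agents", "task", "created") -> "agents.task.created"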

Message Serialization

AMB supports multiple serialization formats for different performance and compatibility needs:

Format        Size       Speed       Human Readable   Schema
JSON          Large      Fast        Yes              Optional
MessagePack   Small      Very fast   No               No
Protobuf      Smallest   Fastest     No               Required
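
To check the size trade-off on your own payloads, compare encoded lengths directly. A quick sketch using the standard-library json module and the msgpack package (pip install msgpack); exact numbers depend on payload shape:

import json

import msgpack

payload = {"agent_id": "agent-001", "values": list(range(100))}

json_size = len(json.dumps(payload, separators=(",", ":")).encode())
msgpack_size = len(msgpack.packb(payload))

print(f"JSON: {json_size} bytes, MessagePack: {msgpack_size} bytes")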

JSON Serialization (Default)

from datetime import datetime

from agent_os.amb import MessageBus, JsonSerializer

# Default JSON serialization
bus = MessageBus(serializer="json")

# Custom JSON configuration
bus = MessageBus(
    serializer=JsonSerializer(
        indent=None,           # Compact output
        ensure_ascii=False,    # Allow unicode
        date_format="iso",     # ISO 8601 dates
        decimal_as_string=True # Preserve decimal precision
    )
)

# Publish any JSON-serializable data
await bus.publish("events", {
    "timestamp": datetime.now(),  # Automatically converted
    "data": {"nested": [1, 2, 3]},
    "agent_id": "agent-001"
})

MessagePack Serialization

from agent_os.amb import MessageBus, MsgPackSerializer

# Use MessagePack for smaller messages and faster serialization
bus = MessageBus(serializer="msgpack")

# Custom MessagePack configuration
bus = MessageBus(
    serializer=MsgPackSerializer(
        use_bin_type=True,     # Binary vs raw strings
        strict_map_key=False,  # Allow non-string keys
        datetime=True          # Native datetime support
    )
)

# Especially useful for binary data
await bus.publish("data.binary", {
    "image": image_bytes,      # Efficient binary encoding
    "metadata": {"size": 1024}
})

Protobuf Serialization

import time

from agent_os.amb import MessageBus, ProtobufSerializer

# Define your schema (agent_events.proto)
"""
syntax = "proto3";

message AgentEvent {
    string agent_id = 1;
    string event_type = 2;
    int64 timestamp = 3;
    map<string, string> metadata = 4;
}
"""

# Use Protobuf for maximum efficiency
bus = MessageBus(
    serializer=ProtobufSerializer(
        proto_path="./schemas",
        default_message_type="AgentEvent"
    )
)

# Publish with strong typing
from schemas.agent_events_pb2 import AgentEvent

event = AgentEvent(
    agent_id="agent-001",
    event_type="task_completed",
    timestamp=int(time.time())
)
event.metadata["task_id"] = "task-123"

await bus.publish("agents.events", event)

# Subscribe with type hints
@bus.subscriber("agents.events")
async def handle_event(msg: AgentEvent):
    print(f"Agent {msg.agent_id}: {msg.event_type}")

Error Handling and Retries

AMB provides robust error handling with configurable retry policies:

from agent_os.amb import MessageBus, RetryPolicy, DeadLetterQueue

# Configure retry policy
retry_policy = RetryPolicy(
    max_retries=3,
    initial_delay=1.0,         # seconds
    max_delay=60.0,            # seconds
    exponential_base=2,        # delay doubles each retry
    jitter=0.1,                # 10% random jitter
    retryable_exceptions=[     # Only retry these
        ConnectionError,
        TimeoutError
    ]
)

# Configure dead letter queue for failed messages
dlq = DeadLetterQueue(
    topic="failed.messages",
    max_retries=3,
    retention_days=7
)

bus = MessageBus(
    broker="redis",
    retry_policy=retry_policy,
    dead_letter_queue=dlq
)

# Handler-level retry configuration
@bus.subscriber("agents.tasks", retries=5, retry_delay=2.0)
async def process_task(msg):
    # Automatic retry on failure
    await risky_operation(msg.data)

# Manual error handling
@bus.subscriber("agents.critical")
async def handle_critical(msg):
    try:
        await process_critical(msg.data)
    except ValidationError as e:
        # Don't retry validation errors
        await msg.reject(requeue=False)
    except TemporaryError as e:
        # Retry later
        await msg.reject(requeue=True, delay=30)
    except FatalError as e:
        # Send to dead letter queue
        await msg.dead_letter(reason=str(e))
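
The schedule the RetryPolicy above produces is plain exponential backoff with jitter. A standalone sketch of the arithmetic (the helper is illustrative, not part of AMB):

import random

def backoff_delay(attempt: int, initial: float = 1.0, base: float = 2.0,
                  max_delay: float = 60.0, jitter: float = 0.1) -> float:
    # attempt 0 -> ~1s, 1 -> ~2s, 2 -> ~4s, ... capped at max_delay,
    # with +/-10% random jitter so retries from many agents spread out
    delay = min(initial * (base ** attempt), max_delay)
    return delay * (1 + random.uniform(-jitter, jitter))

# [round(backoff_delay(n)) for n in range(5)] -> roughly [1, 2, 4, 8, 16]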

Circuit Breaker Pattern

from agent_os.amb import MessageBus, CircuitBreaker

# Circuit breaker prevents cascading failures
circuit_breaker = CircuitBreaker(
    failure_threshold=5,       # Open after 5 failures
    recovery_timeout=30,       # Try again after 30s
    half_open_requests=3       # Test with 3 requests
)

bus = MessageBus(
    broker="redis",
    circuit_breaker=circuit_breaker
)

# Monitor circuit breaker state
@bus.on("circuit_breaker.opened")
async def on_circuit_open(topic: str):
    logger.warning(f"Circuit breaker opened for {topic}")
    await alert_ops_team(f"Message delivery failing to {topic}")

@bus.on("circuit_breaker.closed")
async def on_circuit_closed(topic: str):
    logger.info(f"Circuit breaker closed for {topic}")

# Per-topic circuit breakers
@bus.subscriber(
    "external.api",
    circuit_breaker=CircuitBreaker(failure_threshold=3)
)
async def call_external_api(msg):
    await external_api.call(msg.data)
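
Conceptually a circuit breaker is a three-state machine: closed (requests flow), open (fail fast), and half-open (a few probe requests test recovery). A simplified sketch of the logic the parameters above control; AMB's internal implementation may differ:

import time

class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True   # closed: requests pass through
        if time.monotonic() - self.opened_at >= self.recovery_timeout:
            return True   # half-open: allow a probe request
        return False      # open: fail fast

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None  # probe succeeded, close the circuit

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()  # trip the circuit open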

Acknowledgment Modes

from agent_os.amb import MessageBus, AckMode

bus = MessageBus(broker="redis")

# Auto-acknowledge after handler completes
@bus.subscriber("tasks", ack_mode=AckMode.AUTO)
async def auto_ack(msg):
    await process(msg.data)
    # Automatically acknowledged on success
    # Automatically rejected on exception

# Manual acknowledgment for complex workflows
@bus.subscriber("tasks", ack_mode=AckMode.MANUAL)
async def manual_ack(msg):
    try:
        result = await process(msg.data)
        await save_result(result)
        await msg.ack()  # Explicit acknowledgment
    except Exception as e:
        await msg.nack(requeue=True)  # Requeue for retry

# Batch acknowledgment for high throughput
@bus.subscriber("events", ack_mode=AckMode.BATCH, batch_size=100)
async def batch_ack(messages: list):
    await process_batch([m.data for m in messages])
    # All messages acknowledged together

Integration with KernelSpace

AMB integrates seamlessly with the Agent OS KernelSpace for policy-enforced messaging:

from agent_os import KernelSpace, Policy
from agent_os.amb import MessageBus

# Create message bus
bus = MessageBus(broker="redis")

# Define messaging policy
messaging_policy = Policy(
    name="agent-messaging",
    rules=[
        {
            "topic": "agents.*.commands",
            "allow": ["admin", "supervisor"],
            "deny": ["guest"]
        },
        {
            "topic": "agents.*.status",
            "allow": ["*"],  # Everyone can publish status
            "rate_limit": "100/minute"
        },
        {
            "topic": "system.#",
            "allow": ["system"],
            "audit": True
        }
    ]
)

# Initialize kernel with message bus
kernel = KernelSpace(
    policies=[messaging_policy],
    message_bus=bus
)

# Register agent with messaging capabilities
@kernel.agent("worker-agent")
class WorkerAgent:
    async def run(self, kernel):
        # Subscribe through kernel (policy enforced)
        @kernel.subscribe("agents.worker.commands")
        async def on_command(msg):
            await self.execute_command(msg.data)
        
        # Publish through kernel (policy enforced)
        await kernel.publish("agents.worker.status", {
            "state": "ready",
            "capabilities": ["task-execution"]
        })

# Cross-agent communication
@kernel.agent("supervisor-agent")
class SupervisorAgent:
    async def assign_task(self, agent_id: str, task: dict):
        # Request/reply through kernel
        response = await kernel.request(
            f"agents.{agent_id}.commands",
            {"type": "execute_task", "task": task},
            timeout=30.0
        )
        return response.data

Event-Driven Policies

from datetime import datetime

from agent_os import KernelSpace, Policy
from agent_os.amb import MessageBus

kernel = KernelSpace()

# React to messages with policy enforcement
@kernel.on_message("agents.*.error")
async def handle_agent_error(msg, context):
    # Automatic audit logging
    await context.audit_log({
        "event": "agent_error",
        "agent_id": msg.topic.split(".")[1],
        "error": msg.data
    })
    
    # Policy-based escalation
    if msg.data.get("severity") == "critical":
        await kernel.publish("alerts.critical", {
            "source": msg.topic,
            "error": msg.data
        })

# Message transformation pipeline
@kernel.message_pipeline("external.api.response")
async def transform_api_response(msg, context):
    # Transform before delivery to subscribers
    return {
        **msg.data,
        "received_at": datetime.now().isoformat(),
        "processed_by": context.kernel_id
    }

# Message validation
@kernel.message_validator("agents.*.commands")
async def validate_command(msg, context):
    schema = CommandSchema()
    errors = schema.validate(msg.data)
    if errors:
        raise ValidationError(errors)
    return True

Distributed Agent Coordination

from agent_os import KernelSpace
from agent_os.amb import MessageBus

kernel = KernelSpace(message_bus=MessageBus(broker="redis"))

# Leader election via message bus
@kernel.agent("coordinator")
class CoordinatorAgent:
    async def run(self, kernel):
        # Attempt to become leader
        is_leader = await kernel.elect_leader(
            topic="cluster.leader",
            ttl=30  # Leadership expires after 30s
        )
        
        if is_leader:
            await self.run_as_leader(kernel)
        else:
            await self.run_as_follower(kernel)
    
    async def run_as_leader(self, kernel):
        # Broadcast work to followers
        @kernel.schedule(interval=5)
        async def distribute_work():
            tasks = await self.get_pending_tasks()
            for task in tasks:
                await kernel.publish(
                    "cluster.tasks",
                    task,
                    headers={"assigned_by": "leader"}
                )
    
    async def run_as_follower(self, kernel):
        @kernel.subscribe("cluster.tasks")
        async def on_task(msg):
            await self.execute_task(msg.data)
            await kernel.publish("cluster.results", {
                "task_id": msg.data["id"],
                "result": "completed"
            })
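
Because leadership expires after the TTL, a leader must renew its lease (and followers can retry the election) before it lapses. A sketch of one way to do that with the kernel.elect_leader and kernel.schedule calls used above; re-electing as a renewal mechanism is an assumption about the API:

@kernel.agent("resilient-coordinator")
class ResilientCoordinator:
    async def run(self, kernel):
        self.is_leader = False

        # Re-run the election well inside the 30s TTL: a healthy leader
        # keeps its lease, and a follower picks up a dropped one
        @kernel.schedule(interval=10)
        async def renew_or_retry():
            self.is_leader = await kernel.elect_leader(
                topic="cluster.leader",
                ttl=30
            )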

Next Steps