Module • Memory • Auditable

🧠 EMK

Episodic Memory Kernel — Immutable agent memory with semantic search

What is EMK?

The Episodic Memory Kernel (EMK) provides immutable, auditable memory storage for AI agents. Every action, decision, and outcome is recorded as an Episode: a structured, tamper-evident record that enables compliance, debugging, and reproducibility.

EMK solves critical challenges in AI agent development:

  • Audit Trails — Complete history of agent decisions for compliance
  • Debugging — Replay any moment in agent history to diagnose issues
  • Learning — Semantic search enables agents to learn from past experiences
  • Anti-Patterns — Negative memory prevents repeating known mistakes

Quick Start

from agent_os.emk import Memory, Episode

# Create memory for an agent
memory = Memory(agent_id="customer-service-agent")

# Store an episode
episode = Episode(
    goal="Resolve customer complaint #1234",
    action="Issued full refund of $50",
    result="Customer satisfied, case closed",
    reflection="Proactive refunds improve satisfaction scores"
)

await memory.store(episode)

# Later: semantic search for similar situations
similar = await memory.search("customer refund complaint")
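
Search returns scored matches. A minimal sketch of inspecting them, assuming the match shape used later in this document (a .score and an .episode attribute):

# Each match pairs a similarity score with the stored episode
for match in similar:
    print(f"{match.score:.2f}  {match.episode.goal}")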

Installation

Install EMK standalone or as part of the full Agent OS kernel:

# Standalone EMK (minimal dependencies)
pip install agent-os-kernel[emk]

# Or with specific backend support
pip install agent-os-kernel[emk,sqlite]   # SQLite backend
pip install agent-os-kernel[emk,postgres] # PostgreSQL backend

# Full kernel (includes all modules)
pip install agent-os-kernel

Requirements

  • Python 3.9+
  • Optional: sentence-transformers for semantic search
  • Optional: Database drivers for SQLite/PostgreSQL backends

Episode Structure

An Episode is the fundamental unit of memory in EMK. It captures the complete context of an agent action following the Goal-Action-Result-Reflection (GARR) pattern.

from datetime import datetime

# Reference view of the Episode fields (illustrative; the library's actual class may differ)
class Episode:
    """Immutable record of an agent action."""
    
    id: str              # Unique identifier (auto-generated UUID)
    agent_id: str        # ID of the agent that created this episode
    timestamp: datetime  # When the episode was created
    
    # Core GARR fields
    goal: str            # What the agent was trying to achieve
    action: str          # What the agent actually did
    result: str          # What happened (outcome)
    reflection: str      # Agent's analysis of what was learned
    
    # Metadata
    metadata: dict       # Custom key-value pairs
    embedding: list      # Vector embedding for semantic search
    parent_id: str       # Link to parent episode (for chains)
    tags: list[str]      # Categorization tags
    
    # Immutability
    hash: str            # SHA-256 hash for tamper detection
    signature: str       # Optional cryptographic signature
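
The hash field is what makes episodes tamper-evident. EMK's exact canonicalization is not documented here, but the general idea can be sketched as follows (compute_episode_hash is hypothetical; the fields and serialization the library actually hashes may differ):

import hashlib
import json

def compute_episode_hash(episode) -> str:
    # Illustrative: serialize the core GARR fields in a canonical order
    payload = json.dumps(
        {
            "goal": episode.goal,
            "action": episode.action,
            "result": episode.result,
            "reflection": episode.reflection,
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

# Tamper check: recompute and compare against the stored value
# assert compute_episode_hash(episode) == episode.hash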

Creating Episodes

from agent_os.emk import Episode

# Basic episode
episode = Episode(
    goal="Answer user question about pricing",
    action="Retrieved pricing from database, formatted response",
    result="User received accurate pricing information",
    reflection="Database query was efficient, response was well-received"
)

# Episode with metadata
episode = Episode(
    goal="Process order #5678",
    action="Validated payment, created shipment, sent confirmation",
    result="Order successfully processed",
    reflection="Payment validation could be parallelized",
    metadata={
        "order_id": "5678",
        "amount": 149.99,
        "customer_tier": "premium"
    },
    tags=["order-processing", "premium-customer"]
)

# Chained episodes (parent-child relationship)
parent = Episode(
    goal="Complete multi-step workflow",
    action="Initiated workflow",
    result="Workflow started"
)

child = Episode(
    goal="Execute step 1 of workflow",
    action="Performed data validation",
    result="Data validated successfully",
    parent_id=parent.id
)
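
A chain recorded this way can later be walked in order with get_chain(), covered in the Time-Travel Debugging section. Assuming a Memory instance as in the Quick Start:

chain = await memory.get_chain(episode_id=parent.id)
for step in chain:
    print(step.goal)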

MemoryStore with FileAdapter

MemoryStore is the primary interface for storing and retrieving episodes. It uses pluggable adapters for different storage backends.

from agent_os.emk import Memory, MemoryStore, FileAdapter

# Simple initialization (uses file storage by default)
memory = Memory(agent_id="my-agent")

# Explicit adapter configuration
adapter = FileAdapter(
    base_path="./agent_memory",
    compression=True,          # Compress stored episodes
    max_file_size_mb=100       # Split into multiple files
)

memory = MemoryStore(
    agent_id="my-agent",
    adapter=adapter
)

# Store episodes
await memory.store(episode)

# Store with custom ID
await memory.store(episode, id="custom-id-123")

# Batch storage (more efficient)
episodes = [episode1, episode2, episode3]
await memory.store_batch(episodes)

# Retrieve by ID
episode = await memory.get("episode-id")

# Get recent episodes
recent = await memory.get_recent(limit=10)

# Get episodes by time range
from datetime import datetime, timedelta
yesterday = datetime.now() - timedelta(days=1)
episodes = await memory.get_range(start=yesterday, end=datetime.now())

# Count episodes
total = await memory.count()
tagged = await memory.count(tags=["error"])

FileAdapter Options

Option           | Type | Default    | Description
-----------------|------|------------|-------------------------------
base_path        | str  | "./memory" | Directory for episode storage
compression      | bool | True       | Enable gzip compression
max_file_size_mb | int  | 100        | Max size before file rotation
index_embeddings | bool | True       | Build vector index for search
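
If semantic search is not needed, the vector index can be disabled with the option above; a sketch (presumably search() is then unavailable for that store):

adapter = FileAdapter(
    base_path="./agent_memory",
    index_embeddings=False  # skip building the vector index
)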

Time-Travel Debugging

The replay() function enables time-travel debugging: reconstructing the exact state of an agent at any point in its history.

from agent_os.emk import Memory, replay, ReplayOptions
from datetime import datetime

memory = Memory(agent_id="trading-agent")

# Replay to a specific timestamp
state = await replay(
    memory=memory,
    timestamp=datetime(2024, 1, 15, 14, 30, 0)
)

print(f"Episodes at that moment: {len(state.episodes)}")
print(f"Agent context: {state.context}")
print(f"Active goals: {state.active_goals}")

# Replay with step-by-step execution
async for step in replay(memory, timestamp, step_by_step=True):
    print(f"Step {step.index}: {step.episode.action}")
    print(f"State after: {step.state}")
    
    # Optionally pause and inspect
    if step.episode.tags and "error" in step.episode.tags:
        print(f"Error occurred: {step.episode.result}")
        break

# Replay a specific episode chain
chain = await memory.get_chain(episode_id="root-episode-id")
for episode in chain:
    print(f"{episode.timestamp}: {episode.action}")

# Compare two points in time
diff = await memory.diff(
    timestamp_a=datetime(2024, 1, 15, 10, 0),
    timestamp_b=datetime(2024, 1, 15, 14, 0)
)
print(f"Episodes added: {len(diff.added)}")
print(f"State changes: {diff.state_delta}")

Replay Options

from agent_os.emk import ReplayOptions

options = ReplayOptions(
    include_metadata=True,      # Include episode metadata
    reconstruct_context=True,   # Rebuild agent context
    validate_hashes=True,       # Verify episode integrity
    stop_on_error=False,        # Continue past errors
    max_episodes=1000           # Limit for performance
)

state = await replay(memory, timestamp, options=options)

# Verify replay integrity
if state.integrity_verified:
    print("All episode hashes validated")
else:
    print(f"Integrity issues: {state.integrity_errors}")

Memory Compression (Sleep Cycle)

Analogous to biological memory consolidation during sleep, EMK's compress() function consolidates old memories, reducing storage while preserving important patterns.

from agent_os.emk import Memory, compress, CompressionStrategy

memory = Memory(agent_id="long-running-agent")

# Basic compression (default strategy)
result = await compress(memory)
print(f"Compressed {result.episodes_processed} episodes")
print(f"Storage reduced: {result.storage_saved_mb} MB")
print(f"Summaries created: {len(result.summaries)}")

# Compression with custom strategy
result = await compress(
    memory=memory,
    strategy=CompressionStrategy.SEMANTIC_CLUSTERING,
    options={
        "min_age_days": 30,         # Only compress episodes older than 30 days
        "preserve_tags": ["error", "critical"],  # Never compress these
        "cluster_threshold": 0.85,  # Similarity threshold for clustering
        "summary_model": "gpt-4"    # LLM for generating summaries
    }
)

# View compression summaries
for summary in result.summaries:
    print(f"Summary: {summary.content}")
    print(f"Covers {summary.episode_count} episodes")
    print(f"Time range: {summary.start} to {summary.end}")

# Schedule automatic compression (sleep cycle)
memory.enable_sleep_cycle(
    schedule="0 3 * * *",  # Run at 3 AM daily
    strategy=CompressionStrategy.IMPORTANCE_WEIGHTED,
    notify_on_complete=True
)

Compression Strategies

Strategy            | Description                                              | Best For
--------------------|----------------------------------------------------------|------------------------
SEMANTIC_CLUSTERING | Groups similar episodes into summaries                   | Repetitive tasks
IMPORTANCE_WEIGHTED | Preserves high-impact episodes, summarizes routine ones  | Decision-heavy agents
TIME_DECAY          | Progressively summarizes older memories                  | Long-running agents
HIERARCHICAL        | Creates summary hierarchies (day → week → month)         | Analytics and reporting
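
Any strategy from the table plugs into the same compress() call. For example, a long-running agent might use:

result = await compress(
    memory=memory,
    strategy=CompressionStrategy.TIME_DECAY
)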

Negative Memory (Anti-Patterns)

Negative memory records failures and anti-patterns—actions that should be avoided. This enables agents to learn from mistakes without repeating them.

from agent_os.emk import Memory, Episode, NegativeMemory

memory = Memory(agent_id="code-review-agent")

# Record a negative episode (mistake)
negative_episode = Episode(
    goal="Deploy to production",
    action="Deployed without running tests",
    result="Production outage for 2 hours",
    reflection="ALWAYS run tests before deployment",
    tags=["negative", "deployment", "critical"]
)

await memory.store_negative(negative_episode)

# Check for anti-patterns before acting
async def before_action(proposed_action: str):
    warnings = await memory.check_anti_patterns(proposed_action)
    
    for warning in warnings:
        print(f"⚠️ Warning: Similar action caused issues")
        print(f"   Past action: {warning.episode.action}")
        print(f"   Result: {warning.episode.result}")
        print(f"   Similarity: {warning.score}")
    
    return len(warnings) == 0  # Safe to proceed?

# Query negative memories specifically
failures = await memory.search(
    query="deployment failure",
    memory_type="negative"
)

# Get anti-pattern summary
anti_patterns = await memory.get_anti_patterns(
    category="deployment",
    min_occurrences=2  # Patterns that happened at least twice
)

for pattern in anti_patterns:
    print(f"Anti-pattern: {pattern.description}")
    print(f"Occurrences: {pattern.count}")
    print(f"Suggested avoidance: {pattern.recommendation}")

Automatic Anti-Pattern Detection

from agent_os.emk import Memory, AntiPatternDetector

memory = Memory(agent_id="my-agent")

# Enable automatic detection
detector = AntiPatternDetector(
    memory=memory,
    detection_threshold=0.8,     # Similarity threshold
    min_failures=2,              # Minimum failures to flag
    auto_tag=True                # Automatically tag negative episodes
)

# The detector monitors new episodes
@detector.on_pattern_detected
async def handle_pattern(pattern):
    print(f"New anti-pattern detected: {pattern.description}")
    # Send alert, update policies, etc.

# Manual analysis over a recent time window
from datetime import datetime, timedelta

end_date = datetime.now()
start_date = end_date - timedelta(days=7)

patterns = await detector.analyze(
    time_range=(start_date, end_date),
    categories=["api-calls", "data-processing"]
)

Storage Backends

EMK supports multiple storage backends through a unified adapter interface.

File Storage (Default)

from agent_os.emk import Memory, FileAdapter

adapter = FileAdapter(
    base_path="./agent_memory",
    format="jsonl",          # or "parquet" for analytics
    compression="gzip",
    partition_by="day"       # Organize files by day
)

memory = Memory(agent_id="my-agent", adapter=adapter)

SQLite

from agent_os.emk import Memory, SQLiteAdapter

adapter = SQLiteAdapter(
    database="./agent_memory.db",
    enable_fts=True,         # Full-text search
    enable_vector=True,      # Vector similarity (sqlite-vss)
    wal_mode=True            # Write-ahead logging
)

memory = Memory(agent_id="my-agent", adapter=adapter)

# SQLite-specific: direct SQL queries
results = adapter.execute("""
    SELECT * FROM episodes 
    WHERE json_extract(metadata, '$.severity') = 'critical'
    ORDER BY timestamp DESC
    LIMIT 10
""")

PostgreSQL

from agent_os.emk import Memory, PostgresAdapter

adapter = PostgresAdapter(
    connection_string="postgresql://user:pass@localhost/agentdb",
    schema="emk",
    enable_pgvector=True,    # pgvector for semantic search
    pool_size=10
)

memory = Memory(agent_id="my-agent", adapter=adapter)

# PostgreSQL-specific: use pgvector operators
# (query_embedding is a precomputed query vector, e.g. from sentence-transformers)
results = await adapter.vector_search(
    embedding=query_embedding,
    limit=10,
    filter={"agent_id": "my-agent"}
)

Backend Comparison

Backend         | Best For                         | Semantic Search | Scalability
----------------|----------------------------------|-----------------|------------
FileAdapter     | Development, single agent        | In-memory index | Limited
SQLiteAdapter   | Local deployment, moderate scale | sqlite-vss      | Medium
PostgresAdapter | Production, multi-agent          | pgvector        | High
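
Because every backend implements the same adapter interface, history can be migrated using only the documented read and write calls. A sketch moving episodes from file storage to PostgreSQL, assuming get_range() accepts an arbitrarily early start (connection string illustrative):

from datetime import datetime
from agent_os.emk import Memory, FileAdapter, PostgresAdapter

source = Memory(agent_id="my-agent", adapter=FileAdapter(base_path="./agent_memory"))
target = Memory(
    agent_id="my-agent",
    adapter=PostgresAdapter(connection_string="postgresql://user:pass@localhost/agentdb")
)

# Read the full history from the old backend, then batch-write to the new one
episodes = await source.get_range(start=datetime.min, end=datetime.now())
await target.store_batch(episodes)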

Integration with KernelSpace

EMK integrates seamlessly with KernelSpace for automatic memory capture and policy-based memory management.

from agent_os import KernelSpace
from agent_os.emk import Memory, MemoryPolicy

# Initialize kernel with EMK
kernel = KernelSpace(
    policy="strict",
    memory=Memory(agent_id="governed-agent")
)

# Automatic episode capture for all registered functions
@kernel.register
async def process_order(order_id: str):
    # EMK automatically captures:
    # - Goal: "Execute process_order"
    # - Action: Function execution details
    # - Result: Return value or exception
    # - Reflection: Auto-generated based on outcome
    
    result = await do_processing(order_id)
    return result

# Memory-aware policies
memory_policy = MemoryPolicy(
    retention_days=365,              # Keep episodes for 1 year
    auto_compress_after_days=30,     # Compress after 30 days
    required_fields=["goal", "result"],
    max_episode_size_kb=100
)

kernel.add_policy(memory_policy)

# Access memory from kernel context
@kernel.register
async def smart_agent(task: str):
    # Search past experiences
    similar = await kernel.memory.search(task)
    
    if similar:
        # Use past experience to inform decision
        best_approach = similar[0].episode.reflection
        print(f"Learning from past: {best_approach}")
    
    # Execute with enhanced context
    return await execute_with_context(task, similar)

# Flight recorder integration
kernel.flight_recorder.on_episode(lambda ep: 
    print(f"Episode recorded: {ep.id}")
)

# Export memory for analysis
await kernel.memory.export(
    format="parquet",
    path="./memory_export.parquet",
    time_range=(start, end)  # (datetime, datetime) window to export
)

Cross-Module Integration

from agent_os import KernelSpace
from agent_os.emk import Memory, Episode
from agent_os.cmvk import verify
from agent_os.iatp import Identity

kernel = KernelSpace()
memory = Memory(agent_id="secure-agent")
identity = Identity.create("secure-agent")

@kernel.register
async def verified_action(task: str):
    # Check memory for anti-patterns
    warnings = await memory.check_anti_patterns(task)
    if warnings:
        return {"blocked": True, "reason": "Anti-pattern detected"}
    
    # Verify decision with CMVK
    decision = await verify(
        prompt=f"Should I: {task}",
        models=["gpt-4", "claude-3"],
        threshold=0.9
    )
    
    # Record episode with cryptographic signature
    episode = Episode(
        goal=task,
        action=f"Decision: {decision.result}",
        result=f"Consensus: {decision.confidence}",
        reflection="Verified multi-model decision",
        signature=identity.sign(decision.result)
    )
    
    await memory.store(episode)
    return decision.result