# 🧠 EMK

*Episodic Memory Kernel — Immutable agent memory with semantic search*

## What is EMK?
The Episodic Memory Kernel (EMK) provides immutable, auditable memory storage for AI agents. Every action, decision, and outcome is recorded as an Episode—a structured, tamper-proof record that enables compliance, debugging, and reproducibility.
EMK solves critical challenges in AI agent development:
- **Audit Trails** — Complete history of agent decisions for compliance
- **Debugging** — Replay any moment in agent history to diagnose issues
- **Learning** — Semantic search enables agents to learn from past experiences
- **Anti-Patterns** — Negative memory prevents repeating known mistakes
```python
from agent_os.emk import Memory, Episode

# Create memory for an agent
memory = Memory(agent_id="customer-service-agent")

# Store an episode
episode = Episode(
    goal="Resolve customer complaint #1234",
    action="Issued full refund of $50",
    result="Customer satisfied, case closed",
    reflection="Proactive refunds improve satisfaction scores"
)
await memory.store(episode)

# Later: semantic search for similar situations
similar = await memory.search("customer refund complaint")
```
## Installation
Install EMK standalone or as part of the full Agent OS kernel:
```bash
# Standalone EMK (minimal dependencies)
pip install agent-os-kernel[emk]

# Or with specific backend support
pip install agent-os-kernel[emk,sqlite]    # SQLite backend
pip install agent-os-kernel[emk,postgres]  # PostgreSQL backend

# Full kernel (includes all modules)
pip install agent-os-kernel
```
### Requirements

- Python 3.9+
- Optional: `sentence-transformers` for semantic search
- Optional: database drivers for SQLite/PostgreSQL backends
## Episode Structure

An Episode is the fundamental unit of memory in EMK. It captures the complete context of an agent action following the Goal-Action-Result-Reflection (GARR) pattern.
```python
from datetime import datetime

class Episode:
    """Immutable record of an agent action."""
    id: str              # Unique identifier (auto-generated UUID)
    agent_id: str        # ID of the agent that created this episode
    timestamp: datetime  # When the episode was created

    # Core GARR fields
    goal: str            # What the agent was trying to achieve
    action: str          # What the agent actually did
    result: str          # What happened (outcome)
    reflection: str      # Agent's analysis of what was learned

    # Metadata
    metadata: dict       # Custom key-value pairs
    embedding: list      # Vector embedding for semantic search
    parent_id: str       # Link to parent episode (for chains)
    tags: list[str]      # Categorization tags

    # Immutability
    hash: str            # SHA-256 hash for tamper detection
    signature: str       # Optional cryptographic signature
```
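For illustration, the tamper-detection hash can be thought of as a SHA-256 digest over a canonical serialization of the episode's fields. A minimal sketch of the general technique (EMK's exact field set and serialization are not specified here):

```python
import hashlib
import json

def compute_hash(fields: dict) -> str:
    # Canonical JSON (sorted keys, no whitespace) so identical content
    # always yields an identical digest.
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Any later change to goal/action/result produces a different digest,
# which is what makes tampering detectable.
digest = compute_hash({"goal": "...", "action": "...", "result": "..."})
```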
### Creating Episodes
```python
from agent_os.emk import Episode

# Basic episode
episode = Episode(
    goal="Answer user question about pricing",
    action="Retrieved pricing from database, formatted response",
    result="User received accurate pricing information",
    reflection="Database query was efficient, response was well-received"
)

# Episode with metadata
episode = Episode(
    goal="Process order #5678",
    action="Validated payment, created shipment, sent confirmation",
    result="Order successfully processed",
    reflection="Payment validation could be parallelized",
    metadata={
        "order_id": "5678",
        "amount": 149.99,
        "customer_tier": "premium"
    },
    tags=["order-processing", "premium-customer"]
)

# Chained episodes (parent-child relationship)
parent = Episode(
    goal="Complete multi-step workflow",
    action="Initiated workflow",
    result="Workflow started"
)
child = Episode(
    goal="Execute step 1 of workflow",
    action="Performed data validation",
    result="Data validated successfully",
    parent_id=parent.id
)
```
## MemoryStore with FileAdapter

`MemoryStore` is the primary interface for storing and retrieving episodes. It uses pluggable adapters for different storage backends.
```python
from agent_os.emk import Memory, MemoryStore, FileAdapter

# Simple initialization (uses file storage by default)
memory = Memory(agent_id="my-agent")

# Explicit adapter configuration
adapter = FileAdapter(
    base_path="./agent_memory",
    compression=True,     # Compress stored episodes
    max_file_size_mb=100  # Split into multiple files
)
memory = MemoryStore(
    agent_id="my-agent",
    adapter=adapter
)

# Store episodes
await memory.store(episode)

# Store with custom ID
await memory.store(episode, id="custom-id-123")

# Batch storage (more efficient)
episodes = [episode1, episode2, episode3]
await memory.store_batch(episodes)

# Retrieve by ID
episode = await memory.get("episode-id")

# Get recent episodes
recent = await memory.get_recent(limit=10)

# Get episodes by time range
from datetime import datetime, timedelta
yesterday = datetime.now() - timedelta(days=1)
episodes = await memory.get_range(start=yesterday, end=datetime.now())

# Count episodes
total = await memory.count()
tagged = await memory.count(tags=["error"])
```
### FileAdapter Options

| Option | Type | Default | Description |
|---|---|---|---|
| `base_path` | `str` | `"./memory"` | Directory for episode storage |
| `compression` | `bool` | `True` | Enable gzip compression |
| `max_file_size_mb` | `int` | `100` | Max size before file rotation |
| `index_embeddings` | `bool` | `True` | Build vector index for search |
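Putting the table together, a fully explicit configuration that spells out the documented defaults:

```python
from agent_os.emk import FileAdapter

adapter = FileAdapter(
    base_path="./memory",    # directory for episode storage
    compression=True,        # gzip-compress episodes on disk
    max_file_size_mb=100,    # rotate to a new file past this size
    index_embeddings=True    # maintain a vector index for semantic search
)
```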
## Semantic Search
EMK provides powerful semantic search capabilities using vector embeddings. Find relevant past experiences even when exact keywords don't match.
```python
from agent_os.emk import Memory, SearchOptions

memory = Memory(
    agent_id="support-agent",
    embedding_model="all-MiniLM-L6-v2"  # Default model
)

# Basic semantic search
results = await memory.search("customer asking for refund")

# Search with options
results = await memory.search(
    query="payment processing error",
    options=SearchOptions(
        limit=10,                   # Max results
        min_score=0.7,              # Minimum similarity threshold
        tags=["payment", "error"],  # Filter by tags
        time_range=(start, end),    # Filter by time
        include_metadata=True       # Include full metadata
    )
)

# Access results
for result in results:
    print(f"Score: {result.score}")
    print(f"Episode: {result.episode}")
    print(f"Similarity: {result.similarity}")

# Search specific fields only
results = await memory.search(
    query="database timeout",
    fields=["action", "result"]  # Only search these fields
)

# Hybrid search (semantic + keyword)
results = await memory.hybrid_search(
    semantic_query="user authentication issue",
    keyword_filter="OAuth",
    weights={"semantic": 0.7, "keyword": 0.3}
)
```
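The similarity scores above are typically cosine similarities between the query embedding and each episode's embedding; hybrid search then blends the two signals with the given weights. A conceptual sketch of that scoring (illustrative only, not EMK's internal implementation):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # Standard cosine similarity: 1.0 = identical direction, 0.0 = unrelated.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm if norm else 0.0

# Hybrid scoring blends both signals, e.g. with the weights shown above:
# combined = 0.7 * semantic_score + 0.3 * keyword_score
```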
### Custom Embedding Models
```python
from agent_os.emk import Memory, EmbeddingProvider

# Use OpenAI embeddings
memory = Memory(
    agent_id="my-agent",
    embedding_provider=EmbeddingProvider.OPENAI,
    embedding_model="text-embedding-3-small"
)

# Use a custom embedding function
# (here backed by sentence-transformers, the library behind the default model)
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

def custom_embedder(text: str) -> list[float]:
    # Your custom embedding logic
    return model.encode(text).tolist()

memory = Memory(
    agent_id="my-agent",
    embedding_fn=custom_embedder
)

# Disable embeddings (keyword search only)
memory = Memory(
    agent_id="my-agent",
    enable_embeddings=False
)
```
## Time-Travel Debugging

The `replay()` function enables time-travel debugging—reconstruct the exact state of an agent at any point in history.
```python
from agent_os.emk import Memory, replay
from datetime import datetime

memory = Memory(agent_id="trading-agent")

# Replay to a specific timestamp
timestamp = datetime(2024, 1, 15, 14, 30, 0)
state = await replay(memory=memory, timestamp=timestamp)

print(f"Episodes at that moment: {len(state.episodes)}")
print(f"Agent context: {state.context}")
print(f"Active goals: {state.active_goals}")

# Replay with step-by-step execution
async for step in replay(memory, timestamp, step_by_step=True):
    print(f"Step {step.index}: {step.episode.action}")
    print(f"State after: {step.state}")
    # Optionally pause and inspect
    if step.episode.tags and "error" in step.episode.tags:
        print(f"Error occurred: {step.episode.result}")
        break

# Replay a specific episode chain
chain = await memory.get_chain(episode_id="root-episode-id")
for episode in chain:
    print(f"{episode.timestamp}: {episode.action}")

# Compare two points in time
diff = await memory.diff(
    timestamp_a=datetime(2024, 1, 15, 10, 0),
    timestamp_b=datetime(2024, 1, 15, 14, 0)
)
print(f"Episodes added: {len(diff.added)}")
print(f"State changes: {diff.state_delta}")
```
### Replay Options
```python
from agent_os.emk import ReplayOptions

options = ReplayOptions(
    include_metadata=True,     # Include episode metadata
    reconstruct_context=True,  # Rebuild agent context
    validate_hashes=True,      # Verify episode integrity
    stop_on_error=False,       # Continue past errors
    max_episodes=1000          # Limit for performance
)
state = await replay(memory, timestamp, options=options)

# Verify replay integrity
if state.integrity_verified:
    print("All episode hashes validated")
else:
    print(f"Integrity issues: {state.integrity_errors}")
```
## Memory Compression (Sleep Cycle)

Like biological memory consolidation during sleep, EMK provides a `compress()` function that consolidates old memories, reducing storage while preserving important patterns.
```python
from agent_os.emk import Memory, compress, CompressionStrategy

memory = Memory(agent_id="long-running-agent")

# Basic compression (default strategy)
result = await compress(memory)
print(f"Compressed {result.episodes_processed} episodes")
print(f"Storage reduced: {result.storage_saved_mb} MB")
print(f"Summaries created: {len(result.summaries)}")

# Compression with custom strategy
result = await compress(
    memory=memory,
    strategy=CompressionStrategy.SEMANTIC_CLUSTERING,
    options={
        "min_age_days": 30,                      # Only compress episodes older than 30 days
        "preserve_tags": ["error", "critical"],  # Never compress these
        "cluster_threshold": 0.85,               # Similarity threshold for clustering
        "summary_model": "gpt-4"                 # LLM for generating summaries
    }
)

# View compression summaries
for summary in result.summaries:
    print(f"Summary: {summary.content}")
    print(f"Covers {summary.episode_count} episodes")
    print(f"Time range: {summary.start} to {summary.end}")

# Schedule automatic compression (sleep cycle)
memory.enable_sleep_cycle(
    schedule="0 3 * * *",  # Run at 3 AM daily
    strategy=CompressionStrategy.IMPORTANCE_WEIGHTED,
    notify_on_complete=True
)
```
### Compression Strategies

| Strategy | Description | Best For |
|---|---|---|
| `SEMANTIC_CLUSTERING` | Groups similar episodes into summaries | Repetitive tasks |
| `IMPORTANCE_WEIGHTED` | Preserves high-impact episodes, summarizes routine ones | Decision-heavy agents |
| `TIME_DECAY` | Progressively summarizes older memories | Long-running agents |
| `HIERARCHICAL` | Creates summary hierarchies (day → week → month) | Analytics and reporting |
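As an intuition for the `TIME_DECAY` style, such a strategy might weight each episode by its age and summarize those whose weight falls below a cutoff. A conceptual sketch with a hypothetical half-life parameter (not EMK's internal scoring):

```python
from datetime import datetime, timezone

HALF_LIFE_DAYS = 30  # hypothetical tuning knob: weight halves every 30 days

def retention_weight(created_at: datetime) -> float:
    # Exponential decay: ~1.0 for fresh episodes, 0.5 at one half-life,
    # 0.25 at two half-lives, and so on. Assumes timezone-aware timestamps.
    age_days = (datetime.now(timezone.utc) - created_at).days
    return 0.5 ** (age_days / HALF_LIFE_DAYS)

# Episodes below a cutoff become candidates for summarization, e.g.:
# candidates = [e for e in episodes if retention_weight(e.timestamp) < 0.1]
```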
## Negative Memory (Anti-Patterns)
Negative memory records failures and anti-patterns—actions that should be avoided. This enables agents to learn from mistakes without repeating them.
```python
from agent_os.emk import Memory, Episode, NegativeMemory

memory = Memory(agent_id="code-review-agent")

# Record a negative episode (mistake)
negative_episode = Episode(
    goal="Deploy to production",
    action="Deployed without running tests",
    result="Production outage for 2 hours",
    reflection="ALWAYS run tests before deployment",
    tags=["negative", "deployment", "critical"]
)
await memory.store_negative(negative_episode)

# Check for anti-patterns before acting
async def before_action(proposed_action: str):
    warnings = await memory.check_anti_patterns(proposed_action)
    for warning in warnings:
        print("⚠️ Warning: Similar action caused issues")
        print(f"   Past action: {warning.episode.action}")
        print(f"   Result: {warning.episode.result}")
        print(f"   Similarity: {warning.score}")
    return len(warnings) == 0  # Safe to proceed?

# Query negative memories specifically
failures = await memory.search(
    query="deployment failure",
    memory_type="negative"
)

# Get anti-pattern summary
anti_patterns = await memory.get_anti_patterns(
    category="deployment",
    min_occurrences=2  # Patterns that happened at least twice
)
for pattern in anti_patterns:
    print(f"Anti-pattern: {pattern.description}")
    print(f"Occurrences: {pattern.count}")
    print(f"Suggested avoidance: {pattern.recommendation}")
```
### Automatic Anti-Pattern Detection
```python
from agent_os.emk import Memory, AntiPatternDetector

memory = Memory(agent_id="my-agent")

# Enable automatic detection
detector = AntiPatternDetector(
    memory=memory,
    detection_threshold=0.8,  # Similarity threshold
    min_failures=2,           # Minimum failures to flag
    auto_tag=True             # Automatically tag negative episodes
)

# The detector monitors new episodes
@detector.on_pattern_detected
async def handle_pattern(pattern):
    print(f"New anti-pattern detected: {pattern.description}")
    # Send alert, update policies, etc.

# Manual analysis
patterns = await detector.analyze(
    time_range=(start_date, end_date),
    categories=["api-calls", "data-processing"]
)
```
## Storage Backends
EMK supports multiple storage backends through a unified adapter interface.
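To make the adapter contract concrete, here is a hypothetical sketch of the methods such an interface would expose (the real base class is defined by `agent_os.emk` and may differ):

```python
from typing import Optional, Protocol

from agent_os.emk import Episode

class StorageAdapter(Protocol):
    """Hypothetical adapter contract; illustrative only."""

    async def store(self, episode: Episode) -> str:
        """Persist an episode and return its ID."""
        ...

    async def get(self, episode_id: str) -> Optional[Episode]:
        """Fetch a single episode by ID, or None if absent."""
        ...

    async def search(self, embedding: list[float], limit: int) -> list[Episode]:
        """Return the episodes closest to the given embedding."""
        ...
```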
### File Storage (Default)
```python
from agent_os.emk import Memory, FileAdapter

adapter = FileAdapter(
    base_path="./agent_memory",
    format="jsonl",      # or "parquet" for analytics
    compression="gzip",
    partition_by="day"   # Organize files by day
)
memory = Memory(agent_id="my-agent", adapter=adapter)
```
### SQLite
```python
from agent_os.emk import Memory, SQLiteAdapter

adapter = SQLiteAdapter(
    database="./agent_memory.db",
    enable_fts=True,     # Full-text search
    enable_vector=True,  # Vector similarity (sqlite-vss)
    wal_mode=True        # Write-ahead logging
)
memory = Memory(agent_id="my-agent", adapter=adapter)

# SQLite-specific: direct SQL queries
results = adapter.execute("""
    SELECT * FROM episodes
    WHERE json_extract(metadata, '$.severity') = 'critical'
    ORDER BY timestamp DESC
    LIMIT 10
""")
```
### PostgreSQL
```python
from agent_os.emk import Memory, PostgresAdapter

adapter = PostgresAdapter(
    connection_string="postgresql://user:pass@localhost/agentdb",
    schema="emk",
    enable_pgvector=True,  # pgvector for semantic search
    pool_size=10
)
memory = Memory(agent_id="my-agent", adapter=adapter)

# PostgreSQL-specific: use pgvector operators
results = await adapter.vector_search(
    embedding=query_embedding,
    limit=10,
    filter={"agent_id": "my-agent"}
)
```
### Backend Comparison

| Backend | Best For | Semantic Search | Scalability |
|---|---|---|---|
| `FileAdapter` | Development, single agent | In-memory index | Limited |
| `SQLiteAdapter` | Local deployment, moderate scale | sqlite-vss | Medium |
| `PostgresAdapter` | Production, multi-agent | pgvector | High |
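Because every backend implements the same adapter contract, moving between them is a configuration change rather than a code change. For example, using the adapters shown above:

```python
from agent_os.emk import Memory, FileAdapter, SQLiteAdapter

# Development: local files
dev_memory = Memory(agent_id="my-agent", adapter=FileAdapter(base_path="./dev_memory"))

# Moderate scale: swap in SQLite without touching calling code
prod_memory = Memory(agent_id="my-agent", adapter=SQLiteAdapter(database="./agent_memory.db"))

# All Memory operations (store, search, replay) work the same on both
```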
## Integration with KernelSpace
EMK integrates seamlessly with KernelSpace for automatic memory capture and policy-based memory management.
```python
from agent_os import KernelSpace
from agent_os.emk import Memory, MemoryPolicy

# Initialize kernel with EMK
kernel = KernelSpace(
    policy="strict",
    memory=Memory(agent_id="governed-agent")
)

# Automatic episode capture for all registered functions
@kernel.register
async def process_order(order_id: str):
    # EMK automatically captures:
    # - Goal: "Execute process_order"
    # - Action: Function execution details
    # - Result: Return value or exception
    # - Reflection: Auto-generated based on outcome
    result = await do_processing(order_id)
    return result

# Memory-aware policies
memory_policy = MemoryPolicy(
    retention_days=365,           # Keep episodes for 1 year
    auto_compress_after_days=30,  # Compress after 30 days
    required_fields=["goal", "result"],
    max_episode_size_kb=100
)
kernel.add_policy(memory_policy)

# Access memory from kernel context
@kernel.register
async def smart_agent(task: str):
    # Search past experiences
    similar = await kernel.memory.search(task)
    if similar:
        # Use past experience to inform decision
        best_approach = similar[0].episode.reflection
        print(f"Learning from past: {best_approach}")
    # Execute with enhanced context
    return await execute_with_context(task, similar)

# Flight recorder integration
kernel.flight_recorder.on_episode(
    lambda ep: print(f"Episode recorded: {ep.id}")
)

# Export memory for analysis
await kernel.memory.export(
    format="parquet",
    path="./memory_export.parquet",
    time_range=(start, end)
)
```
### Cross-Module Integration
```python
from agent_os import KernelSpace
from agent_os.emk import Memory, Episode
from agent_os.cmvk import verify
from agent_os.iatp import Identity

kernel = KernelSpace()
memory = Memory(agent_id="secure-agent")
identity = Identity.create("secure-agent")

@kernel.register
async def verified_action(task: str):
    # Check memory for anti-patterns
    warnings = await memory.check_anti_patterns(task)
    if warnings:
        return {"blocked": True, "reason": "Anti-pattern detected"}

    # Verify decision with CMVK
    decision = await verify(
        prompt=f"Should I: {task}",
        models=["gpt-4", "claude-3"],
        threshold=0.9
    )

    # Record episode with cryptographic signature
    episode = Episode(
        goal=task,
        action=f"Decision: {decision.result}",
        result=f"Consensus: {decision.confidence}",
        reflection="Verified multi-model decision",
        signature=identity.sign(decision.result)
    )
    await memory.store(episode)

    return decision.result
```