07. Episodic Memory
Episodic memory stores the agent's experience—the sequence of actions, observations, and outcomes from past interactions. Unlike working memory, episodic memory persists across sessions and informs future behavior.
Episodic memory is stored as events in a timeline. Each event has a timestamp, an event type, a payload, and the ID of the session it belongs to.
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
@dataclass
class EpisodeEvent:
    """One entry in the episodic timeline.

    Instances are built from four fields, accepted positionally in the
    order they are declared here.
    """

    # Moment the event was recorded.
    timestamp: datetime
    # Kind of event. The values used in this guide are "user_message",
    # "tool_call", "tool_result", and "agent_response".
    event_type: str
    # Event-specific data, keyed by string.
    payload: dict[str, Any]
    # Identifier of the session this event belongs to.
    session_id: str
class EpisodicMemory:
    """Record and query episode events through a storage backend.

    This is deliberately a plain class, not a dataclass: it declares no
    fields and defines its own ``__init__``, so ``@dataclass`` would only
    add a field-less ``__eq__`` (every instance would compare equal) and
    make instances unhashable.
    """

    def __init__(self, storage: "EpisodeStorage"):
        """Bind this memory to *storage*.

        The annotation is quoted so the class can be defined before
        ``EpisodeStorage`` is.
        """
        self.storage = storage

    def record(self, event_type: str, payload: dict[str, Any], session_id: str) -> None:
        """Build an event stamped with the current UTC time and save it.

        Args:
            event_type: Kind of event, e.g. ``"user_message"``.
            payload: Event-specific data.
            session_id: Session the event belongs to.
        """
        event = EpisodeEvent(
            # Timezone-aware UTC; ``datetime.utcnow()`` is deprecated and
            # returns a naive value that is easy to misread as local time.
            timestamp=datetime.now(timezone.utc),
            event_type=event_type,
            payload=payload,
            session_id=session_id,
        )
        self.storage.save(event)

    def get_session(self, session_id: str) -> list[EpisodeEvent]:
        """Return whatever the storage holds for *session_id*."""
        return self.storage.get_by_session(session_id)

    def get_recent(self, limit: int = 10) -> list[EpisodeEvent]:
        """Return the storage's most recent events, asking for *limit* of them."""
        return self.storage.get_recent(limit)

    def search(self, query: str, limit: int = 5) -> list[EpisodeEvent]:
        """Full-text search across event payloads, delegated to the storage."""
        return self.storage.search(query, limit)
Storage abstraction:
from abc import ABC, abstractmethod
class EpisodeStorage(ABC):
    """Abstract interface for episode storage backends.

    Subclasses must implement all four methods before they can be
    instantiated.
    """

    @abstractmethod
    def save(self, event: EpisodeEvent) -> None:
        """Store a single event."""

    @abstractmethod
    def get_by_session(self, session_id: str) -> list[EpisodeEvent]:
        """Return the events belonging to *session_id*."""

    @abstractmethod
    def get_recent(self, limit: int) -> list[EpisodeEvent]:
        """Return recent events; *limit* bounds how many."""

    @abstractmethod
    def search(self, query: str, limit: int) -> list[EpisodeEvent]:
        """Return events matching *query*; *limit* bounds how many."""
In-memory implementation for testing:
class InMemoryEpisodeStorage(EpisodeStorage):
    """Storage backend that keeps every event in a Python list."""

    def __init__(self):
        """Start with an empty event list."""
        # Events in the order they were saved.
        self.events: list[EpisodeEvent] = []

    def save(self, event: EpisodeEvent) -> None:
        """Append *event* to the end of the list."""
        self.events.append(event)

    def get_by_session(self, session_id: str) -> list[EpisodeEvent]:
        """Return a new list of the events whose session_id equals *session_id*."""
        selected = []
        for ev in self.events:
            if ev.session_id == session_id:
                selected.append(ev)
        return selected

    def get_recent(self, limit: int) -> list[EpisodeEvent]:
        """Return the first *limit* events when ordered newest-first by timestamp."""
        # Sort a copy so the stored insertion order is left untouched.
        newest_first = list(self.events)
        newest_first.sort(key=lambda ev: ev.timestamp, reverse=True)
        return newest_first[:limit]

    def search(self, query: str, limit: int) -> list[EpisodeEvent]:
        """Return the first *limit* events whose payload text contains *query*.

        Matching is a case-insensitive substring test against ``str(payload)``.
        """
        needle = query.lower()
        hits = []
        for ev in self.events:
            haystack = str(ev.payload).lower()
            if needle in haystack:
                hits.append(ev)
        return hits[:limit]
Failure mode: storage growth. In production, episodic memory grows indefinitely. Implement retention policies—keep events from the last N days, or compress old events into summaries.
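Implement a retention policy that keeps all events from the last 7 days, keeps summaries of the 30 days before that (one summary event per day), and discards anything older. Build it on top of the storage abstraction rather than a specific backend; note that EpisodeStorage as defined above has no way to delete or replace events, so you will need to extend the interface.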