07. Episodic Memory

Chapter 7 of 24 · 20 min

Episodic memory stores the agent's experience—the sequence of actions, observations, and outcomes from past interactions. Unlike working memory, episodic memory persists across sessions and informs future behavior.

Episodic memory is stored as events in a timeline. Each event has a timestamp, event type, and payload.

from __future__ import annotations  # lets annotations name EpisodeStorage, which is defined further down

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

@dataclass
class EpisodeEvent:
    timestamp: datetime
    event_type: str  # "user_message", "tool_call", "tool_result", "agent_response"
    payload: dict[str, Any]
    session_id: str

class EpisodicMemory:
    def __init__(self, storage: EpisodeStorage):
        self.storage = storage
    
    def record(self, event_type: str, payload: dict[str, Any], session_id: str) -> None:
        event = EpisodeEvent(
            timestamp=datetime.now(timezone.utc),  # timezone-aware; datetime.utcnow() is deprecated
            event_type=event_type,
            payload=payload,
            session_id=session_id
        )
        self.storage.save(event)
    
    def get_session(self, session_id: str) -> list[EpisodeEvent]:
        return self.storage.get_by_session(session_id)
    
    def get_recent(self, limit: int = 10) -> list[EpisodeEvent]:
        return self.storage.get_recent(limit)
    
    def search(self, query: str, limit: int = 5) -> list[EpisodeEvent]:
        """Full-text search across event payloads."""
        return self.storage.search(query, limit)

Storage abstraction:

from abc import ABC, abstractmethod

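class EpisodeStorage(ABC):
    """Persistence interface that EpisodicMemory delegates to."""
    
    @abstractmethod
    def save(self, event: EpisodeEvent) -> None:
        """Persist a single event."""
    
    @abstractmethod
    def get_by_session(self, session_id: str) -> list[EpisodeEvent]:
        """Return all events recorded for the given session."""
    
    @abstractmethod
    def get_recent(self, limit: int) -> list[EpisodeEvent]:
        """Return up to `limit` events, newest first."""
    
    @abstractmethod
    def search(self, query: str, limit: int) -> list[EpisodeEvent]:
        """Return up to `limit` events whose payload matches the query."""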

In-memory implementation for testing:

class InMemoryEpisodeStorage(EpisodeStorage):
    def __init__(self):
        self.events: list[EpisodeEvent] = []
    
    def save(self, event: EpisodeEvent) -> None:
        self.events.append(event)
    
    def get_by_session(self, session_id: str) -> list[EpisodeEvent]:
        return [e for e in self.events if e.session_id == session_id]
    
    def get_recent(self, limit: int) -> list[EpisodeEvent]:
        return sorted(self.events, key=lambda e: e.timestamp, reverse=True)[:limit]
    
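    def search(self, query: str, limit: int) -> list[EpisodeEvent]:
        # Case-insensitive substring match on the stringified payload (keys included);
        # a stand-in for real full-text search that is good enough for tests.
        query_lower = query.lower()
        matches = [e for e in self.events if query_lower in str(e.payload).lower()]
        return matches[:limit]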
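
The in-memory store loses everything when the process exits, while episodic memory is meant to persist across sessions. As a rough illustration of a persistent backend, here is a minimal sketch built on Python's standard `sqlite3` module. The class name `SQLiteEpisodeStorage`, the table layout, and the JSON encoding of payloads are all assumptions made for this example, not part of the chapter's design. It also assumes payloads are JSON-serializable and timestamps are timezone-aware UTC. `EpisodeEvent` is repeated so the block runs on its own; in the chapter's module the class would subclass `EpisodeStorage` instead.

```python
import json
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any


@dataclass
class EpisodeEvent:
    # Same fields as the chapter's EpisodeEvent, repeated so the sketch is standalone.
    timestamp: datetime
    event_type: str
    payload: dict[str, Any]
    session_id: str


class SQLiteEpisodeStorage:
    """Hypothetical store exposing the same four methods as EpisodeStorage."""

    _COLUMNS = "timestamp, event_type, payload, session_id"

    def __init__(self, path: str = ":memory:") -> None:
        # ":memory:" keeps the demo self-contained; pass a file path to persist.
        self.conn = sqlite3.connect(path)
        with self.conn:
            self.conn.execute(
                "CREATE TABLE IF NOT EXISTS events ("
                "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                "timestamp TEXT NOT NULL, "
                "event_type TEXT NOT NULL, "
                "payload TEXT NOT NULL, "
                "session_id TEXT NOT NULL)"
            )

    def save(self, event: EpisodeEvent) -> None:
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                f"INSERT INTO events ({self._COLUMNS}) VALUES (?, ?, ?, ?)",
                (
                    # Fixed-width ISO strings sort chronologically when all are UTC.
                    event.timestamp.isoformat(timespec="microseconds"),
                    event.event_type,
                    json.dumps(event.payload, ensure_ascii=False),
                    event.session_id,
                ),
            )

    def _query(self, where_order: str, params: tuple) -> list[EpisodeEvent]:
        # Run a SELECT and turn each row back into an EpisodeEvent.
        sql = f"SELECT {self._COLUMNS} FROM events {where_order}"
        rows = self.conn.execute(sql, params).fetchall()
        return [
            EpisodeEvent(datetime.fromisoformat(ts), etype, json.loads(payload), sid)
            for ts, etype, payload, sid in rows
        ]

    def get_by_session(self, session_id: str) -> list[EpisodeEvent]:
        return self._query("WHERE session_id = ? ORDER BY id", (session_id,))

    def get_recent(self, limit: int) -> list[EpisodeEvent]:
        return self._query("ORDER BY timestamp DESC, id DESC LIMIT ?", (limit,))

    def search(self, query: str, limit: int) -> list[EpisodeEvent]:
        # LIKE is a substring match, case-insensitive for ASCII only; "%" and "_"
        # in the query act as wildcards. This is not real full-text search.
        return self._query(
            "WHERE payload LIKE ? ORDER BY id LIMIT ?", (f"%{query}%", limit)
        )


store = SQLiteEpisodeStorage()
t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
store.save(EpisodeEvent(t0, "user_message", {"text": "Book a flight to Oslo"}, "s1"))
store.save(EpisodeEvent(t0 + timedelta(seconds=5), "tool_call", {"tool": "flight_search"}, "s1"))
store.save(EpisodeEvent(t0 + timedelta(minutes=1), "user_message", {"text": "Weather today?"}, "s2"))

session_events = store.get_by_session("s1")  # events for one session, in insertion order
latest = store.get_recent(1)                 # newest event across all sessions
hits = store.search("oslo", limit=5)         # substring match on the JSON payload
```

Because the class exposes the same four methods as the in-memory store, `EpisodicMemory` could be pointed at either one without changes, which is the main benefit of keeping storage behind an abstraction.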

Failure mode: storage growth. Nothing in the design above ever deletes events, so in production the store grows without bound. Implement a retention policy: keep only events from the last N days, or compress older events into summaries.
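
The simpler of the two policies, keeping only the last N days, can be sketched as a pure function over a list of events. The function name `prune_older_than` and its `now` parameter are hypothetical, introduced here only for illustration. The sketch assumes timezone-aware UTC timestamps, and it filters a plain list because `EpisodeStorage` as defined in this chapter has no delete operation. `EpisodeEvent` is repeated so the block runs on its own.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class EpisodeEvent:
    # Same fields as the chapter's EpisodeEvent, repeated so the sketch is standalone.
    timestamp: datetime
    event_type: str
    payload: dict[str, Any]
    session_id: str


def prune_older_than(
    events: list[EpisodeEvent],
    max_age_days: int,
    now: Optional[datetime] = None,
) -> list[EpisodeEvent]:
    """Return only the events whose timestamp falls within the last max_age_days."""
    # Accepting `now` as a parameter makes the cutoff deterministic in tests.
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    return [e for e in events if e.timestamp >= cutoff]


now = datetime(2024, 3, 10, tzinfo=timezone.utc)
events = [
    EpisodeEvent(now - timedelta(days=30), "user_message", {"text": "old"}, "s1"),
    EpisodeEvent(now - timedelta(days=1), "user_message", {"text": "recent"}, "s2"),
]
kept = prune_older_than(events, max_age_days=7, now=now)  # drops the 30-day-old event
```

This covers only the age cutoff. Compressing older events into summaries needs a summarization step and a way to replace events in storage, neither of which is shown here.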

EXERCISE

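Implement a retention policy that keeps all events from the last 7 days, keeps one summary event per day for the 30 days before that, and discards anything older. Build it on the storage abstraction; note that EpisodeStorage as defined above has no way to delete or replace events, so you will need to extend the interface.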