23. Custom Framework

Chapter 23 of 24 · 20 min

When existing frameworks don't fit your domain, build a minimal custom one. This chapter walks through the architecture of a purpose-built agent framework.

Design Principles

The best custom frameworks have few principles:

  1. Explicit over implicit. No hidden magic.
  2. Composable by default. Agents are functions with inputs and outputs.
  3. Observable at every boundary. Nothing happens that isn't logged.

Core Abstractions

from typing import Any, AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime
import asyncio

@dataclass
class AgentInput:
    payload: Any
    metadata: dict = field(default_factory=dict)

@dataclass
class AgentOutput:
    payload: Any
    success: bool
    error: str | None = None
    duration_ms: float = 0.0

class BaseAgent:
    def __init__(self, name: str):
        self.name = name
        self._running = False
        self._input_queue: asyncio.Queue[AgentInput] = asyncio.Queue()
    
    async def process(self, input_data: AgentInput) -> AgentOutput:
        """Override this method with actual processing logic."""
        raise NotImplementedError
    
    async def run(self) -> AsyncIterator[AgentOutput]:
        """Main loop: process inputs and yield outputs."""
        self._running = True
        while self._running:
            try:
                input_data = await asyncio.wait_for(
                    self._input_queue.get(), 
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue
            
            start = asyncio.get_event_loop().time()
            try:
                result = await self.process(input_data)
                result.duration_ms = (asyncio.get_event_loop().time() - start) * 1000
            except Exception as e:
                result = AgentOutput(
                    payload=None,
                    success=False,
                    error=str(e),
                    duration_ms=(asyncio.get_event_loop().time() - start) * 1000
                )
            
            yield result
    
    async def submit(self, input_data: AgentInput):
        await self._input_queue.put(input_data)
    
    def stop(self):
        self._running = False

Example Implementation

class TextAnalyzerAgent(BaseAgent):
    async def process(self, input_data: AgentInput) -> AgentOutput:
        text = input_data.payload
        
        if not isinstance(text, str):
            return AgentOutput(
                payload=None,
                success=False,
                error="Expected string input"
            )
        
        words = text.split()
        return AgentOutput(
            payload={
                "word_count": len(words),
                "char_count": len(text),
                "avg_word_length": sum(len(w) for w in words) / len(words) if words else 0
            },
            success=True
        )

Framework Composition

class AgentPipeline:
    def __init__(self, agents: list[BaseAgent]):
        self.agents = agents
    
    async def execute(self, initial_input: Any) -> dict:
        current = initial_input
        
        for agent in self.agents:
            result = await agent.process(AgentInput(payload=current))
            if not result.success:
                return {"error": result.error, "failed_at": agent.name}
            current = result.payload
        
        return {"result": current}
EXERCISE

Build a custom framework for a specific use case (e.g., data transformation, content moderation, format conversion). Identify the minimum abstractions needed and implement them.