23. Custom Framework
Chapter 23 of 24 · 20 min
When existing frameworks don't fit your domain, build a minimal custom one. This chapter walks through the architecture of a purpose-built agent framework.
Design Principles
The best custom frameworks have few principles:
- Explicit over implicit. No hidden magic.
- Composable by default. Agents are functions with inputs and outputs.
- Observable at every boundary. Nothing happens that isn't logged.
Core Abstractions
from typing import Any, AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
@dataclass
class AgentInput:
payload: Any
metadata: dict = field(default_factory=dict)
@dataclass
class AgentOutput:
payload: Any
success: bool
error: str | None = None
duration_ms: float = 0.0
class BaseAgent:
def __init__(self, name: str):
self.name = name
self._running = False
self._input_queue: asyncio.Queue[AgentInput] = asyncio.Queue()
async def process(self, input_data: AgentInput) -> AgentOutput:
"""Override this method with actual processing logic."""
raise NotImplementedError
async def run(self) -> AsyncIterator[AgentOutput]:
"""Main loop: process inputs and yield outputs."""
self._running = True
while self._running:
try:
input_data = await asyncio.wait_for(
self._input_queue.get(),
timeout=1.0
)
except asyncio.TimeoutError:
continue
start = asyncio.get_event_loop().time()
try:
result = await self.process(input_data)
result.duration_ms = (asyncio.get_event_loop().time() - start) * 1000
except Exception as e:
result = AgentOutput(
payload=None,
success=False,
error=str(e),
duration_ms=(asyncio.get_event_loop().time() - start) * 1000
)
yield result
async def submit(self, input_data: AgentInput):
await self._input_queue.put(input_data)
def stop(self):
self._running = False
Example Implementation
class TextAnalyzerAgent(BaseAgent):
async def process(self, input_data: AgentInput) -> AgentOutput:
text = input_data.payload
if not isinstance(text, str):
return AgentOutput(
payload=None,
success=False,
error="Expected string input"
)
words = text.split()
return AgentOutput(
payload={
"word_count": len(words),
"char_count": len(text),
"avg_word_length": sum(len(w) for w in words) / len(words) if words else 0
},
success=True
)
Framework Composition
class AgentPipeline:
def __init__(self, agents: list[BaseAgent]):
self.agents = agents
async def execute(self, initial_input: Any) -> dict:
current = initial_input
for agent in self.agents:
result = await agent.process(AgentInput(payload=current))
if not result.success:
return {"error": result.error, "failed_at": agent.name}
current = result.payload
return {"result": current}
EXERCISE
Build a custom framework for a specific use case (e.g., data transformation, content moderation, format conversion). Identify the minimum abstractions needed and implement them.