RUNLOCALAIv38
Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.coIndependently operated
Custom Agent Frameworks

24. Agent Framework Project

Chapter 24 of 24 · 25 min
KEY INSIGHT

This project demonstrates that agent frameworks don't have to be complex. The value comes from clear communication patterns, observability, and error handling—not from elaborate abstractions.

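This final chapter combines the building blocks from Chapters 13 to 17 (request/response protocol, message bus, agent registry, metrics, and error handling) into a complete project. You'll build a multi-agent system for automated code review.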

Project Overview

The system contains three agents:

  1. Parser Agent - Extracts code structure from input
  2. Reviewer Agent - Identifies issues in parsed code
  3. Reporter Agent - Formats findings into a report

Implementation

# main.py
import asyncio
import logging
from dataclasses import dataclass
from typing import Any

# Building blocks from earlier chapters
from chapter_13 import RequestResponseProtocol
from chapter_14 import InMemoryMessageBus
from chapter_15 import AgentRegistry
from chapter_16 import AgentMetrics, StructuredFormatter
from chapter_17 import AgentError, ErrorSeverity, retry_with_backoff

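# Configure logging. basicConfig installs a plain-text handler; for structured
# output, attach the Chapter 16 StructuredFormatter via handler.setFormatter().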
logging.basicConfig(
    level=logging.INFO,
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("code-review")
logger.setLevel(logging.INFO)

@dataclass
class CodeReviewRequest:
    file_path: str
    code: str
    language: str

@dataclass  
class CodeReviewReport:
    file_path: str
    issues: list[dict]
    summary: dict

class ParserAgent:
    def __init__(self, agent_id: str, bus: InMemoryMessageBus, registry: AgentRegistry):
        self.agent_id = agent_id
        self.bus = bus
        self.registry = registry
        self.metrics = AgentMetrics(agent_id)
    
    async def process(self, request: CodeReviewRequest) -> dict:
        import time
        start = time.time()
        
        logger.info(f"Parsing {request.file_path}")
        
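        # Simple line-based parsing: count code lines, functions, and classes.
        # This is a heuristic for Python-style source, not a real parser.
        lines = request.code.split('\n')
        stripped = [l.strip() for l in lines]
        
        result = {
            "file_path": request.file_path,
            "language": request.language,
            "lines_of_code": len([s for s in stripped if s and not s.startswith('#')]),
            "total_lines": len(lines),
            # Match definitions at the start of a line so that comments, strings,
            # and words such as "subclass " are not counted.
            "functions": len([s for s in stripped if s.startswith(('def ', 'async def '))]),
            "classes": len([s for s in stripped if s.startswith('class ')]),
        }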
        
        duration_ms = (time.time() - start) * 1000
        self.metrics.record_request(duration_ms, success=True)
        
        logger.info(f"Parsed {request.file_path}: {result['lines_of_code']} LOC")
        return result

class ReviewerAgent:
    def __init__(self, agent_id: str, bus: InMemoryMessageBus, registry: AgentRegistry):
        self.agent_id = agent_id
        self.bus = bus
        self.registry = registry
        self.metrics = AgentMetrics(agent_id)
    
    async def process(self, parsed: dict) -> list[dict]:
        import time
        start = time.time()
        
        logger.info(f"Reviewing {parsed['file_path']}")
        
        issues = []
        
        # Rule: Too many lines
        if parsed['lines_of_code'] > 300:
            issues.append({
                "severity": "warning",
                "rule": "too-many-lines",
                "message": f"File has {parsed['lines_of_code']} LOC (threshold: 300)",
                "line": None
            })
        
        # Rule: Missing functions
        if parsed['functions'] == 0:
            issues.append({
                "severity": "info",
                "rule": "no-functions",
                "message": "File contains no function definitions",
                "line": None
            })
        
        duration_ms = (time.time() - start) * 1000
        self.metrics.record_request(duration_ms, success=True)
        
        logger.info(f"Review complete: {len(issues)} issues found")
        return issues

class ReporterAgent:
    def __init__(self, agent_id: str, bus: InMemoryMessageBus, registry: AgentRegistry):
        self.agent_id = agent_id
        self.bus = bus
        self.registry = registry
        self.metrics = AgentMetrics(agent_id)
    
    async def process(self, parsed: dict, issues: list[dict]) -> CodeReviewReport:
        import time
        start = time.time()
        
        logger.info(f"Generating report for {parsed['file_path']}")
        
        severity_counts = {"error": 0, "warning": 0, "info": 0}
        for issue in issues:
            severity = issue.get("severity", "info")
            if severity in severity_counts:
                severity_counts[severity] += 1
        
        report = CodeReviewReport(
            file_path=parsed['file_path'],
            issues=issues,
            summary={
                "total_issues": len(issues),
                "by_severity": severity_counts,
                "lines_analyzed": parsed['lines_of_code']
            }
        )
        
        duration_ms = (time.time() - start) * 1000
        self.metrics.record_request(duration_ms, success=True)
        
        logger.info(f"Report generated: {report.summary}")
        return report

class CodeReviewPipeline:
    def __init__(self):
        self.bus = InMemoryMessageBus()
        self.registry = AgentRegistry()
        
        self.parser = ParserAgent("parser", self.bus, self.registry)
        self.reviewer = ReviewerAgent("reviewer", self.bus, self.registry)
        self.reporter = ReporterAgent("reporter", self.bus, self.registry)
        
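        self._registered = False
    
    async def _ensure_registered(self) -> None:
        # Register with discovery. The calls are awaited here instead of being
        # fired from __init__ with asyncio.create_task: that needs a running
        # event loop, and unreferenced tasks may never get a chance to finish.
        if self._registered:
            return
        await self.registry.register("parser", ["parsing"], "in-process")
        await self.registry.register("reviewer", ["review"], "in-process")
        await self.registry.register("reporter", ["reporting"], "in-process")
        self._registered = True
    
    async def review(self, code: str, file_path: str = "unknown", language: str = "python") -> CodeReviewReport:
        await self._ensure_registered()
        request = CodeReviewRequest(file_path=file_path, code=code, language=language)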
        
        # Step 1: Parse
        parsed = await self.parser.process(request)
        
        # Step 2: Review  
        issues = await self.reviewer.process(parsed)
        
        # Step 3: Report
        report = await self.reporter.process(parsed, issues)
        
        return report

async def main():
    pipeline = CodeReviewPipeline()
    
    sample_code = '''
def hello():
    print("Hello, world!")

def calculate(x, y):
    return x + y

class MyClass:
    def method(self):
        pass
'''
    
    report = await pipeline.review(sample_code, "example.py")
    
    print(f"\n=== Code Review Report: {report.file_path} ===")
    print(f"Lines analyzed: {report.summary['lines_analyzed']}")
    print(f"Total issues: {report.summary['total_issues']}")
    
    for severity, count in report.summary['by_severity'].items():
        if count > 0:
            print(f"  {severity}: {count}")
    
    print("\nDetails:")
    for issue in report.issues:
        print(f"  [{issue['severity']}] {issue['rule']}: {issue['message']}")

if __name__ == "__main__":
    asyncio.run(main())
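
Every agent above records `success=True`, so a step that raises never reaches the metrics. The following is a minimal sketch of one way to record failures as well, under stated assumptions: `StubMetrics` is a hypothetical stand-in that mirrors only the `record_request`, `requests_received`, and `error_rate` names used in this chapter (the real Chapter 16 `AgentMetrics` may differ), and `run_with_metrics` and `flaky_parse` are illustrative helpers, not part of the framework.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class StubMetrics:
    """Hypothetical stand-in for the Chapter 16 AgentMetrics class."""
    requests_received: int = 0
    errors: int = 0

    def record_request(self, duration_ms: float, success: bool) -> None:
        self.requests_received += 1
        if not success:
            self.errors += 1

    @property
    def error_rate(self) -> float:
        if self.requests_received == 0:
            return 0.0
        return self.errors / self.requests_received


async def run_with_metrics(metrics: StubMetrics, step, *args):
    """Await step(*args), recording latency plus success or failure."""
    start = time.perf_counter()
    try:
        result = await step(*args)
    except Exception:
        duration_ms = (time.perf_counter() - start) * 1000
        metrics.record_request(duration_ms, success=False)
        raise  # fail visibly: the caller still sees the original error
    duration_ms = (time.perf_counter() - start) * 1000
    metrics.record_request(duration_ms, success=True)
    return result


async def flaky_parse(code: str) -> dict:
    """Hypothetical step that rejects empty input."""
    if not code.strip():
        raise ValueError("empty input")
    return {"total_lines": len(code.split("\n"))}


async def demo() -> StubMetrics:
    metrics = StubMetrics()
    await run_with_metrics(metrics, flaky_parse, "x = 1")
    try:
        await run_with_metrics(metrics, flaky_parse, "   ")
    except ValueError:
        pass  # the failure has already been counted
    return metrics


metrics = asyncio.run(demo())
print(metrics.requests_received, metrics.error_rate)  # prints: 2 0.5
```

The same pattern could be applied inside each agent's `process` method by moving the rule logic into the `try` block, so that `error_rate` reflects real failures.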

Adding Observability

# Define in main.py at module level, next to main()
async def log_metrics(pipeline: CodeReviewPipeline):
    """Periodic metrics logging."""
    while True:
        await asyncio.sleep(10)
        for agent in [pipeline.parser, pipeline.reviewer, pipeline.reporter]:
            logger.info(
                f"Metrics for {agent.agent_id}",
                extra={
                    "agent_id": agent.agent_id,
                    "requests": agent.metrics.requests_received,
                    "error_rate": agent.metrics.error_rate,
                    "avg_latency_ms": agent.metrics.avg_latency_ms
                }
            )

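# Inside main(), after creating the pipeline, run metrics logging in the background.
# asyncio.create_task needs a running event loop, so this cannot sit at module level.
metrics_task = asyncio.create_task(log_metrics(pipeline))
# ... run reviews ...
metrics_task.cancel()  # stop the background loop before main() returns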
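
Fields passed through `extra` become attributes on the log record, but a plain-text formatter only prints them if its format string names them. The sketch below shows one way to surface them using only the standard library. `JsonExtraFormatter` and `build_logger` are hypothetical names for illustration; this is not the Chapter 16 `StructuredFormatter`, whose behavior may differ.

```python
import io
import json
import logging

# Attribute names present on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}


class JsonExtraFormatter(logging.Formatter):
    """Hypothetical formatter: one JSON object per record, including extras."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy over every non-standard attribute, i.e. the `extra` fields.
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)


def build_logger(stream) -> logging.Logger:
    """Return a logger that writes JSON lines to the given stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonExtraFormatter())
    log = logging.getLogger("code-review-demo")
    log.handlers = [handler]
    log.propagate = False
    log.setLevel(logging.INFO)
    return log


buffer = io.StringIO()
demo_logger = build_logger(buffer)
demo_logger.info(
    "Metrics for parser",
    extra={"agent_id": "parser", "requests": 3, "error_rate": 0.0},
)
line = buffer.getvalue().strip()
print(line)
```

Writing one JSON object per line keeps the metrics easy to filter by `agent_id` with standard log tooling.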

Testing

# test_integration.py
import pytest

from main import CodeReviewPipeline

@pytest.mark.asyncio
async def test_full_pipeline():
    pipeline = CodeReviewPipeline()
    
    report = await pipeline.review("def test(): pass", "test.py")
    
    assert report.file_path == "test.py"
    assert report.summary['lines_analyzed'] == 1  # One non-empty line

@pytest.mark.asyncio
async def test_metrics_tracked():
    pipeline = CodeReviewPipeline()
    
    await pipeline.review("x = 1", "short.py")
    
    assert pipeline.parser.metrics.requests_received >= 1
    assert pipeline.parser.metrics.error_rate == 0.0

Running the Project

# Install dependencies
pip install pytest pytest-asyncio

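# Run the code review (the chapter_13 to chapter_17 modules must be importable,
# for example by placing them in the same directory as main.py)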
python main.py

# Run tests
pytest test_integration.py -v
EXERCISE

Extend the code review system with a new agent (e.g., security scanner, style checker). Add it to the pipeline, wire up observability, and write tests. Verify that existing functionality still works.

This completes the course. The framework is yours to extend. Start simple, instrument everything, and remember: the best agent system is one that fails visibly and recovers gracefully.
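
For the exercise, one possible starting point is a style checker that flags overly long lines. This is a sketch under assumptions, not a reference solution: `StyleCheckerAgent`, its `max_line_length` parameter, and the `line-too-long` rule name are all hypothetical. It returns issues in the same dictionary shape the Reviewer Agent uses (`severity`, `rule`, `message`, `line`), and it leaves out the bus, registry, and metrics wiring to stay self-contained.

```python
import asyncio


class StyleCheckerAgent:
    """Hypothetical extra agent: flags lines longer than max_line_length."""

    def __init__(self, agent_id: str, max_line_length: int = 100):
        self.agent_id = agent_id
        self.max_line_length = max_line_length

    async def process(self, code: str) -> list[dict]:
        issues = []
        # Line numbers are 1-based, matching how editors report them.
        for number, line in enumerate(code.split("\n"), start=1):
            if len(line) > self.max_line_length:
                issues.append({
                    "severity": "warning",
                    "rule": "line-too-long",
                    "message": (
                        f"Line has {len(line)} characters "
                        f"(threshold: {self.max_line_length})"
                    ),
                    "line": number,
                })
        return issues


async def demo() -> list[dict]:
    checker = StyleCheckerAgent("style", max_line_length=20)
    code = "x = 1\n" + "y = " + "1 + " * 10 + "1\n"
    return await checker.process(code)


issues = asyncio.run(demo())
for issue in issues:
    print(f"[{issue['severity']}] {issue['rule']} at line {issue['line']}")
```

Because this agent needs the raw source rather than the parsed summary, the pipeline would pass it `request.code` and merge its issues with the reviewer's before the reporting step.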
