24. Agent Framework Project
Chapter 24 of 24 · 25 min
This final chapter synthesizes everything into a complete project. You'll build a multi-agent system for automated code review.
Project Overview
The system contains three agents:
- Parser Agent - Extracts code structure from input
- Reviewer Agent - Identifies issues in parsed code
- Reporter Agent - Formats findings into a report
Implementation
# main.py
import asyncio
from dataclasses import dataclass
from typing import Any
from chapter_13 import RequestResponseProtocol
from chapter_14 import InMemoryMessageBus
from chapter_15 import AgentRegistry
from chapter_16 import AgentMetrics, StructuredFormatter
from chapter_17 import AgentError, ErrorSeverity, retry_with_backoff
import logging
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("code-review")
logger.setLevel(logging.INFO)
@dataclass
class CodeReviewRequest:
file_path: str
code: str
language: str
@dataclass
class CodeReviewReport:
file_path: str
issues: list[dict]
summary: dict
class ParserAgent:
def __init__(self, agent_id: str, bus: InMemoryMessageBus, registry: AgentRegistry):
self.agent_id = agent_id
self.bus = bus
self.registry = registry
self.metrics = AgentMetrics(agent_id)
async def process(self, request: CodeReviewRequest) -> dict:
import time
start = time.time()
logger.info(f"Parsing {request.file_path}")
# Simple parsing: count lines, functions, etc.
lines = request.code.split('\n')
result = {
"file_path": request.file_path,
"language": request.language,
"lines_of_code": len([l for l in lines if l.strip() and not l.strip().startswith('#')]),
"total_lines": len(lines),
"functions": len([l for l in lines if 'def ' in l]),
"classes": len([l for l in lines if 'class ' in l]),
}
duration_ms = (time.time() - start) * 1000
self.metrics.record_request(duration_ms, success=True)
logger.info(f"Parsed {request.file_path}: {result['lines_of_code']} LOC")
return result
class ReviewerAgent:
def __init__(self, agent_id: str, bus: InMemoryMessageBus, registry: AgentRegistry):
self.agent_id = agent_id
self.bus = bus
self.registry = registry
self.metrics = AgentMetrics(agent_id)
async def process(self, parsed: dict) -> list[dict]:
import time
start = time.time()
logger.info(f"Reviewing {parsed['file_path']}")
issues = []
# Rule: Too many lines
if parsed['lines_of_code'] > 300:
issues.append({
"severity": "warning",
"rule": "too-many-lines",
"message": f"File has {parsed['lines_of_code']} LOC (threshold: 300)",
"line": None
})
# Rule: Missing functions
if parsed['functions'] == 0:
issues.append({
"severity": "info",
"rule": "no-functions",
"message": "File contains no function definitions",
"line": None
})
duration_ms = (time.time() - start) * 1000
self.metrics.record_request(duration_ms, success=True)
logger.info(f"Review complete: {len(issues)} issues found")
return issues
class ReporterAgent:
def __init__(self, agent_id: str, bus: InMemoryMessageBus, registry: AgentRegistry):
self.agent_id = agent_id
self.bus = bus
self.registry = registry
self.metrics = AgentMetrics(agent_id)
async def process(self, parsed: dict, issues: list[dict]) -> CodeReviewReport:
import time
start = time.time()
logger.info(f"Generating report for {parsed['file_path']}")
severity_counts = {"error": 0, "warning": 0, "info": 0}
for issue in issues:
severity = issue.get("severity", "info")
if severity in severity_counts:
severity_counts[severity] += 1
report = CodeReviewReport(
file_path=parsed['file_path'],
issues=issues,
summary={
"total_issues": len(issues),
"by_severity": severity_counts,
"lines_analyzed": parsed['lines_of_code']
}
)
duration_ms = (time.time() - start) * 1000
self.metrics.record_request(duration_ms, success=True)
logger.info(f"Report generated: {report.summary}")
return report
class CodeReviewPipeline:
def __init__(self):
self.bus = InMemoryMessageBus()
self.registry = AgentRegistry()
self.parser = ParserAgent("parser", self.bus, self.registry)
self.reviewer = ReviewerAgent("reviewer", self.bus, self.registry)
self.reporter = ReporterAgent("reporter", self.bus, self.registry)
# Register with discovery
asyncio.create_task(self.registry.register("parser", ["parsing"], "in-process"))
asyncio.create_task(self.registry.register("reviewer", ["review"], "in-process"))
asyncio.create_task(self.registry.register("reporter", ["reporting"], "in-process"))
async def review(self, code: str, file_path: str = "unknown", language: str = "python") -> CodeReviewReport:
request = CodeReviewRequest(file_path=file_path, code=code, language=language)
# Step 1: Parse
parsed = await self.parser.process(request)
# Step 2: Review
issues = await self.reviewer.process(parsed)
# Step 3: Report
report = await self.reporter.process(parsed, issues)
return report
async def main():
pipeline = CodeReviewPipeline()
sample_code = '''
def hello():
print("Hello, world!")
def calculate(x, y):
return x + y
class MyClass:
def method(self):
pass
'''
report = await pipeline.review(sample_code, "example.py")
print(f"\n=== Code Review Report: {report.file_path} ===")
print(f"Lines analyzed: {report.summary['lines_analyzed']}")
print(f"Total issues: {report.summary['total_issues']}")
for severity, count in report.summary['by_severity'].items():
if count > 0:
print(f" {severity}: {count}")
print("\nDetails:")
for issue in report.issues:
print(f" [{issue['severity']}] {issue['rule']}: {issue['message']}")
if __name__ == "__main__":
asyncio.run(main())
Adding Observability
# Add to main() to enable observability
async def log_metrics(pipeline: CodeReviewPipeline):
"""Periodic metrics logging."""
while True:
await asyncio.sleep(10)
for agent in [pipeline.parser, pipeline.reviewer, pipeline.reporter]:
logger.info(
f"Metrics for {agent.agent_id}",
extra={
"agent_id": agent.agent_id,
"requests": agent.metrics.requests_received,
"error_rate": agent.metrics.error_rate,
"avg_latency_ms": agent.metrics.avg_latency_ms
}
)
# Run metrics logging in background
metrics_task = asyncio.create_task(log_metrics(pipeline))
Testing
# test_integration.py
import pytest
@pytest.mark.asyncio
async def test_full_pipeline():
pipeline = CodeReviewPipeline()
report = await pipeline.review("def test(): pass", "test.py")
assert report.file_path == "test.py"
assert report.summary['lines_analyzed'] == 1 # One non-empty line
@pytest.mark.asyncio
async def test_metrics_tracked():
pipeline = CodeReviewPipeline()
await pipeline.review("x = 1", "short.py")
assert pipeline.parser.metrics.requests_received >= 1
assert pipeline.parser.metrics.error_rate == 0.0
Running the Project
# Install dependencies
pip install pytest pytest-asyncio
# Run the code review
python main.py
# Run tests
pytest test_integration.py -v
EXERCISE
Extend the code review system with a new agent (e.g., security scanner, style checker). Add it to the pipeline, wire up observability, and write tests. Verify that existing functionality still works. This completes the course. The framework is yours to extend. Start simple, instrument everything, and remember: the best agent system is one that fails visibly and recovers gracefully.