18. Tool Ecosystem Project
Chapter 18 of 18 · 35 min
This chapter brings together everything from the previous chapters: you will build a complete function-calling system with multiple tools, error recovery, rate limiting, monitoring, and tests.
Project Overview
Build a local document assistant with these capabilities:
- Read documents from a specified directory
- Search within documents
- Summarize document contents
- Answer questions about documents
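Project Structure
This chapter lists the core modules in full. `summarizer.py`, `recovery.py`, `rate_limiter.py`, and `metrics.py` are not shown, so provide your own versions (for example, from earlier chapters) before running `app.py`. That file imports `RateLimiter` and `setup_metrics` from them.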
document_assistant/
├── tools/
│ ├── __init__.py
│ ├── reader.py
│ ├── search.py
│ ├── summarizer.py
│ └── qa.py
├── executor/
│ ├── __init__.py
│ ├── dispatcher.py
│ ├── recovery.py
│ └── rate_limiter.py
├── monitoring/
│ ├── __init__.py
│ └── metrics.py
├── tests/
│ ├── test_tools.py
│ ├── test_executor.py
│ └── test_integration.py
├── app.py
├── config.py
└── requirements.txt
Core Tool Implementations
# tools/reader.py
import os
import hashlib
from pathlib import Path
from typing import Optional
class DocumentReader:
    """Read documents from a single allowed directory with security checks.

    Every path passed to :meth:`read` is resolved (``..`` segments and
    symlinks included) and must stay inside ``allowed_dir``; anything else is
    rejected before the file is opened.
    """

    def __init__(self, allowed_dir: str, max_size: int = 1024 * 1024):
        """
        Args:
            allowed_dir: Root directory that all reads are confined to.
            max_size: Largest file, in bytes, that :meth:`read` will load.
        """
        self.allowed_dir = Path(allowed_dir).resolve()
        self.max_size = max_size

    def read(self, filepath: str) -> dict:
        """Read one document's text content.

        Args:
            filepath: Path relative to ``allowed_dir``.

        Returns:
            dict with ``filepath`` (relative to ``allowed_dir``), ``size``
            (bytes on disk), ``content`` (decoded as UTF-8 with undecodable
            bytes replaced) and ``hash`` (first 16 hex chars of the SHA-256
            of the returned content).

        Raises:
            ValueError: The path escapes ``allowed_dir`` or the file is larger
                than ``max_size`` bytes.
            FileNotFoundError: No regular file exists at the path.
        """
        target = (self.allowed_dir / filepath).resolve()
        # Compare whole path components, not string prefixes: a prefix test
        # would accept siblings such as "/data/documents_private".
        if not target.is_relative_to(self.allowed_dir):
            raise ValueError("Access denied: path traversal attempt")
        # is_file() also rejects directories, which open() cannot read.
        if not target.is_file():
            raise FileNotFoundError(f"File not found: {filepath}")

        size = target.stat().st_size
        if size > self.max_size:
            raise ValueError(f"File too large: {size} bytes (max {self.max_size})")

        with open(target, 'r', encoding='utf-8', errors='replace') as f:
            # Bounded read: stays within max_size even if the file grew after
            # the stat() above, and keeps the hash consistent with the content.
            content = f.read(self.max_size)

        return {
            "filepath": str(target.relative_to(self.allowed_dir)),
            "size": size,
            "content": content,
            "hash": hashlib.sha256(content.encode()).hexdigest()[:16],
        }

    def list_documents(self, extension: Optional[str] = None) -> list[dict]:
        """List the regular files directly inside ``allowed_dir``.

        Args:
            extension: Optional extension filter, with or without the leading
                dot (``"txt"`` and ``".txt"`` are equivalent).

        Returns:
            One dict per file with ``name``, ``size`` (bytes) and ``modified``
            (``st_mtime``), sorted by name. Subdirectories are not descended.
        """
        pattern = f"*.{extension.lstrip('.')}" if extension else "*"
        documents = []
        for path in sorted(self.allowed_dir.glob(pattern)):
            if not path.is_file():
                continue
            stat = path.stat()  # one syscall for both size and mtime
            documents.append({
                "name": path.name,
                "size": stat.st_size,
                "modified": stat.st_mtime,
            })
        return documents
# tools/search.py
import re
from collections import defaultdict
from .reader import DocumentReader
class DocumentSearch:
    """Search within documents by keyword using an in-memory line index."""

    def __init__(self, reader: "DocumentReader"):
        self.reader = reader
        # document name -> [(1-based line number, line text), ...]; blank lines omitted
        self._index: dict[str, list[tuple[int, str]]] = {}

    def index_documents(self, glob_pattern: str = "*.txt") -> dict:
        """(Re)build the index from documents whose name matches ``glob_pattern``.

        Indexing is best-effort: a document the reader refuses (too large,
        unreadable, outside the allowed directory) is skipped and reported
        rather than aborting the whole run.

        Returns:
            dict with ``documents_indexed``, ``total_lines`` (non-blank lines
            indexed) and ``skipped`` (``{"document", "error"}`` per failure).
        """
        from fnmatch import fnmatch

        index: dict[str, list[tuple[int, str]]] = {}
        skipped: list[dict] = []
        for doc in self.reader.list_documents():
            name = doc["name"]
            if not fnmatch(name, glob_pattern):
                continue
            try:
                content = self.reader.read(name)["content"]
            except Exception as exc:  # best-effort: record and keep going
                skipped.append({"document": name, "error": str(exc)})
                continue
            index[name] = [
                (number, line)
                for number, line in enumerate(content.split('\n'), start=1)
                if line.strip()
            ]
        # Swap in the new index only once it is complete, so a concurrent
        # search never sees a half-built one.
        self._index = index
        return {
            "documents_indexed": len(index),
            "total_lines": sum(len(lines) for lines in index.values()),
            "skipped": skipped,
        }

    def search(self, query: str, case_sensitive: bool = False, max_results: int = 10) -> list[dict]:
        """Return up to ``max_results`` indexed lines that contain ``query``.

        ``query`` is matched literally (regex metacharacters are escaped).
        An empty query or a non-positive ``max_results`` returns no results;
        call :meth:`index_documents` first, since an empty index matches nothing.

        Returns:
            dicts with ``document``, ``line`` (1-based) and ``text`` (stripped).
        """
        if not query or max_results <= 0:
            return []
        flags = 0 if case_sensitive else re.IGNORECASE
        pattern = re.compile(re.escape(query), flags)
        results = []
        for doc_name, lines in self._index.items():
            for line_num, line in lines:
                if pattern.search(line):
                    results.append({
                        "document": doc_name,
                        "line": line_num,
                        "text": line.strip(),
                    })
                    if len(results) >= max_results:
                        return results
        return results
# tools/qa.py
from langchain_ollama import ChatOllama
class DocumentQA:
    """Answer questions about a document with a locally served Ollama chat model."""

    def __init__(self, model: str = "llama3.2", base_url: str = "http://localhost:11434"):
        # Sampling temperature is fixed at 0.3 for every question.
        self.llm = ChatOllama(model=model, base_url=base_url, temperature=0.3)

    def answer(self, question: str, context: str, max_context_length: int = 4000) -> dict:
        """Ask the model ``question`` about ``context``.

        Context longer than ``max_context_length`` characters is cut to that
        length and marked with a trailing "...".

        Returns:
            dict with ``question``, ``answer`` (model reply as text),
            ``context_used`` (characters of context sent, including any "...")
            and ``model`` (the model name).
        """
        if len(context) <= max_context_length:
            trimmed = context
        else:
            trimmed = f"{context[:max_context_length]}..."

        prompt_lines = [
            "Based on the following document content, answer the question.",
            "Document content:",
            trimmed,
            f"Question: {question}",
            "Answer:",
        ]
        reply = self.llm.invoke("\n".join(prompt_lines))

        return {
            "question": question,
            "answer": str(reply.content),
            "context_used": len(trimmed),
            "model": self.llm.model,
        }
Tool Executor with Recovery
# executor/dispatcher.py
import json
import asyncio
from typing import Any, Callable, Optional
from enum import Enum
class ErrorType(Enum):
    """How ToolDispatcher.execute reacts to an exception raised by a tool."""

    RETRYABLE = "retryable"          # transient (timeouts, connection errors): retried with backoff
    NON_RETRYABLE = "non_retryable"  # everything else: returned to the caller immediately
    FATAL = "fatal"                  # missing/forbidden files: returned immediately


class ToolDispatcher:
    """Registry of named tools plus an executor with retry and rate limiting."""

    def __init__(self, rate_limiter: Optional[Any] = None):
        """
        Args:
            rate_limiter: Optional object whose ``check_request(tool_name, n)``
                returns ``(allowed, reason)``; consulted once per ``execute``
                call. The second argument is always passed as 0 here --
                presumably a token count; confirm against the RateLimiter.
        """
        self.tools: dict[str, Callable] = {}
        self.rate_limiter = rate_limiter
        # Backoff before retry k (0-based) is base_delay * 2**k seconds, capped at max_delay.
        self.retry_config = {
            "max_attempts": 3,
            "base_delay": 1.0,
            "max_delay": 30.0,
        }

    def register(self, name: str, func: Callable):
        """Register ``func`` under ``name``, replacing any tool already using that name."""
        self.tools[name] = func

    async def execute(self, tool_name: str, params: dict) -> dict:
        """Run a registered tool, retrying transient failures with exponential backoff.

        Coroutine functions are awaited directly; plain callables run in a
        worker thread via ``asyncio.to_thread`` so they do not block the loop.

        Args:
            tool_name: Name passed to :meth:`register`.
            params: Keyword arguments for the tool (``None`` is treated as ``{}``).

        Returns:
            ``{"success": True, "result": ...}`` on success. Otherwise a dict
            with ``"success": False`` and ``"error"``, plus ``"type"`` (an
            ErrorType value) when the tool itself raised. Only
            ErrorType.RETRYABLE failures are retried, for at most
            ``retry_config["max_attempts"]`` attempts in total.
        """
        import inspect  # handles functools.partial of async functions too

        if tool_name not in self.tools:
            return {"success": False, "error": f"Unknown tool: {tool_name}"}

        if self.rate_limiter is not None:
            allowed, reason = self.rate_limiter.check_request(tool_name, 0)
            if not allowed:
                return {"success": False, "error": f"Rate limited: {reason}"}

        func = self.tools[tool_name]
        kwargs = params or {}
        max_attempts = self.retry_config["max_attempts"]
        for attempt in range(max_attempts):
            try:
                if inspect.iscoroutinefunction(func):
                    result = await func(**kwargs)
                else:
                    result = await asyncio.to_thread(func, **kwargs)
                return {"success": True, "result": result}
            except Exception as e:
                error_type = self._classify_error(e)
                # Retrying a bad argument or a blocked path can never succeed;
                # only transient errors are worth the backoff delay.
                if error_type is not ErrorType.RETRYABLE or attempt == max_attempts - 1:
                    return {"success": False, "error": str(e), "type": error_type.value}
                delay = self.retry_config["base_delay"] * (2 ** attempt)
                await asyncio.sleep(min(delay, self.retry_config["max_delay"]))
        # Only reachable when max_attempts <= 0.
        return {"success": False, "error": "Max retries exceeded"}

    def execute_sync(self, tool_name: str, params: dict) -> dict:
        """Blocking wrapper around :meth:`execute` for scripts and tests.

        Uses ``asyncio.run``, which raises RuntimeError when called from a
        running event loop; async callers should ``await execute(...)``.
        """
        return asyncio.run(self.execute(tool_name, params))

    def _classify_error(self, error: Exception) -> ErrorType:
        """Map an exception raised by a tool to its retry behaviour."""
        if isinstance(error, (FileNotFoundError, PermissionError)):
            return ErrorType.FATAL
        if isinstance(error, (TimeoutError, ConnectionError)):
            return ErrorType.RETRYABLE
        return ErrorType.NON_RETRYABLE
API Server
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import structlog
from tools.reader import DocumentReader
from tools.search import DocumentSearch
from tools.qa import DocumentQA
from executor.dispatcher import ToolDispatcher
from executor.rate_limiter import RateLimiter
from monitoring.metrics import setup_metrics
app = FastAPI(title="Document Assistant")
logger = structlog.get_logger()

# Initialize components
doc_reader = DocumentReader("/data/documents", max_size=1024 * 1024)
doc_search = DocumentSearch(doc_reader)
doc_qa = DocumentQA()

# Build the limiter first and hand it to the dispatcher: ToolDispatcher only
# enforces limits through the rate_limiter it was constructed with, so a
# limiter that is merely created alongside it is never consulted.
rate_limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=100000)
dispatcher = ToolDispatcher(rate_limiter=rate_limiter)

dispatcher.register("read_document", doc_reader.read)
dispatcher.register("list_documents", doc_reader.list_documents)
dispatcher.register("search_documents", doc_search.search)
dispatcher.register("index_documents", doc_search.index_documents)
dispatcher.register("answer_question", doc_qa.answer)

setup_metrics(app)
class ChatRequest(BaseModel):
    """Request body for ``POST /chat``."""

    # Free-form user message; keywords in it decide which tools the endpoint runs.
    message: str
    # Optional document path (relative to the documents directory) used for read/show requests.
    context_doc: Optional[str] = None
class ToolCallRequest(BaseModel):
    """Request body for ``POST /tools/call``."""

    # Name of a tool registered on the dispatcher.
    tool: str
    # Keyword arguments for the tool. Defaults to empty so argument-less tools
    # (e.g. list_documents) can be called as {"tool": "list_documents"}.
    # Pydantic copies mutable defaults per instance, so this {} is not shared.
    params: dict = {}
def _extract_search_query(message: str) -> str:
    """Return the search terms that follow a "search" keyword in ``message``.

    "Search for Python?" -> "Python". Returns "" when the message has no
    standalone "search"/"searches"/"searching" word or nothing follows it.
    """
    import re

    match = re.search(
        r"\bsearch(?:es|ing)?\b(?:\s+for\b)?(.*)",
        message,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if match is None:
        return ""
    return match.group(1).strip(" \t\r\n\"'?.!:,")


@app.post("/chat")
async def chat(request: ChatRequest):
    """Route a chat message to tools by keyword and return their raw results.

    This is a simplified keyword router rather than model-driven function calling:

    * "read"/"show" with ``context_doc`` set -> ``read_document``
    * "search <terms>" -> ``index_documents`` then ``search_documents`` with
      only the terms after the keyword as the query

    No natural-language answer is generated; ``response`` only reports how
    many tools ran.
    """
    lowered = request.message.lower()
    tools_to_use: list[tuple[str, dict]] = []

    if request.context_doc and ("read" in lowered or "show" in lowered):
        tools_to_use.append(("read_document", {"filepath": request.context_doc}))

    # Searching for the whole message (trigger word included) as a literal
    # substring almost never matches a line, so search for the extracted terms.
    query = _extract_search_query(request.message)
    if query:
        # Re-index before each search so files added since the last call are found.
        tools_to_use.append(("index_documents", {}))
        tools_to_use.append(("search_documents", {"query": query}))

    results = []
    for tool_name, tool_params in tools_to_use:
        results.append(await dispatcher.execute(tool_name, tool_params))

    return {
        "response": f"Executed {len(results)} tool(s)",
        "tool_results": results,
    }
@app.post("/tools/call")
async def call_tool(request: ToolCallRequest):
    """Invoke one registered tool directly and return the dispatcher's result dict unchanged."""
    return await dispatcher.execute(request.tool, request.params)
@app.get("/health")
async def health():
    """Liveness probe that also reports the names of the registered tools."""
    registered = [name for name in dispatcher.tools]
    return {"status": "healthy", "tools_available": registered}
if __name__ == "__main__":
    import os

    import uvicorn

    # Default to loopback: /tools/call reads any file under the documents
    # directory, so listen on all interfaces only when asked to explicitly
    # (e.g. HOST=0.0.0.0 inside a container).
    uvicorn.run(
        app,
        host=os.environ.get("HOST", "127.0.0.1"),
        port=int(os.environ.get("PORT", "8000")),
    )
Tests
# tests/test_tools.py
import pytest
import tempfile
import os
from pathlib import Path
from tools.reader import DocumentReader
from tools.search import DocumentSearch
class TestDocumentReader:
    """Unit tests for DocumentReader, each run against a fresh temporary directory."""

    def setup_method(self, method):
        # pytest calls setup_method/teardown_method around every test; the
        # nose-style ``setup`` hook is not called by pytest 8+.
        self._tmp = tempfile.TemporaryDirectory()
        self.temp_dir = self._tmp.name
        self.reader = DocumentReader(self.temp_dir, max_size=1024 * 100)

    def teardown_method(self, method):
        self._tmp.cleanup()

    def test_read_existing_file(self):
        test_file = Path(self.temp_dir) / "test.txt"
        test_file.write_text("Hello, world!")
        result = self.reader.read("test.txt")
        assert result["filepath"] == "test.txt"
        assert result["content"] == "Hello, world!"

    def test_read_nonexistent_file(self):
        with pytest.raises(FileNotFoundError):
            self.reader.read("nonexistent.txt")

    def test_path_traversal_blocked(self):
        with pytest.raises(ValueError, match="path traversal"):
            self.reader.read("../etc/passwd")

    def test_file_too_large(self):
        Path(self.temp_dir, "big.txt").write_text("x" * (1024 * 100 + 1))
        with pytest.raises(ValueError, match="too large"):
            self.reader.read("big.txt")

    def test_list_documents(self):
        Path(self.temp_dir, "doc1.txt").write_text("Content 1")
        Path(self.temp_dir, "doc2.txt").write_text("Content 2")
        docs = self.reader.list_documents(extension="txt")
        assert len(docs) == 2
        assert any(d["name"] == "doc1.txt" for d in docs)
class TestDocumentSearch:
    """Unit tests for DocumentSearch over two small .txt documents."""

    def setup_method(self, method):
        # Runs before every test (the nose-style ``setup`` is not called by pytest 8+).
        self._tmp = tempfile.TemporaryDirectory()
        self.temp_dir = self._tmp.name
        self.reader = DocumentReader(self.temp_dir)
        self.search = DocumentSearch(self.reader)
        # Create test documents
        Path(self.temp_dir, "python.txt").write_text("Python is great\nPython is fast")
        Path(self.temp_dir, "rust.txt").write_text("Rust is safe\nRust is fast")

    def teardown_method(self, method):
        self._tmp.cleanup()

    def test_index_documents(self):
        result = self.search.index_documents()
        assert result["documents_indexed"] == 2

    def test_search_finds_match(self):
        self.search.index_documents()
        results = self.search.search("Python")
        assert len(results) > 0
        assert results[0]["document"] == "python.txt"

    def test_search_is_case_insensitive_by_default(self):
        self.search.index_documents()
        assert len(self.search.search("python")) == 2
        assert self.search.search("python", case_sensitive=True) == []

    def test_search_respects_max_results(self):
        self.search.index_documents()
        assert len(self.search.search("fast", max_results=1)) == 1

    def test_search_no_results(self):
        self.search.index_documents()
        results = self.search.search("nonexistent")
        assert len(results) == 0
# tests/test_executor.py
import pytest
from executor.dispatcher import ToolDispatcher, ErrorType
class TestToolDispatcher:
    """Unit tests for ToolDispatcher registration, dispatch and retry behaviour."""

    def setup_method(self, method):
        # Runs before every test (the nose-style ``setup`` is not called by pytest 8+).
        self.dispatcher = ToolDispatcher()
        # No real backoff in tests: a zero base delay turns each retry wait into sleep(0).
        self.dispatcher.retry_config["base_delay"] = 0.0

    def _run(self, tool_name, params):
        """Drive the async ``execute`` to completion from a synchronous test."""
        import asyncio

        return asyncio.run(self.dispatcher.execute(tool_name, params))

    def test_register_and_execute(self):
        def test_tool(name: str) -> dict:
            return {"greeting": f"Hello, {name}"}

        self.dispatcher.register("greet", test_tool)
        result = self._run("greet", {"name": "World"})
        assert result["success"]
        assert result["result"]["greeting"] == "Hello, World"

    def test_async_tool(self):
        async def double(value: int) -> int:
            return value * 2

        self.dispatcher.register("double", double)
        result = self._run("double", {"value": 21})
        assert result["success"]
        assert result["result"] == 42

    def test_unknown_tool(self):
        result = self._run("nonexistent", {})
        assert "error" in result

    def test_retry_on_retryable_error(self):
        call_count = 0

        def flaky_tool():
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise TimeoutError("Simulated failure")
            return {"success": True}

        self.dispatcher.register("flaky", flaky_tool)
        result = self._run("flaky", {})
        assert result["success"]
        assert call_count == 3

    def test_fatal_error_not_retried(self):
        calls = []

        def missing_file():
            calls.append(1)
            raise FileNotFoundError("gone")

        self.dispatcher.register("missing", missing_file)
        result = self._run("missing", {})
        assert result["type"] == ErrorType.FATAL.value
        assert calls == [1]
Running the Project
# Install dependencies (pytest is needed for the test step at the end)
pip install fastapi uvicorn langchain-ollama prometheus-client structlog pydantic pytest
# Start Ollama (in separate terminal)
ollama serve
# Download model
ollama pull llama3.2
# Create test documents (writing under /data may require sudo;
# app.py reads from /data/documents)
mkdir -p /data/documents
echo "Python is a versatile programming language." > /data/documents/python.txt
echo "Rust prioritizes safety and performance." > /data/documents/rust.txt
# Run the server from the document_assistant/ directory.
# It keeps running in the foreground, so use another terminal for the next step.
python app.py
# Test the API
curl -X POST http://localhost:8000/tools/call \
  -H "Content-Type: application/json" \
  -d '{"tool": "read_document", "params": {"filepath": "python.txt"}}'
# Run tests from the document_assistant/ directory. "python -m pytest" puts the
# current directory on sys.path so the tests can import tools/ and executor/.
python -m pytest tests/ -v
EXERCISE
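Extend the document assistant with a new tool that runs shell commands in a sandboxed environment. Add security checks, rate limiting, and tests. For example, security checks could include an allowlist of permitted commands executed with `subprocess.run([...], shell=False)` and a timeout. The tests should verify that path traversal and command injection are blocked.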