18. Tool Ecosystem Project

Chapter 18 of 18 · 35 min

This chapter brings together everything learned in previous chapters. Build a complete function calling system with multiple tools, error recovery, rate limiting, monitoring, and tests.

Project Overview

Build a local document assistant with these capabilities:

  • Read documents from a specified directory
  • Search within documents
  • Summarize document contents
  • Answer questions about documents

Project Structure

document_assistant/
├── tools/
│   ├── __init__.py
│   ├── reader.py
│   ├── search.py
│   ├── summarizer.py
│   └── qa.py
├── executor/
│   ├── __init__.py
│   ├── dispatcher.py
│   ├── recovery.py
│   └── rate_limiter.py
├── monitoring/
│   ├── __init__.py
│   └── metrics.py
├── tests/
│   ├── test_tools.py
│   ├── test_executor.py
│   └── test_integration.py
├── app.py
├── config.py
└── requirements.txt

Core Tool Implementations

# tools/reader.py
import os
import hashlib
from pathlib import Path
from typing import Optional

class DocumentReader:
    """Read documents from allowed directory with security checks."""
    
    def __init__(self, allowed_dir: str, max_size: int = 1024 * 1024):
        self.allowed_dir = Path(allowed_dir).resolve()
        self.max_size = max_size
    
    def read(self, filepath: str) -> dict:
        """Read document content."""
        
        # Security: prevent path traversal
        target = (self.allowed_dir / filepath).resolve()
        
        if not str(target).startswith(str(self.allowed_dir)):
            raise ValueError("Access denied: path traversal attempt")
        
        if not target.exists():
            raise FileNotFoundError(f"File not found: {filepath}")
        
        # Size check
        size = target.stat().st_size
        if size > self.max_size:
            raise ValueError(f"File too large: {size} bytes (max {self.max_size})")
        
        # Read content
        with open(target, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
        
        return {
            "filepath": str(target.relative_to(self.allowed_dir)),
            "size": size,
            "content": content[:self.max_size],
            "hash": hashlib.sha256(content.encode()).hexdigest()[:16]
        }
    
    def list_documents(self, extension: Optional[str] = None) -> list[dict]:
        """List all documents in allowed directory."""
        
        documents = []
        pattern = f"*.{extension}" if extension else "*"
        
        for path in self.allowed_dir.glob(pattern):
            if path.is_file():
                documents.append({
                    "name": path.name,
                    "size": path.stat().st_size,
                    "modified": path.stat().st_mtime
                })
        
        return documents
# tools/search.py
import re
from collections import defaultdict
from .reader import DocumentReader

class DocumentSearch:
    """Search within documents by keyword or pattern."""
    
    def __init__(self, reader: DocumentReader):
        self.reader = reader
        self._index: dict[str, list[tuple[int, str]]] = {}
    
    def index_documents(self, glob_pattern: str = "*.txt") -> dict:
        """Build search index from documents."""
        
        documents = self.reader.list_documents()
        self._index.clear()
        
        for doc in documents:
            try:
                content = self.reader.read(doc["name"])["content"]
                lines = content.split('\n')
                
                self._index[doc["name"]] = [
                    (i + 1, line) for i, line in enumerate(lines) if line.strip()
                ]
            except Exception as e:
                continue
        
        return {
            "documents_indexed": len(self._index),
            "total_lines": sum(len(v) for v in self._index.values())
        }
    
    def search(self, query: str, case_sensitive: bool = False, max_results: int = 10) -> list[dict]:
        """Search for query string in indexed documents."""
        
        if case_sensitive:
            pattern = re.compile(re.escape(query))
        else:
            pattern = re.compile(re.escape(query), re.IGNORECASE)
        
        results = []
        for doc_name, lines in self._index.items():
            for line_num, line in lines:
                if pattern.search(line):
                    results.append({
                        "document": doc_name,
                        "line": line_num,
                        "text": line.strip()
                    })
                    if len(results) >= max_results:
                        return results
        
        return results
# tools/qa.py
from langchain_ollama import ChatOllama

class DocumentQA:
    """Answer questions about documents using local LLM."""
    
    def __init__(self, model: str = "llama3.2", base_url: str = "http://localhost:11434"):
        self.llm = ChatOllama(model=model, base_url=base_url, temperature=0.3)
    
    def answer(self, question: str, context: str, max_context_length: int = 4000) -> dict:
        """Answer question based on document context."""
        
        # Truncate context if too long
        if len(context) > max_context_length:
            context = context[:max_context_length] + "..."
        
        prompt = f"""Based on the following document content, answer the question.

Document content:
{context}

Question: {question}

Answer:"""
        
        response = self.llm.invoke(prompt)
        
        return {
            "question": question,
            "answer": str(response.content),
            "context_used": len(context),
            "model": self.llm.model
        }

Tool Executor with Recovery

# executor/dispatcher.py
import json
import asyncio
from typing import Any, Callable, Optional
from enum import Enum

class ErrorType(Enum):
    RETRYABLE = "retryable"
    NON_RETRYABLE = "non_retryable"
    FATAL = "fatal"

class ToolDispatcher:
    """Dispatcher with built-in error recovery and rate limiting."""
    
    def __init__(self, rate_limiter: Optional[Any] = None):
        self.tools: dict[str, Callable] = {}
        self.rate_limiter = rate_limiter
        self.retry_config = {
            "max_attempts": 3,
            "base_delay": 1.0,
            "max_delay": 30.0
        }
    
    def register(self, name: str, func: Callable):
        self.tools[name] = func
    
    async def execute(self, tool_name: str, params: dict) -> dict:
        """Execute tool with retry and rate limiting."""
        
        if tool_name not in self.tools:
            return {"error": f"Unknown tool: {tool_name}"}
        
        # Check rate limit
        if self.rate_limiter:
            allowed, reason = self.rate_limiter.check_request(tool_name, 0)
            if not allowed:
                return {"error": f"Rate limited: {reason}"}
        
        # Execute with retry
        for attempt in range(self.retry_config["max_attempts"]):
            try:
                func = self.tools[tool_name]
                
                # Handle sync and async functions
                if asyncio.iscoroutinefunction(func):
                    result = await func(**params)
                else:
                    result = await asyncio.to_thread(func, **params)
                
                return {"success": True, "result": result}
                
            except Exception as e:
                error_type = self._classify_error(e)
                
                if error_type == ErrorType.FATAL or attempt == self.retry_config["max_attempts"] - 1:
                    return {"error": str(e), "type": error_type.value}
                
                # Exponential backoff
                delay = self.retry_config["base_delay"] * (2 ** attempt)
                await asyncio.sleep(min(delay, self.retry_config["max_delay"]))
        
        return {"error": "Max retries exceeded"}
    
    def _classify_error(self, error: Exception) -> ErrorType:
        """Classify error to determine retry behavior."""
        
        if isinstance(error, (FileNotFoundError, PermissionError)):
            return ErrorType.FATAL
        
        if isinstance(error, (TimeoutError, ConnectionError)):
            return ErrorType.RETRYABLE
        
        return ErrorType.NON_RETRYABLE

API Server

# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import structlog

from tools.reader import DocumentReader
from tools.search import DocumentSearch
from tools.qa import DocumentQA
from executor.dispatcher import ToolDispatcher
from executor.rate_limiter import RateLimiter
from monitoring.metrics import setup_metrics

app = FastAPI(title="Document Assistant")
logger = structlog.get_logger()

# Initialize components
doc_reader = DocumentReader("/data/documents", max_size=1024 * 1024)
doc_search = DocumentSearch(doc_reader)
doc_qa = DocumentQA()

dispatcher = ToolDispatcher()
dispatcher.register("read_document", doc_reader.read)
dispatcher.register("list_documents", doc_reader.list_documents)
dispatcher.register("search_documents", doc_search.search)
dispatcher.register("index_documents", doc_search.index_documents)
dispatcher.register("answer_question", doc_qa.answer)

rate_limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=100000)
setup_metrics(app)

class ChatRequest(BaseModel):
    message: str
    context_doc: Optional[str] = None

class ToolCallRequest(BaseModel):
    tool: str
    params: dict

@app.post("/chat")
async def chat(request: ChatRequest):
    """Chat endpoint that uses function calling."""
    
    # Determine which tools to use based on message
    tools_to_use = []
    params = {}
    
    if "read" in request.message.lower() or "show" in request.message.lower():
        if request.context_doc:
            tools_to_use.append(("read_document", {"filepath": request.context_doc}))
    
    if "search" in request.message.lower():
        tools_to_use.append(("index_documents", {}))
        # Extract search terms (simplified)
        tools_to_use.append(("search_documents", {"query": request.message}))
    
    # Execute tools
    results = []
    for tool_name, tool_params in tools_to_use:
        result = await dispatcher.execute(tool_name, tool_params)
        results.append(result)
    
    # Generate response using context
    return {
        "response": f"Executed {len(results)} tool(s)",
        "tool_results": results
    }

@app.post("/tools/call")
async def call_tool(request: ToolCallRequest):
    """Direct tool call endpoint."""
    result = await dispatcher.execute(request.tool, request.params)
    return result

@app.get("/health")
async def health():
    return {"status": "healthy", "tools_available": list(dispatcher.tools.keys())}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Tests

# tests/test_tools.py
import pytest
import tempfile
import os
from pathlib import Path

from tools.reader import DocumentReader
from tools.search import DocumentSearch

class TestDocumentReader:
    def setup(self):
        self.temp_dir = tempfile.mkdtemp()
        self.reader = DocumentReader(self.temp_dir, max_size=1024 * 100)
    
    def test_read_existing_file(self):
        test_file = Path(self.temp_dir) / "test.txt"
        test_file.write_text("Hello, world!")
        
        result = self.reader.read("test.txt")
        
        assert result["filepath"] == "test.txt"
        assert result["content"] == "Hello, world!"
    
    def test_read_nonexistent_file(self):
        with pytest.raises(FileNotFoundError):
            self.reader.read("nonexistent.txt")
    
    def test_path_traversal_blocked(self):
        with pytest.raises(ValueError, match="path traversal"):
            self.reader.read("../etc/passwd")
    
    def test_list_documents(self):
        Path(self.temp_dir, "doc1.txt").write_text("Content 1")
        Path(self.temp_dir, "doc2.txt").write_text("Content 2")
        
        docs = self.reader.list_documents(extension="txt")
        
        assert len(docs) == 2
        assert any(d["name"] == "doc1.txt" for d in docs)

class TestDocumentSearch:
    def setup(self):
        self.temp_dir = tempfile.mkdtemp()
        self.reader = DocumentReader(self.temp_dir)
        self.search = DocumentSearch(self.reader)
        
        # Create test documents
        Path(self.temp_dir, "python.txt").write_text("Python is great\nPython is fast")
        Path(self.temp_dir, "rust.txt").write_text("Rust is safe\nRust is fast")
    
    def test_index_documents(self):
        result = self.search.index_documents()
        
        assert result["documents_indexed"] == 2
    
    def test_search_finds_match(self):
        self.search.index_documents()
        
        results = self.search.search("Python")
        
        assert len(results) > 0
        assert results[0]["document"] == "python.txt"
    
    def test_search_no_results(self):
        self.search.index_documents()
        
        results = self.search.search("nonexistent")
        
        assert len(results) == 0

# tests/test_executor.py
import pytest
from executor.dispatcher import ToolDispatcher, ErrorType

class TestToolDispatcher:
    def setup(self):
        self.dispatcher = ToolDispatcher()
        self.call_count = 0
    
    def test_register_and_execute(self):
        def test_tool(name: str) -> dict:
            return {"greeting": f"Hello, {name}"}
        
        self.dispatcher.register("greet", test_tool)
        
        result = self.dispatcher.execute_sync("greet", {"name": "World"})
        
        assert result["success"]
        assert result["result"]["greeting"] == "Hello, World"
    
    def test_unknown_tool(self):
        result = self.dispatcher.execute_sync("nonexistent", {})
        
        assert "error" in result
    
    def test_retry_on_retryable_error(self):
        call_count = 0
        
        def flaky_tool():
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise TimeoutError("Simulated failure")
            return {"success": True}
        
        self.dispatcher.register("flaky", flaky_tool)
        result = self.dispatcher.execute_sync("flaky", {})
        
        assert result["success"]
        assert call_count == 3

Running the Project

# Install dependencies
pip install fastapi uvicorn langchain-ollama prometheus-client structlog pydantic

# Start Ollama (in separate terminal)
ollama serve

# Download model
ollama pull llama3.2

# Create test documents
mkdir -p /data/documents
echo "Python is a versatile programming language." > /data/documents/python.txt
echo "Rust prioritizes safety and performance." > /data/documents/rust.txt

# Run the server
python app.py

# Test the API
curl -X POST http://localhost:8000/tools/call \
  -H "Content-Type: application/json" \
  -d '{"tool": "read_document", "params": {"filepath": "python.txt"}}'

# Run tests
pytest tests/ -v
EXERCISE

Extend the document assistant with a new tool that executes shell commands in a sandboxed environment. Add security checks, rate limiting, and tests. Verify that path traversal and command injection are blocked.