36. Final Project: AI Data Pipeline

Chapter 36 of 36 · 25 min

You've learned the building blocks: OOP, regex, visualization, profiling, CLI tools, and batch processing. This final project weaves them together into a complete, production-ready AI data pipeline.

Project: "DocMind" - Document Ingestion and Embedding Pipeline

Create a complete CLI tool with this structure:

docmind/
├── pyproject.toml
├── src/
│   └── docmind/
│       ├── __init__.py
│       └── pipeline.py
├── tests/
│   └── test_pipeline.py
└── scripts/
    └── docmind.py

Features:

  1. Ingestion: Load documents from JSON or CSV files, or from a directory of them
  2. Preprocessing: Extract text, clean whitespace, normalize
  3. Embedding: Batch process documents with simulated embeddings
  4. Output: Save to JSON with metadata
  5. CLI: Full command-line interface with progress reporting and error handling
# src/docmind/pipeline.py
"""Core pipeline implementation."""

from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator
import hashlib
import json
import re

@dataclass
class Document:
    """A unit of text flowing through the pipeline, plus free-form metadata.

    Attributes:
        id: Identifier of the document.
        text: The document body.
        metadata: Extra fields carried alongside the text (fresh dict per instance).
    """

    id: str
    text: str
    metadata: dict = field(default_factory=dict)

    @property
    def word_count(self) -> int:
        """Number of whitespace-separated tokens in ``text``."""
        tokens = self.text.split()
        return len(tokens)

    @property
    def char_count(self) -> int:
        """Number of characters in ``text``."""
        body = self.text
        return len(body)

class DocumentLoader:
    """Load documents from JSON, CSV, or plain-text files, or from a directory.

    Every record is normalized into a ``Document``. The text comes from the
    first non-empty of the ``text`` / ``content`` / ``body`` fields. The id
    comes from ``id``, falling back to a hash of the text. All remaining fields
    are kept as metadata.
    """

    # Suffixes (compared case-insensitively) that load_path knows how to read.
    SUPPORTED_SUFFIXES = (".json", ".csv", ".txt")
    # Record fields consumed for id/text and therefore excluded from metadata.
    _RESERVED_KEYS = frozenset({"id", "text", "content", "body"})
    # "utf-8-sig" decodes plain UTF-8 and also strips a leading BOM (common in
    # spreadsheet-exported CSV), which would otherwise corrupt the first CSV
    # header name or make json.load reject the file.
    _ENCODING = "utf-8-sig"

    def load_path(self, path: Path) -> Iterator[Document]:
        """Yield every document found at ``path``.

        Args:
            path: A ``.json``, ``.csv`` or ``.txt`` file, or a directory that is
                searched recursively for such files. The search runs in sorted
                order, so results are reproducible. A ``str`` is also accepted.

        Yields:
            One ``Document`` per record. Files with other suffixes yield nothing.
        """
        path = Path(path)
        if path.is_dir():
            for file_path in sorted(path.rglob("*")):
                # rglob also returns sub-directories, which may carry a suffix
                # such as ".json"; only real files are handed to the readers.
                if file_path.is_file() and file_path.suffix.lower() in self.SUPPORTED_SUFFIXES:
                    yield from self.load_path(file_path)
            return

        suffix = path.suffix.lower()
        if suffix == ".json":
            yield from self._load_json(path)
        elif suffix == ".csv":
            yield from self._load_csv(path)
        elif suffix == ".txt":
            yield from self._load_txt(path)

    def _load_json(self, path: Path) -> Iterator[Document]:
        """Yield documents from a JSON file holding a list of records or a single record.

        A top-level ``null`` or empty object yields nothing.
        """
        with open(path, encoding=self._ENCODING) as f:
            data = json.load(f)
        if isinstance(data, list):
            for item in data:
                yield self._item_to_doc(item)
        elif data:
            yield self._item_to_doc(data)

    def _load_csv(self, path: Path) -> Iterator[Document]:
        """Yield one document per CSV row, using the header row as field names."""
        import csv
        with open(path, newline="", encoding=self._ENCODING) as f:
            reader = csv.DictReader(f)
            for row in reader:
                yield self._item_to_doc(row)

    def _load_txt(self, path: Path) -> Iterator[Document]:
        """Yield a whole plain-text file as one document, recording its path as ``source`` metadata."""
        text = path.read_text(encoding=self._ENCODING)
        yield self._item_to_doc({"text": text, "source": str(path)})

    def _item_to_doc(self, item: dict | str) -> Document:
        """Convert one raw record into a ``Document``.

        A bare string is treated as ``{"text": item}``. The text is the first
        truthy value of ``text`` / ``content`` / ``body`` (``""`` if none). The
        id is ``item["id"]`` when present and truthy, otherwise a hash of the text.
        """
        if isinstance(item, str):
            item = {"text": item}
        # Trailing `or ""` also covers a present-but-null "body" value.
        text = item.get("text") or item.get("content") or item.get("body") or ""
        doc_id = item.get("id") or self._generate_id(text)
        return Document(
            id=doc_id,
            text=text,
            metadata={k: v for k, v in item.items() if k not in self._RESERVED_KEYS},
        )

    @staticmethod
    def _generate_id(text: str) -> str:
        """Return a short, deterministic id: the first 12 hex chars of the text's MD5."""
        return hashlib.md5(text.encode()).hexdigest()[:12]

class DocumentPreprocessor:
    """Normalize raw document text and drop documents that end up too short.

    Args:
        min_length: Minimum number of characters the cleaned text must have
            for the document to be kept.
        strip_html: When true, remove every ``<...>`` tag before normalizing.
    """

    def __init__(self, min_length: int = 10, strip_html: bool = True):
        self.min_length = min_length
        self.strip_html = strip_html

    def process(self, doc: Document) -> Document | None:
        """Return a cleaned copy of ``doc``, or ``None`` when it is too short.

        The copy keeps ``doc.id``. Its metadata is ``doc.metadata`` plus a
        ``word_count`` entry computed on the cleaned text. ``doc`` itself is
        not modified.
        """
        raw = doc.text
        untagged = re.sub(r"<[^>]+>", "", raw) if self.strip_html else raw
        # Collapse each whitespace run (spaces, tabs, newlines) into a single
        # space, then trim both ends.
        cleaned = re.sub(r"\s+", " ", untagged).strip()

        if len(cleaned) < self.min_length:
            # Signal the caller to drop this document.
            return None

        enriched = dict(doc.metadata)
        enriched["word_count"] = len(cleaned.split())
        return Document(id=doc.id, text=cleaned, metadata=enriched)

class EmbeddingProcessor:
    """Generate document embeddings (simulated).

    No model is called. Each vector is derived deterministically from an MD5
    hash of the text, so identical texts always map to identical vectors.

    Args:
        dimension: Number of components in each embedding.
        batch_size: Stored on the instance; nothing in this class reads it yet.
    """

    def __init__(self, dimension: int = 1536, batch_size: int = 32):
        self.dimension = dimension
        self.batch_size = batch_size
        # Memo of text -> embedding. Unbounded: grows with every distinct text.
        self._cache: dict[str, list[float]] = {}

    def embed(self, text: str) -> list[float]:
        """Return the simulated embedding for ``text``.

        A new list is returned on every call. Callers may therefore mutate the
        result without corrupting the memoized vector, which other documents
        with the same text would otherwise share.
        """
        cached = self._cache.get(text)
        if cached is None:
            # The first 32 bits of the MD5 digest seed a sawtooth sequence of
            # values in [0, 1) that steps by 0.001 and wraps every 1000 slots.
            seed = int(hashlib.md5(text.encode()).hexdigest()[:8], 16)
            cached = [(seed + i) % 1000 / 1000 for i in range(self.dimension)]
            self._cache[text] = cached
        return list(cached)

    def process(self, doc: Document) -> dict:
        """Build the output record for ``doc``: id, truncated text, embedding, dimension, metadata."""
        embedding = self.embed(doc.text)
        return {
            "id": doc.id,
            "text": doc.text[:200],  # Truncated for storage
            "embedding": embedding,
            "dimension": self.dimension,
            "metadata": doc.metadata,
        }

class Pipeline:
    """End-to-end flow: load documents, clean them, then embed the survivors.

    Args:
        min_length: Passed to the preprocessor; shorter cleaned texts are dropped.
        embed_dim: Passed to the embedder as the embedding dimension.
    """

    def __init__(self, min_length: int = 10, embed_dim: int = 1536):
        self.loader = DocumentLoader()
        self.preprocessor = DocumentPreprocessor(min_length=min_length)
        self.embedder = EmbeddingProcessor(dimension=embed_dim)

    def process(self, source: Path) -> list[dict]:
        """Run every document found at ``source`` through the pipeline.

        Documents are streamed one at a time: each is loaded, cleaned and
        embedded before the next one is read.

        Returns:
            One embedding record per document that survived preprocessing,
            in load order.
        """
        cleaned_docs = (self.preprocessor.process(doc) for doc in self.loader.load_path(source))
        # The preprocessor returns None for documents that are too short.
        return [self.embedder.process(doc) for doc in cleaned_docs if doc is not None]

CLI script:

#!/usr/bin/env python3
# scripts/docmind.py
"""Command-line interface for DocMind."""

import click
from pathlib import Path
from rich.console import Console

import sys

# The package uses a src layout (src/docmind/), so src/ is what must be on
# sys.path. Python also puts this script's own directory (scripts/) on
# sys.path, and scripts/docmind.py shares the package's name. Inserting src/ at
# position 0 makes `import docmind` resolve to the package, not to this file.
# If the package is already installed (e.g. `pip install -e .`), this is harmless.
_SRC_DIR = Path(__file__).resolve().parent.parent / "src"
sys.path.insert(0, str(_SRC_DIR))

from docmind.pipeline import Pipeline

console = Console()

@click.group()
@click.version_option(version="1.0.0")
def cli():
    """DocMind - Document Ingestion and Embedding Pipeline."""
    # Root click command group; subcommands (`run`, `stats`) attach to it via
    # @cli.command(). click shows the docstring above as the top-level --help
    # text, so it is user-facing. `--version` is handled by
    # click.version_option and reports version 1.0.0.
    pass

@cli.command()
@click.argument("source", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), default="outputs/embeddings.json")
@click.option("--min-length", "-m", type=int, default=10)
def run(source, output, min_length):
    """Process documents and generate embeddings."""
    # click uses the docstring above as this command's --help text.
    import json

    src_path = Path(source)
    out_path = Path(output)

    # Echo the run configuration before doing any work.
    for banner_line in ("[bold]DocMind Pipeline[/bold]", f"Source: {src_path}", f"Output: {out_path}"):
        console.print(banner_line)

    doc_pipeline = Pipeline(min_length=min_length)
    console.print("[cyan]Processing documents...[/cyan]")
    records = doc_pipeline.process(src_path)

    # Create the destination directory on demand (e.g. the default outputs/).
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w") as sink:
        json.dump(records, sink, indent=2)

    for summary in (f"Processed {len(records)} documents", f"Output: {out_path}"):
        console.print(f"[green]✓[/green] {summary}")

@cli.command()
@click.argument("embeddings_file", type=click.Path(exists=True))
def stats(embeddings_file):
    """Show statistics about embeddings."""
    # click uses the docstring above as this command's --help text.
    import json

    with open(embeddings_file) as handle:
        records = json.load(handle)

    console.print("[bold]Embeddings Stats[/bold]")
    console.print(f"Document count: {len(records)}")

    if not records:
        return

    # Dimension is read from the first record only; presumably every record
    # in a file shares it (the `run` command writes a single dimension).
    dim = records[0]["dimension"]
    console.print(f"Embedding dimension: {dim}")
    # Rough estimate assuming 8 bytes per component (float64), not the actual
    # size of the Python objects or the JSON file.
    approx_kb = len(records) * dim * 8 / 1024
    console.print(f"Total memory: ~{approx_kb:.1f} KB")

if __name__ == "__main__":
    # Dispatch to the click command group only when executed as a script.
    cli()
EXERCISE

Complete the project by adding these features:

  1. Add tests/test_pipeline.py with tests for the preprocessor and loader
  2. Add a --batch-size flag to control embedding batch size
  3. Implement docmind stats --verbose that prints per-document statistics
  4. Add error handling with try/except blocks in the pipeline
  5. Write a README.md documenting usage

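Run the pipeline end-to-end and verify the output, for example `python scripts/docmind.py run data/ -o outputs/embeddings.json` followed by `python scripts/docmind.py stats outputs/embeddings.json`.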