36. Final Project: AI Data Pipeline
Chapter 36 of 36 · 25 min
You've learned the building blocks: OOP, regex, visualization, profiling, CLI tools, and batch processing. This final project weaves them together into a complete, production-ready AI data pipeline.
Project: "DocMind" - Document Ingestion and Embedding Pipeline
Create a complete CLI tool with this structure:
docmind/
├── pyproject.toml
├── src/
│ └── docmind/
│ ├── __init__.py
│ └── pipeline.py
├── tests/
│ └── test_pipeline.py
└── scripts/
└── docmind.py
Features:
- Ingestion: Load documents from JSON, CSV, or directory
- Preprocessing: Extract text, clean whitespace, normalize
- Embedding: Batch process documents with simulated embeddings
- Output: Save to JSON with metadata
- CLI: A command-line interface (`run` and `stats` commands) built with Click, with Rich for console output
# src/docmind/pipeline.py
"""Core pipeline implementation."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator
import hashlib
import json
import re
@dataclass
class Document:
    """One text document together with free-form metadata.

    ``word_count`` and ``char_count`` are computed from ``text`` on every
    access, so they always reflect the current value of ``text``.
    """

    id: str
    text: str
    # default_factory gives each instance its own dict (never shared).
    metadata: dict = field(default_factory=dict)

    @property
    def word_count(self) -> int:
        """Number of whitespace-separated tokens in ``text``."""
        tokens = self.text.split()
        return len(tokens)

    @property
    def char_count(self) -> int:
        """Length of ``text`` in characters."""
        body = self.text
        return len(body)
class DocumentLoader:
    """Load documents from JSON, CSV or plain-text files, or a directory of them.

    * ``.json`` files may hold a single object or a list of objects.
    * ``.csv`` files are read row by row with ``csv.DictReader``.
    * ``.txt`` files become one document each, with the file path stored in
      ``metadata["source"]``.

    For each record, the document text comes from the first non-empty value of
    ``text``, ``content`` or ``body``. The id comes from ``id``, or from a hash
    of the text when ``id`` is missing or empty. All other keys are kept as
    metadata.
    """

    # File suffixes picked up when walking a directory (compared lower-cased).
    SUPPORTED_SUFFIXES = (".json", ".csv", ".txt")

    def load_path(self, path: Path) -> Iterator[Document]:
        """Yield documents from *path*, which may be a file or a directory.

        Directories are walked recursively in sorted order so repeated runs
        produce the same document order. A file whose suffix is not supported
        yields nothing.
        """
        path = Path(path)
        # Check for a directory first so a directory named e.g. "data.json" is
        # walked instead of being opened as a file.
        if path.is_dir():
            for file_path in sorted(path.rglob("*")):
                if file_path.is_file() and file_path.suffix.lower() in self.SUPPORTED_SUFFIXES:
                    yield from self.load_path(file_path)
            return

        suffix = path.suffix.lower()
        if suffix == ".json":
            yield from self._load_json(path)
        elif suffix == ".csv":
            yield from self._load_csv(path)
        elif suffix == ".txt":
            yield from self._load_txt(path)

    def _load_json(self, path: Path) -> Iterator[Document]:
        """Yield one document per object in a JSON file.

        Raises:
            ValueError: if the file is not valid JSON, or is not an object or
                a list of objects.
        """
        # utf-8-sig also accepts files that start with a UTF-8 byte-order mark.
        with open(path, encoding="utf-8-sig") as f:
            data = json.load(f)
        if isinstance(data, dict):
            data = [data]
        if not isinstance(data, list):
            raise ValueError(
                f"{path}: expected a JSON object or list of objects, got {type(data).__name__}"
            )
        for index, item in enumerate(data):
            if not isinstance(item, dict):
                raise ValueError(
                    f"{path}: item {index} is {type(item).__name__}, expected a JSON object"
                )
            yield self._item_to_doc(item)

    def _load_csv(self, path: Path) -> Iterator[Document]:
        """Yield one document per CSV row, using the header row as keys."""
        import csv

        with open(path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                yield self._item_to_doc(row)

    def _load_txt(self, path: Path) -> Iterator[Document]:
        """Yield the whole text file as a single document."""
        text = path.read_text(encoding="utf-8-sig")
        yield self._item_to_doc({"text": text, "source": str(path)})

    def _item_to_doc(self, item: dict) -> Document:
        """Convert one raw record into a ``Document``."""
        # The trailing `or ""` also covers keys that are present but null.
        text = item.get("text") or item.get("content") or item.get("body") or ""
        if not isinstance(text, str):
            text = str(text)
        doc_id = item.get("id") or self._generate_id(text)
        return Document(
            id=doc_id,
            text=text,
            metadata={k: v for k, v in item.items() if k not in ("id", "text", "content", "body")},
        )

    @staticmethod
    def _generate_id(text: str) -> str:
        """Return a short, deterministic id (first 12 hex chars of the MD5 of *text*)."""
        return hashlib.md5(text.encode()).hexdigest()[:12]
class DocumentPreprocessor:
    """Clean and normalize document text.

    Processing steps:
    1. Optionally replace HTML-like tags with a space.
    2. Collapse every whitespace run to a single space and trim the ends.
    3. Drop documents shorter than ``min_length`` characters.
    """

    # Compiled once for the class rather than on every call.
    _TAG_RE = re.compile(r"<[^>]+>")
    _WHITESPACE_RE = re.compile(r"\s+")

    def __init__(self, min_length: int = 10, strip_html: bool = True):
        self.min_length = min_length
        self.strip_html = strip_html

    def process(self, doc: Document) -> Document | None:
        """Return a cleaned copy of *doc*, or ``None`` if it is too short.

        The copy keeps ``doc.id`` and adds a ``word_count`` entry to a copy of
        ``doc.metadata``. The input document is not modified.
        """
        text = doc.text
        if self.strip_html:
            # Replace tags with a space (not ""), so "<p>a</p><p>b</p>" becomes
            # "a b" rather than "ab". The whitespace pass collapses any extras.
            text = self._TAG_RE.sub(" ", text)
        text = self._WHITESPACE_RE.sub(" ", text).strip()

        if len(text) < self.min_length:
            return None

        return Document(
            id=doc.id,
            text=text,
            metadata={**doc.metadata, "word_count": len(text.split())},
        )
class EmbeddingProcessor:
    """Generate simulated, deterministic document embeddings.

    No model is called. Each vector is derived from an MD5 hash of the text,
    so identical text always maps to an identical vector. Results are
    memoized per instance.
    """

    def __init__(self, dimension: int = 1536, batch_size: int = 32):
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.dimension = dimension
        self.batch_size = batch_size
        # Vectors are stored as tuples, so callers can never mutate cached data.
        self._cache: dict[str, tuple[float, ...]] = {}

    def embed(self, text: str) -> list[float]:
        """Return the simulated embedding of *text* as a new list.

        A fresh list is returned on every call. Mutating it does not affect
        later results for the same text.
        """
        cached = self._cache.get(text)
        if cached is None:
            seed = int(hashlib.md5(text.encode()).hexdigest()[:8], 16)
            cached = tuple((seed + i) % 1000 / 1000 for i in range(self.dimension))
            self._cache[text] = cached
        return list(cached)

    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed *texts* in chunks of ``batch_size``, preserving input order.

        With the simulated backend this behaves the same as calling ``embed``
        per text. The chunking is where a real batched model call would go.
        """
        texts = list(texts)
        vectors: list[list[float]] = []
        for start in range(0, len(texts), self.batch_size):
            chunk = texts[start:start + self.batch_size]
            vectors.extend(self.embed(text) for text in chunk)
        return vectors

    def process(self, doc: "Document") -> dict:
        """Return a JSON-serializable embedding record for *doc*.

        Only the first 200 characters of the text are stored, to keep the
        output file small.
        """
        embedding = self.embed(doc.text)
        return {
            "id": doc.id,
            "text": doc.text[:200],  # Truncated for storage
            "embedding": embedding,
            "dimension": self.dimension,
            "metadata": doc.metadata,
        }
class Pipeline:
    """Main document processing pipeline: load, clean, then embed."""

    def __init__(self, min_length: int = 10, embed_dim: int = 1536, batch_size: int = 32):
        """Build the pipeline stages.

        Args:
            min_length: Minimum cleaned text length; shorter documents are dropped.
            embed_dim: Dimension of the generated embeddings.
            batch_size: Batch size handed to the embedding stage.
        """
        self.loader = DocumentLoader()
        self.preprocessor = DocumentPreprocessor(min_length=min_length)
        self.embedder = EmbeddingProcessor(dimension=embed_dim, batch_size=batch_size)

    def process(self, source: Path) -> list[dict]:
        """Return one embedding record per document that survives preprocessing.

        *source* may be a ``Path`` or a string path to a file or directory.
        """
        # Lazily load and clean; the preprocessor returns None for documents
        # that are too short.
        cleaned = (self.preprocessor.process(doc) for doc in self.loader.load_path(Path(source)))
        return [self.embedder.process(doc) for doc in cleaned if doc is not None]
CLI script:
#!/usr/bin/env python3
# scripts/docmind.py
"""Command-line interface for DocMind."""
import sys
from pathlib import Path

import click
from rich.console import Console

# The package lives in <project>/src/docmind (see the layout above), so put
# <project>/src on the import path. This lets the script run from a source
# checkout without installing it. Adding the project root instead would make
# `import docmind` fail.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))

from docmind.pipeline import Pipeline  # noqa: E402  (requires the sys.path tweak above)

console = Console()
@click.group()
@click.version_option(version="1.0.0")
def cli():
    """DocMind - Document Ingestion and Embedding Pipeline."""
    # Root command group that the subcommands below attach to. It needs no
    # setup of its own; the docstring above is its help text.
@cli.command()
@click.argument("source", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), default="outputs/embeddings.json")
@click.option("--min-length", "-m", type=int, default=10)
def run(source, output, min_length):
    """Process documents and generate embeddings."""
    # Runs the pipeline on SOURCE and writes the records as JSON to --output.
    # Read/parse/write failures are reported as Click errors (message +
    # non-zero exit) instead of raw tracebacks.
    import csv
    import json

    source = Path(source)
    output = Path(output)
    console.print("[bold]DocMind Pipeline[/bold]")
    console.print(f"Source: {source}")
    console.print(f"Output: {output}")

    pipeline = Pipeline(min_length=min_length)
    console.print("[cyan]Processing documents...[/cyan]")
    try:
        results = pipeline.process(source)
    except (OSError, ValueError, csv.Error) as err:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        raise click.ClickException(f"Failed to process {source}: {err}") from err

    if not results:
        console.print(
            "[yellow]![/yellow] No documents produced; check the input format and --min-length"
        )

    try:
        output.parent.mkdir(parents=True, exist_ok=True)
        with open(output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
    except OSError as err:
        raise click.ClickException(f"Could not write {output}: {err}") from err

    console.print(f"[green]✓[/green] Processed {len(results)} documents")
    console.print(f"[green]✓[/green] Output: {output}")
@cli.command()
@click.argument("embeddings_file", type=click.Path(exists=True))
def stats(embeddings_file):
    """Show statistics about embeddings."""
    # Expects the JSON list of records written by `run`. Unreadable or
    # malformed files are reported as Click errors.
    import json

    try:
        with open(embeddings_file, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as err:
        raise click.ClickException(f"Could not read {embeddings_file}: {err}") from err
    if not isinstance(data, list) or not all(isinstance(rec, dict) for rec in data):
        raise click.ClickException(
            f"{embeddings_file} does not contain a list of embedding records"
        )

    console.print("[bold]Embeddings Stats[/bold]")
    console.print(f"Document count: {len(data)}")
    if data:
        first = data[0]
        # Fall back to the vector length if the record lacks a "dimension" key.
        dimension = first.get("dimension", len(first.get("embedding") or ()))
        console.print(f"Embedding dimension: {dimension}")
        # Rough estimate: 8 bytes per value (float64), summed over the actual
        # vectors. Text and metadata are ignored.
        total_values = sum(len(rec.get("embedding") or ()) for rec in data)
        console.print(f"Total memory: ~{total_values * 8 / 1024:.1f} KB")
if __name__ == "__main__":
    # Only invoke the Click command group when this file is executed directly,
    # not when it is imported.
    cli()
EXERCISE
Complete the project by adding these features:
- Add `tests/test_pipeline.py` with tests for the preprocessor and loader
- Add a `--batch-size` flag to control embedding batch size
- Implement `docmind stats --verbose` that prints per-document statistics
- Add error handling with `try`/`except` blocks in the pipeline
- Write a `README.md` documenting usage
Run the pipeline end-to-end and verify the output.