34. AI Pipeline Script
Chapter 34 of 36 · 15 min
Time to wire everything together. Below is a complete pipeline script that loads JSON documents, generates (simulated) embeddings for them in parallel batches, and saves the resulting vectors to a JSON file:
#!/usr/bin/env python3
"""Batch processing pipeline for document embeddings."""
import click
import json
from pathlib import Path
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn
# Shared rich console used for all status, progress, and error output below.
console = Console()
@dataclass
class PipelineConfig:
    """Settings for one pipeline run.

    Attributes:
        batch_size: Number of documents handed to each ``process_batch`` call.
        workers: Thread-pool size used to embed a single batch.
        model: Embedding model name recorded with each result.
        output_dir: Directory where ``embeddings.json`` is written.

    Raises:
        ValueError: If ``batch_size`` or ``workers`` is less than 1.
    """

    batch_size: int = 32
    workers: int = 4
    model: str = "text-embedding-3-small"
    output_dir: Path = Path("output/embeddings")

    def __post_init__(self) -> None:
        # Fail fast with a clear message instead of later and obscurely:
        # batch_size 0 gives range() a zero step, a negative one silently
        # processes nothing, and ThreadPoolExecutor rejects max_workers < 1.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
        if self.workers < 1:
            raise ValueError(f"workers must be >= 1, got {self.workers}")
        # Accept plain strings for convenience; downstream code needs a Path.
        self.output_dir = Path(self.output_dir)
def _read_json_documents(path: Path) -> list:
    """Read one JSON file and return its documents as a list."""
    # Explicit UTF-8: the platform default encoding (e.g. cp1252 on Windows)
    # would mis-decode non-ASCII JSON.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    # A file may hold a single document object or a list of documents.
    return data if isinstance(data, list) else [data]


def load_documents(input_path: Path) -> list[dict]:
    """Load documents from a JSON file or a directory of JSON files.

    Each JSON file may contain either a single document object or a list of
    documents; all of them are flattened into one list.

    Args:
        input_path: A JSON file, or a directory whose top-level ``*.json``
            files are read in sorted filename order. ``str`` is accepted too.

    Returns:
        The loaded documents, or an empty list if ``input_path`` is neither
        a file nor a directory.
    """
    input_path = Path(input_path)
    if input_path.is_file():
        return _read_json_documents(input_path)
    if input_path.is_dir():
        # glob() yields files in arbitrary filesystem order; sort so the
        # document order (and therefore the output file) is reproducible.
        return [
            doc
            for file_path in sorted(input_path.glob("*.json"))
            for doc in _read_json_documents(file_path)
        ]
    return []
def embed_document(doc: dict, model: str, latency: float = 0.05) -> dict:
    """Simulate embedding generation for a single document.

    The text comes from ``doc["text"]``, falling back to ``doc["content"]``.
    A deterministic fake 1536-dimensional embedding is derived from an MD5
    hash of the text, standing in for a real embedding API call.

    Args:
        doc: Document mapping with a ``"text"`` or ``"content"`` string and
            an optional ``"id"``.
        model: Model name recorded in the result.
        latency: Seconds to sleep to simulate API latency (0 disables it).

    Returns:
        Dict with ``id`` (from the doc, else ``doc_<seed>``), the first 100
        characters of ``text``, the fake ``embedding``, ``model`` and the
        full ``text_length``.

    Raises:
        ValueError: If the document has neither a ``"text"`` nor a
            ``"content"`` value.
        TypeError: If the text value is not a string.
    """
    import hashlib
    import time

    # Treat an explicit null the same as a missing key.
    text = doc.get("text")
    if text is None:
        text = doc.get("content")
    if text is None:
        # Embedding "" would yield a meaningless vector and an id shared by
        # every text-less document, so reject it explicitly.
        raise ValueError(
            f"document {doc.get('id', '<no id>')!r} has no 'text' or 'content' field"
        )
    if not isinstance(text, str):
        raise TypeError(f"document text must be a string, got {type(text).__name__}")

    # Simulate API call latency
    if latency > 0:
        time.sleep(latency)

    # MD5 is used only as a cheap deterministic seed, not for security.
    seed = int(hashlib.md5(text.encode("utf-8")).hexdigest()[:8], 16)
    return {
        "id": doc.get("id", f"doc_{seed}"),
        "text": text[:100],  # Truncated
        "embedding": [seed / 1e8] * 1536,  # 1536-dim fake embedding
        "model": model,
        "text_length": len(text),
    }
def process_batch(documents: list[dict], config: PipelineConfig) -> list[dict]:
    """Embed a batch of documents in parallel.

    Documents are submitted to a thread pool of ``config.workers`` threads.
    Results are collected in input order, so repeated runs produce the same
    output order. A document whose embedding raises is reported on the
    console (with its id) and left out of the result.

    Args:
        documents: Documents to embed.
        config: Pipeline settings; only ``workers`` and ``model`` are read.

    Returns:
        Embedding records for the documents that succeeded, in input order.
    """
    from rich.markup import escape

    results = []
    with ThreadPoolExecutor(max_workers=config.workers) as executor:
        # Submit everything up front so the calls overlap, then collect in
        # submission order rather than completion order.
        pending = [
            (doc, executor.submit(embed_document, doc, config.model))
            for doc in documents
        ]
        for doc, future in pending:
            try:
                results.append(future.result())
            except Exception as e:
                # Best-effort by design: one bad document must not abort the
                # batch. Escape dynamic text so brackets in ids or messages
                # are not interpreted as rich markup.
                doc_id = doc.get("id", "<no id>") if isinstance(doc, dict) else "<not a dict>"
                console.print(
                    f"[red]Error:[/red] document {escape(repr(doc_id))}: {escape(str(e))}"
                )
    return results
@click.command()
@click.argument("input_path", type=click.Path(exists=True))
@click.option("--batch-size", "-b", default=32, type=click.IntRange(min=1), help="Batch size")
@click.option("--workers", "-w", default=4, type=click.IntRange(min=1), help="Parallel workers")
@click.option("--output", "-o", type=click.Path(), help="Output directory")
def run(input_path, batch_size, workers, output):
    """Run the document embedding pipeline.

    INPUT_PATH is a JSON file or a directory of JSON files. Documents are
    embedded in batches and the results are written to embeddings.json in
    the output directory (default: output/embeddings).
    """
    config = PipelineConfig(batch_size=batch_size, workers=workers)
    # Keep the dataclass default unless --output was given. The config must
    # exist before its default output_dir can be read.
    if output:
        config.output_dir = Path(output)

    console.print("[bold]Starting pipeline[/bold]")
    console.print(f"Input: {input_path}")
    console.print(f"Workers: {config.workers}")

    documents = load_documents(Path(input_path))
    console.print(f"Loaded {len(documents)} documents")
    if not documents:
        console.print("[yellow]No documents to process[/yellow]")
        return

    all_results = []
    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("[cyan]Embedding...", total=len(documents))
        # Process in batches
        for start in range(0, len(documents), config.batch_size):
            batch = documents[start:start + config.batch_size]
            all_results.extend(process_batch(batch, config))
            # Advance by the batch size, not the success count, so the bar
            # still reaches 100% when some documents fail.
            progress.update(task, advance=len(batch))

    failed = len(documents) - len(all_results)
    if failed:
        console.print(f"[yellow]{failed} documents failed and were skipped[/yellow]")

    # Save results
    config.output_dir.mkdir(parents=True, exist_ok=True)
    output_file = config.output_dir / "embeddings.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)
    console.print(f"[green]✓[/green] Saved {len(all_results)} embeddings to {output_file}")
# Allow running the file directly as a script; run is the click command above.
if __name__ == "__main__":
    run()
EXERCISE
Run the pipeline on test data. Create a JSON file containing a list of 10 sample documents, each shaped like {"id": "...", "text": "..."}. Run the pipeline on that file and check the output file (output/embeddings/embeddings.json unless you pass --output). Then add error handling for documents that are missing the "text" field.