RUNLOCALAIv38
Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.coIndependently operated
Python for AI — Zero to Useful

34. AI Pipeline Script

Chapter 34 of 36 · 15 min
KEY INSIGHT

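Use `ThreadPoolExecutor`, not `ProcessPoolExecutor`, for I/O-bound work such as API calls: the workers spend most of their time waiting on the network, so threads overlap that waiting cheaply. Reserve processes for CPU-bound work. Processing in batches and advancing a progress bar after each one keeps users informed during long operations.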
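
To make the threads-for-waiting point concrete, here is a minimal sketch that uses `time.sleep` as a stand-in for API latency. The function `fake_api_call` and the 0.05-second delay are illustrative assumptions, not measurements of any real service.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(n: int) -> int:
    """Hypothetical stand-in for a network request: wait, then return a value."""
    time.sleep(0.05)  # Sleeping releases the GIL, much like waiting on a socket
    return n * 2


def run_sequential(items: list[int]) -> list[int]:
    """Call the fake API one item at a time."""
    return [fake_api_call(n) for n in items]


def run_threaded(items: list[int], workers: int = 4) -> list[int]:
    """Call the fake API from a thread pool; map() keeps input order."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(fake_api_call, items))


def timed(fn, *args):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


items = list(range(8))
seq_result, seq_seconds = timed(run_sequential, items)
thr_result, thr_seconds = timed(run_threaded, items)
print(f"sequential: {seq_seconds:.2f}s, threaded: {thr_seconds:.2f}s")
```

With eight calls and four workers, the threaded version should need roughly two rounds of waiting instead of eight. The same trick does little for CPU-bound functions in standard CPython builds, because only one thread executes Python bytecode at a time.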

Time to wire everything together. The script below is a complete pipeline that loads JSON documents, embeds them (here with a simulated embedding call), and writes the resulting vectors to a JSON file:

#!/usr/bin/env python3
"""Batch processing pipeline for document embeddings."""

import click
import json
from pathlib import Path
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn

console = Console()

@dataclass
class PipelineConfig:
    batch_size: int = 32
    workers: int = 4
    model: str = "text-embedding-3-small"
    output_dir: Path = Path("output/embeddings")

def load_documents(input_path: Path) -> list[dict]:
    """Load documents from file or directory."""
    documents = []
    
    if input_path.is_file():
        with open(input_path) as f:
            data = json.load(f)
            documents = data if isinstance(data, list) else [data]
    
    elif input_path.is_dir():
        for file_path in input_path.glob("*.json"):
            with open(file_path) as f:
                data = json.load(f)
                documents.extend(data if isinstance(data, list) else [data])
    
    return documents

def embed_document(doc: dict, model: str) -> dict:
    """Simulate embedding generation."""
    import time
    import hashlib
    
    # Simulate API call latency
    time.sleep(0.05)
    
    # Simulate embedding
    text = doc.get("text", doc.get("content", ""))
    seed = int(hashlib.md5(text.encode()).hexdigest()[:8], 16)
    
    return {
        "id": doc.get("id", f"doc_{seed}"),
        "text": text[:100],  # Truncated
        "embedding": [seed / 1e8] * 1536,  # 1536-dim fake embedding
        "model": model,
        "text_length": len(text)
    }

def process_batch(documents: list[dict], config: PipelineConfig) -> list[dict]:
    """Process documents in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=config.workers) as executor:
        futures = {
            executor.submit(embed_document, doc, config.model): doc
            for doc in documents
        }
        
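        # as_completed yields futures as they finish, so results are not in input order
        for future in as_completed(futures):
            doc = futures[future]  # The document this future was submitted for
            try:
                results.append(future.result())
            except Exception as e:
                console.print(f"[red]Error embedding {doc.get('id', '<no id>')}:[/red] {e}")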
    
    return results

@click.command()
@click.argument("input_path", type=click.Path(exists=True))
@click.option("--batch-size", "-b", default=32, help="Batch size")
@click.option("--workers", "-w", default=4, help="Parallel workers")
@click.option("--output", "-o", type=click.Path(), help="Output directory")
def run(input_path, batch_size, workers, output):
    """Run the document embedding pipeline."""
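    # Build the config from dataclass defaults, then override the output
    # directory only if --output was given. Referencing `config` inside its
    # own constructor call would raise UnboundLocalError.
    config = PipelineConfig(batch_size=batch_size, workers=workers)
    if output:
        config.output_dir = Path(output)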
    
    console.print("[bold]Starting pipeline[/bold]")
    console.print(f"Input: {input_path}")
    console.print(f"Workers: {config.workers}")
    
    documents = load_documents(Path(input_path))
    console.print(f"Loaded {len(documents)} documents")
    
    if not documents:
        console.print("[yellow]No documents to process[/yellow]")
        return
    
    with Progress(TextColumn("[progress.description]{task.description}"), BarColumn(), console=console) as progress:
        task = progress.add_task("[cyan]Embedding...", total=len(documents))
        
        # Process in batches
        all_results = []
        for i in range(0, len(documents), config.batch_size):
            batch = documents[i:i + config.batch_size]
            results = process_batch(batch, config)
            all_results.extend(results)
            # Advance by the batch size so the bar still completes if some documents fail
            progress.update(task, advance=len(batch))
    
    # Save results
    config.output_dir.mkdir(parents=True, exist_ok=True)
    output_file = config.output_dir / "embeddings.json"
    with open(output_file, "w") as f:
        json.dump(all_results, f, indent=2)
    
    console.print(f"[green]✓[/green] Saved {len(all_results)} embeddings to {output_file}")

if __name__ == "__main__":
    run()
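
The batch loop in `run` relies on slicing with a stride of `batch_size`. The sketch below isolates that pattern in a helper called `chunked`, a hypothetical name that does not appear in the script, to show that the final batch is simply shorter when the document count is not a multiple of the batch size.

```python
from collections.abc import Iterator


def chunked(items: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items (hypothetical helper)."""
    if size < 1:
        # range() with a step of 0 raises ValueError; fail with a clearer message
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Ten placeholder documents split into batches of four
documents = [{"id": f"doc_{i}"} for i in range(10)]
batch_sizes = [len(batch) for batch in chunked(documents, 4)]
print(batch_sizes)
```

The explicit size check matters for the CLI as well: as written, passing `--batch-size 0` would reach `range(0, len(documents), 0)`, which raises `ValueError`.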
EXERCISE

Run the pipeline with test data. Create a JSON file with 10 sample documents, each shaped like {"id": "...", "text": "..."}, run the pipeline on it, and verify the output file. Then add error handling for documents that are missing the "text" field.
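
As a starting point for the exercise, the sketch below writes ten sample documents and deliberately drops "text" from the last one so there is something for the error handling to catch. The helper names, the sample sentences, and the file name `sample_docs.json` are all assumptions made for illustration; it writes to a temporary directory so it can be run safely anywhere.

```python
import json
import tempfile
from pathlib import Path


def write_sample_documents(path: Path, count: int = 10) -> list[dict]:
    """Write `count` sample documents to `path` and return them."""
    documents = [
        {"id": f"doc_{i}", "text": f"Sample document number {i} about local AI."}
        for i in range(count)
    ]
    # Deliberately remove "text" from the last document to exercise error handling
    del documents[-1]["text"]
    path.write_text(json.dumps(documents, indent=2), encoding="utf-8")
    return documents


def find_missing_text(documents: list[dict]) -> list[str]:
    """Return ids of documents with no usable "text" or "content" value."""
    return [
        doc.get("id", "<no id>")
        for doc in documents
        if not doc.get("text", doc.get("content", ""))
    ]


with tempfile.TemporaryDirectory() as tmp:
    sample_path = Path(tmp) / "sample_docs.json"
    written = write_sample_documents(sample_path)
    loaded = json.loads(sample_path.read_text(encoding="utf-8"))

missing_ids = find_missing_text(loaded)
print(f"{len(loaded)} documents, missing text: {missing_ids}")
```

To feed the file to the pipeline, write it to a real path instead of a temporary directory and pass that path as `input_path`. Note that `embed_document` as written does not fail on a missing "text" field: it falls back to an empty string and still returns an embedding, so the error handling has to check for that case explicitly.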
