RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
Document Processing with Local AI

13. Processing Pipelines

Chapter 13 of 18 · 20 min
KEY INSIGHT

Pipelines convert sequential operations into reusable, configurable workflows. Stage-based design separates concerns, simplifies testing, and enables visual pipeline builders.

Real document workflows chain multiple operations together: extract text, classify content, extract specific entities, format output. Hardcoding these steps creates brittle code. Pipelines model workflows as composable stages.

Pipeline Design Principles

Each stage receives input, produces output, and may fail. Stages should be independent: testable in isolation and reusable in different contexts.
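
One way to make the "testable in isolation" point concrete is to describe a stage as a small interface and exercise one stage on its own, with no pipeline involved. The sketch below is only an illustration: `Doc`, `Stage`, `WhitespaceCleanupStage`, and `run_stage` are hypothetical names that do not appear elsewhere in this chapter.

```python
from dataclasses import dataclass, field
from typing import Optional, Protocol


@dataclass
class Doc:
    # Hypothetical minimal document type, used only for this sketch.
    content: Optional[str] = None
    errors: list = field(default_factory=list)


class Stage(Protocol):
    # Any object with a matching execute() method counts as a stage.
    def execute(self, document: Doc) -> Doc: ...


class WhitespaceCleanupStage:
    """Hypothetical stage: collapses runs of whitespace into single spaces."""

    def execute(self, document: Doc) -> Doc:
        if document.content is None:
            # Report the problem on the document instead of raising.
            document.errors.append("WhitespaceCleanupStage: no content")
            return document
        document.content = " ".join(document.content.split())
        return document


def run_stage(stage: Stage, document: Doc) -> Doc:
    # Works with any stage; no pipeline is needed to test a single stage.
    return stage.execute(document)


cleaned = run_stage(WhitespaceCleanupStage(), Doc(content="Invoice \n\n  #42\t total"))
empty = run_stage(WhitespaceCleanupStage(), Doc())
print(cleaned.content)
print(empty.errors)
```

Because the stage depends only on the document it is handed, the same checks can go straight into a unit test.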

A Simple Pipeline Implementation

from dataclasses import dataclass, field
from typing import Optional, List

@dataclass
class Document:
    path: str
    content: Optional[str] = None
    # default_factory gives every Document its own dict and list
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)

class Pipeline:
    def __init__(self):
        self.stages = []

    def add(self, stage):
        # Return self so calls can be chained: Pipeline().add(a).add(b)
        self.stages.append(stage)
        return self

    def process(self, document):
        doc = document
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
                # Stop early if a stage reported errors and there is no text to work on
                if doc.errors and doc.content is None:
                    break
            except Exception as e:
                # Record which stage failed, then stop
                doc.errors.append(f"{stage.__class__.__name__}: {e}")
                break
        return doc
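
To see the stop-on-first-failure behaviour without any PDF tooling, the following sketch runs the pipeline with three toy stages. `LoadStage`, `FailingStage`, and `UppercaseStage` are hypothetical stand-ins, and `Document` and `Pipeline` are restated compactly only so the snippet runs on its own.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Document:  # compact restatement of the class above
    path: str
    content: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


class Pipeline:  # same behaviour as the Pipeline above
    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self

    def process(self, document):
        doc = document
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
                if doc.errors and doc.content is None:
                    break
            except Exception as e:
                doc.errors.append(f"{stage.__class__.__name__}: {e}")
                break
        return doc


class LoadStage:  # hypothetical: pretends to read the file
    def execute(self, document):
        document.content = "hello world"
        return document


class FailingStage:  # hypothetical: always raises
    def execute(self, document):
        raise ValueError("model not loaded")


class UppercaseStage:  # hypothetical: never reached in this run
    def execute(self, document):
        document.content = document.content.upper()
        return document


pipeline = Pipeline().add(LoadStage()).add(FailingStage()).add(UppercaseStage())
result = pipeline.process(Document(path="report.pdf"))
print(result.errors)
print(result.content)
```

The failure is recorded under the failing stage's class name, and the content stays lowercase because `UppercaseStage` never runs.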

Implementing Stages

Each stage implements an execute method:

import json
from pathlib import Path

import pymupdf  # PyMuPDF

class TextExtractionStage:
    def execute(self, document):
        # Open the PDF, join the text of every page, and close the file afterwards
        with pymupdf.open(document.path, filetype="pdf") as doc:
            document.content = "".join(page.get_text() for page in doc)
            document.metadata["pages"] = len(doc)
        return document

class EntityExtractionStage:
    def __init__(self, extractor):
        # extractor is any object with an extract(text) method
        self.extractor = extractor

    def execute(self, document):
        # Nothing to analyze if an earlier stage produced no text
        if not document.content:
            return document
        entities = self.extractor.extract(document.content)
        document.metadata["entities"] = entities
        return document

class OutputStage:
    def __init__(self, output_path):
        self.output_path = output_path

    def execute(self, document):
        # Write one JSON file per input document, named after the source file
        output_dir = Path(self.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f"{Path(document.path).stem}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump({
                "path": document.path,
                "content": document.content,
                "metadata": document.metadata
            }, f, indent=2)
        return document
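
`EntityExtractionStage` only requires that its extractor expose an `extract(text)` method. As a rough illustration of that contract, here is a regex-based stand-in that can be dropped in while testing. `RegexExtractor` and its patterns are hypothetical and deliberately simplistic; it does not replace a real NER model.

```python
import re
from typing import Dict, List


class RegexExtractor:
    """Hypothetical stand-in for a real entity extractor."""

    # Simplified patterns for illustration only.
    PATTERNS = {
        "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
        "date": re.compile(r"\b\d{4}-\d{2}-\d{2}\b"),
    }

    def extract(self, text: str) -> Dict[str, List[str]]:
        # Return every match for every pattern, keyed by entity label.
        return {label: pattern.findall(text) for label, pattern in self.PATTERNS.items()}


entities = RegexExtractor().extract("Contact ana@example.com before 2025-03-01.")
print(entities)
```

Any object with the same method shape, such as a wrapper around a spaCy model, can be passed to the stage in its place.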

Composing the Pipeline

Build a pipeline by chaining stages. Here spacy_extractor stands for any object with an extract(text) method, created elsewhere:

pipeline = (
    Pipeline()
    .add(TextExtractionStage())
    .add(EntityExtractionStage(spacy_extractor))
    .add(OutputStage("/results"))
)

Pipeline Configuration

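Move the stage list into a configuration file so the same code can build different pipelines. An explicit registry limits which classes a config may name, and an objects mapping supplies values, such as the extractor, that YAML cannot express:

import yaml

# Only classes listed here can be named in a config file
STAGE_REGISTRY = {
    "TextExtractionStage": TextExtractionStage,
    "EntityExtractionStage": EntityExtractionStage,
    "OutputStage": OutputStage,
}

def load_pipeline_from_config(config_path, objects=None):
    # objects maps names used in the config (e.g. "spacy_extractor") to live objects
    objects = objects or {}
    with open(config_path) as f:
        config = yaml.safe_load(f)

    pipeline = Pipeline()
    for stage_config in config["stages"]:
        stage_class = STAGE_REGISTRY[stage_config["class"]]
        params = stage_config.get("params") or {}
        # Swap string references for the objects they name; leave other values alone
        params = {k: objects.get(v, v) if isinstance(v, str) else v for k, v in params.items()}
        pipeline.add(stage_class(**params))

    return pipeline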

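YAML configuration. The extractor value is only a name; it has to be resolved to an actual extractor object before the stage is constructed: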

stages:
  - class: TextExtractionStage
    params: {}
  - class: EntityExtractionStage
    params:
      extractor: spacy_extractor
  - class: OutputStage
    params:
      output_path: /results
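
A typo in a config file otherwise surfaces only as a KeyError or TypeError while the pipeline is being built. One possible safeguard is to check the parsed config first. The sketch below works on the plain dict that a YAML parser would return; `validate_config` and `KNOWN` are hypothetical names introduced for illustration.

```python
def validate_config(config, known_stages):
    """Return a list of readable problems; an empty list means the config looks usable."""
    stages = config.get("stages") if isinstance(config, dict) else None
    if not isinstance(stages, list) or not stages:
        return ["config must contain a non-empty 'stages' list"]

    problems = []
    for index, stage in enumerate(stages):
        if not isinstance(stage, dict):
            problems.append(f"stage {index}: expected a mapping")
            continue
        name = stage.get("class")
        if name not in known_stages:
            problems.append(f"stage {index}: unknown class {name!r}")
        # A missing or empty params entry is treated as "no parameters".
        if not isinstance(stage.get("params") or {}, dict):
            problems.append(f"stage {index}: params must be a mapping")
    return problems


KNOWN = {"TextExtractionStage", "EntityExtractionStage", "OutputStage"}
good = {"stages": [{"class": "TextExtractionStage", "params": {}}]}
bad = {"stages": [{"class": "TextExtractionStag"},
                  {"class": "OutputStage", "params": "/results"}]}

print(validate_config(good, KNOWN))
print(validate_config(bad, KNOWN))
```

Collecting every problem before reporting lets a user fix the whole file in one pass instead of one error at a time.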

Error Handling in Pipelines

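To keep going after a stage fails, record the failure and continue with the next stage. Later stages then see the document as it was before the failure, so each stage should tolerate missing input:

@dataclass
class PipelineResult:
    success: bool
    document: Document
    failed_stages: List[str]
    warnings: List[str]

class ResilientPipeline(Pipeline):
    def process(self, document):
        # Accept either a path string or a ready-made Document
        doc = Document(path=document) if isinstance(document, str) else document
        result = PipelineResult(success=True, document=doc, failed_stages=[], warnings=[])

        for stage in self.stages:
            try:
                doc = stage.execute(doc)
            except Exception as e:
                # Note the failure and move on to the next stage
                result.failed_stages.append(stage.__class__.__name__)
                result.warnings.append(str(e))

        # Keep the result in sync in case a stage returned a new Document
        result.document = doc
        result.success = len(result.failed_stages) == 0
        return result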
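
For contrast with the fail-fast pipeline, this sketch runs a continue-on-failure pipeline over toy stages. The stage classes are hypothetical, and `ResilientPipeline` is restated here as a standalone class (without the `Pipeline` base) only so the snippet runs by itself.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Document:  # compact restatement
    path: str
    content: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


@dataclass
class PipelineResult:
    success: bool
    document: Document
    failed_stages: List[str]
    warnings: List[str]


class ResilientPipeline:  # standalone restatement for this sketch
    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self

    def process(self, document):
        doc = Document(path=document) if isinstance(document, str) else document
        result = PipelineResult(True, doc, [], [])
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
            except Exception as e:
                # Record the failure and carry on with the next stage.
                result.failed_stages.append(stage.__class__.__name__)
                result.warnings.append(str(e))
        result.document = doc
        result.success = not result.failed_stages
        return result


class LoadStage:  # hypothetical: pretends to read the file
    def execute(self, document):
        document.content = "hello world"
        return document


class FailingStage:  # hypothetical: always raises
    def execute(self, document):
        raise RuntimeError("OCR engine unavailable")


class WordCountStage:  # hypothetical: still runs after the failure
    def execute(self, document):
        document.metadata["words"] = len(document.content.split())
        return document


result = (
    ResilientPipeline()
    .add(LoadStage())
    .add(FailingStage())
    .add(WordCountStage())
    .process("report.pdf")
)
print(result.success, result.failed_stages, result.document.metadata)
```

The run is marked unsuccessful, yet the word count is still produced, which is the trade-off this design makes: partial results in exchange for weaker guarantees about what each stage receives.
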
EXERCISE

Design a pipeline with three configurable stages: preprocessor (clean text), analyzer (extract specific patterns), and formatter (output to specified format). Add a configuration loader that reads pipeline definitions from JSON.
