13. Processing Pipelines
Real document workflows chain multiple operations together: extract text, classify content, extract specific entities, format output. Hardcoding these steps creates brittle code. Pipelines model workflows as composable stages.
Pipeline Design Principles
Each stage receives input, produces output, and may fail. Stages should be independent: testable in isolation and reusable in different contexts.
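As a minimal sketch of that contract, the stage interface can be written down as a `typing.Protocol`. The names `Doc`, `Stage`, `LowercaseStage`, and `StripStage` are hypothetical and exist only for this illustration; the chapter's own classes follow in the next section.

```python
from dataclasses import dataclass, field
from typing import Optional, Protocol


@dataclass
class Doc:
    # Hypothetical minimal document type, used only in this sketch.
    content: Optional[str] = None
    errors: list = field(default_factory=list)


class Stage(Protocol):
    """The stage contract: take a document, return a document."""

    def execute(self, document: Doc) -> Doc: ...


class LowercaseStage:
    """Hypothetical stage that lowercases the content."""

    def execute(self, document: Doc) -> Doc:
        if document.content is not None:
            document.content = document.content.lower()
        return document


class StripStage:
    """Hypothetical stage that trims surrounding whitespace."""

    def execute(self, document: Doc) -> Doc:
        if document.content is not None:
            document.content = document.content.strip()
        return document


# A stage can be exercised on its own, with no pipeline involved.
isolated = LowercaseStage().execute(Doc(content="  Hello World  "))

# The same stages can be combined freely in another context.
chained = StripStage().execute(LowercaseStage().execute(Doc(content="  Hello World  ")))
```

Because neither stage knows about the other, each one can be unit-tested with a hand-built document and then reused in any pipeline that needs it.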
A Simple Pipeline Implementation
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Document:
    path: str
    content: Optional[str] = None
    # default_factory gives every Document its own dict and list
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


class Pipeline:
    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self  # returning self allows chained .add() calls

    def process(self, document):
        doc = document
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
                # Stop early if a stage reported errors and there is no content to work on
                if doc.errors and doc.content is None:
                    break
            except Exception as e:
                # Record which stage failed, then stop processing
                doc.errors.append(f"{stage.__class__.__name__}: {e}")
                break
        return doc
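To see how this behaves when a stage fails, here is a small self-contained sketch. It repeats condensed copies of `Document` and `Pipeline` so it can run on its own; `FakeLoadStage`, `BrokenStage`, and `CountStage` are hypothetical stand-ins, not stages from this chapter.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Document:
    path: str
    content: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


class Pipeline:
    # Condensed copy of the Pipeline class above
    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self

    def process(self, document):
        doc = document
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
                if doc.errors and doc.content is None:
                    break
            except Exception as e:
                doc.errors.append(f"{stage.__class__.__name__}: {e}")
                break
        return doc


class FakeLoadStage:
    """Hypothetical stage that pretends to extract text."""

    def execute(self, document):
        document.content = "invoice 42"
        return document


class BrokenStage:
    """Hypothetical stage that always fails."""

    def execute(self, document):
        raise ValueError("classifier unavailable")


class CountStage:
    """Hypothetical stage that records a word count."""

    def execute(self, document):
        document.metadata["words"] = len(document.content.split())
        return document


ok = Pipeline().add(FakeLoadStage()).add(CountStage()).process(Document(path="a.pdf"))
failed = (
    Pipeline()
    .add(FakeLoadStage())
    .add(BrokenStage())
    .add(CountStage())
    .process(Document(path="b.pdf"))
)

print(ok.metadata)      # {'words': 2}
print(failed.errors)    # ['BrokenStage: classifier unavailable']
print(failed.metadata)  # {} because CountStage never ran
```

The second run shows the fail-fast behavior: the exception is recorded under the failing stage's class name and every later stage is skipped.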
Implementing Stages
Each stage implements an execute method:
import json
from pathlib import Path

import pymupdf  # PyMuPDF


class TextExtractionStage:
    def execute(self, document):
        with open(document.path, "rb") as f:
            data = f.read()
        # Use the PDF as a context manager so it is closed after reading
        with pymupdf.open(stream=data, filetype="pdf") as pdf:
            document.content = "".join(page.get_text() for page in pdf)
            document.metadata["pages"] = len(pdf)
        return document


class EntityExtractionStage:
    def __init__(self, extractor):
        # extractor: any object with an extract(text) method
        self.extractor = extractor

    def execute(self, document):
        # Nothing to analyze if an earlier stage produced no text
        if not document.content:
            return document
        entities = self.extractor.extract(document.content)
        document.metadata["entities"] = entities
        return document


class OutputStage:
    def __init__(self, output_path):
        self.output_path = output_path

    def execute(self, document):
        # Write one JSON file per input document, named after the source file
        output_file = Path(self.output_path) / f"{Path(document.path).stem}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump({
                "path": document.path,
                "content": document.content,
                "metadata": document.metadata
            }, f, indent=2)
        return document
Composing the Pipeline
Build pipelines by chaining stages:
# spacy_extractor is assumed to be defined elsewhere: any object with an extract(text) method
pipeline = (
    Pipeline()
    .add(TextExtractionStage())
    .add(EntityExtractionStage(spacy_extractor))
    .add(OutputStage("/results"))
)
Pipeline Configuration
Externalize configuration for reusability:
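import yaml


def load_pipeline_from_config(config_path):
    with open(config_path) as f:
        config = yaml.safe_load(f)
    pipeline = Pipeline()
    for stage_config in config["stages"]:
        # Looks the class up by name in this module; only load config files you trust
        stage_class = globals()[stage_config["class"]]
        # Params arrive as plain YAML values: a name such as spacy_extractor is passed
        # as a string and must be resolved to a real object before the stage can use it
        stage = stage_class(**stage_config.get("params", {}))
        pipeline.add(stage)
    return pipeline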
YAML configuration:
stages:
  - class: TextExtractionStage
    params: {}
  - class: EntityExtractionStage
    params:
      extractor: spacy_extractor
  - class: OutputStage
    params:
      output_path: /results
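A configuration file can only carry plain values, so a name like `spacy_extractor` reaches the loader as a string rather than as an extractor object. One possible way to handle this, sketched below, is an explicit registry for stage classes and another for named objects. `STAGE_REGISTRY`, `OBJECT_REGISTRY`, `REFERENCE_PARAMS`, `build_stages`, and `KeywordExtractor` are hypothetical names introduced for this sketch, and the stage classes are reduced to their constructors. The config is given as the dict a YAML loader would produce, with the text extraction stage left out so the example runs without any PDF library.

```python
class KeywordExtractor:
    """Hypothetical stand-in for a real extractor object such as spacy_extractor."""

    def extract(self, text):
        return [word for word in text.split() if word.istitle()]


class EntityExtractionStage:
    def __init__(self, extractor):
        self.extractor = extractor


class OutputStage:
    def __init__(self, output_path):
        self.output_path = output_path


# Only classes listed here can be named in a config file.
STAGE_REGISTRY = {
    "EntityExtractionStage": EntityExtractionStage,
    "OutputStage": OutputStage,
}

# Named objects that a config file may refer to by string.
OBJECT_REGISTRY = {"spacy_extractor": KeywordExtractor()}

# Assumption: parameters with these names hold references into OBJECT_REGISTRY.
REFERENCE_PARAMS = {"extractor"}


def build_stages(config):
    """Turn a parsed config dict into a list of stage instances."""
    stages = []
    for stage_config in config["stages"]:
        name = stage_config["class"]
        if name not in STAGE_REGISTRY:
            raise ValueError(f"Unknown stage class: {name}")
        raw_params = stage_config.get("params") or {}
        # Swap reference strings for the objects they name; pass other values through
        params = {
            key: OBJECT_REGISTRY[value] if key in REFERENCE_PARAMS else value
            for key, value in raw_params.items()
        }
        stages.append(STAGE_REGISTRY[name](**params))
    return stages


# The parsed form of a config like the one above, minus the text extraction stage
config = {
    "stages": [
        {"class": "EntityExtractionStage", "params": {"extractor": "spacy_extractor"}},
        {"class": "OutputStage", "params": {"output_path": "/results"}},
    ]
}
stages = build_stages(config)
```

Each returned stage can then be passed to `Pipeline.add`. Compared with a `globals()` lookup, a registry limits what a config file can instantiate and turns a misspelled class name into a clear error.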
Error Handling in Pipelines
Track errors without stopping the pipeline:
@dataclass
class PipelineResult:
    success: bool
    document: Document
    failed_stages: List[str]
    warnings: List[str]


class ResilientPipeline(Pipeline):
    def process(self, document):
        # Accept either a path string or an existing Document
        doc = Document(path=document) if isinstance(document, str) else document
        result = PipelineResult(success=True, document=doc, failed_stages=[], warnings=[])
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
            except Exception as e:
                # Record the failure and continue with the next stage
                result.failed_stages.append(stage.__class__.__name__)
                result.warnings.append(str(e))
        # Keep the result in sync in case a stage returned a new Document object
        result.document = doc
        result.success = len(result.failed_stages) == 0
        return result
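The following sketch shows what a caller might see when one stage fails partway through. It is self-contained, so it repeats condensed versions of the classes (with `ResilientPipeline` written standalone rather than as a subclass), and the three stages are hypothetical stand-ins.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Document:
    path: str
    content: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


@dataclass
class PipelineResult:
    success: bool
    document: Document
    failed_stages: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)


class ResilientPipeline:
    # Condensed, standalone version of the class above
    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self

    def process(self, document):
        doc = Document(path=document) if isinstance(document, str) else document
        result = PipelineResult(success=True, document=doc)
        for stage in self.stages:
            try:
                doc = stage.execute(doc)
            except Exception as e:
                result.failed_stages.append(stage.__class__.__name__)
                result.warnings.append(str(e))
        result.document = doc
        result.success = not result.failed_stages
        return result


class FakeLoadStage:
    """Hypothetical stage that pretends to extract text."""

    def execute(self, document):
        document.content = "invoice 42"
        return document


class BrokenStage:
    """Hypothetical stage that always fails."""

    def execute(self, document):
        raise ValueError("classifier unavailable")


class CountStage:
    """Hypothetical stage that records a word count."""

    def execute(self, document):
        document.metadata["words"] = len(document.content.split())
        return document


result = (
    ResilientPipeline()
    .add(FakeLoadStage())
    .add(BrokenStage())
    .add(CountStage())
    .process("report.pdf")
)

print(result.success)            # False
print(result.failed_stages)      # ['BrokenStage']
print(result.document.metadata)  # {'words': 2}, CountStage still ran
```

Because later stages keep running after a failure, each stage should tolerate missing input, as `EntityExtractionStage` does when it returns early on empty content.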
Design a pipeline with three configurable stages: preprocessor (clean text), analyzer (extract specific patterns), and formatter (output to specified format). Add a configuration loader that reads pipeline definitions from JSON.