18. Document Pipeline Project
Chapter 18 of 18 · 25 min
This chapter builds a complete document processing pipeline combining techniques from all previous chapters. The pipeline monitors a folder, extracts content from multiple formats, performs quality checks, indexes results, and provides search.
Project Structure
document-pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── extractor.py
│   ├── quality.py
│   ├── indexer.py
│   └── watcher.py
├── config/
│   └── pipeline.yaml
├── tests/
│   └── test_pipeline.py
├── run.py
└── requirements.txt
Core Pipeline Implementation
# pipeline/__init__.py
from dataclasses import dataclass, field
from typing import List, Optional
from pathlib import Path
import json
from datetime import datetime
@dataclass
class ProcessingResult:
    """Record describing what happened to one document in the pipeline.

    ``path`` and ``status`` are required; everything else has a default.
    ``metadata`` and ``errors`` get a fresh container per instance, and
    ``timestamp`` is the ISO-8601 string of ``datetime.now()`` taken when
    the instance is created.
    """

    path: str
    status: str
    content: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self):
        """Return the fields as a plain dict, in declaration order.

        The values are the attribute objects themselves (no copies), so
        the ``metadata`` and ``errors`` entries alias the instance's own
        containers.
        """
        keys = ("path", "status", "content", "metadata", "errors", "timestamp")
        return {key: getattr(self, key) for key in keys}
# pipeline/extractor.py
from pathlib import Path
import magic
import pymupdf
import pytesseract
from PIL import Image
import docx
class DocumentExtractor:
    """Extract text from a document, choosing a handler by file extension.

    Every handler returns a dict with two keys: ``"content"`` (the
    extracted text) and ``"metadata"`` (a dict that always carries a
    ``"format"`` entry).
    """

    def __init__(self):
        # Lower-case extension (without the dot) -> bound extraction method.
        self.handlers = {
            "pdf": self._extract_pdf,
            "png": self._extract_image,
            "jpg": self._extract_image,
            "jpeg": self._extract_image,
            "txt": self._extract_text,
        }

    def extract(self, path: str) -> dict:
        """Extract ``path`` with the handler registered for its extension.

        The extension is matched case-insensitively. Extensions without a
        registered handler yield empty content and the format
        ``"unsupported"`` instead of raising.
        """
        ext = Path(path).suffix.lower().lstrip(".")
        handler = self.handlers.get(ext, self._extract_unsupported)
        return handler(path)

    def _extract_pdf(self, path: str) -> dict:
        """Return the text of every page joined by newlines, plus the page count."""
        # The context manager closes the document; the watcher process is
        # long-running, so leaving documents open would leak file handles.
        with pymupdf.open(path) as doc:
            content = "\n".join(page.get_text() for page in doc)
            page_count = len(doc)  # must be read before the document closes
        return {"content": content, "metadata": {"pages": page_count, "format": "pdf"}}

    def _extract_image(self, path: str) -> dict:
        """Return the text that pytesseract's OCR finds in the image."""
        # Image.open keeps the underlying file open until the image is
        # closed, so close it explicitly once OCR has finished.
        with Image.open(path) as image:
            content = pytesseract.image_to_string(image)
        return {"content": content, "metadata": {"format": "image"}}

    def _extract_text(self, path: str) -> dict:
        """Read the file as UTF-8, replacing undecodable bytes."""
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
        return {"content": content, "metadata": {"format": "text"}}

    def _extract_unsupported(self, path: str) -> dict:
        """Fallback for extensions with no handler: empty content."""
        return {"content": "", "metadata": {"format": "unsupported"}}
# pipeline/quality.py
class QualityChecker:
    """Flag processing results whose extracted content looks unusable.

    Args:
        min_length: Minimum number of characters that non-empty content
            must have to pass. Defaults to 50.
    """

    DEFAULT_MIN_LENGTH = 50

    def __init__(self, min_length: int = DEFAULT_MIN_LENGTH):
        self.min_length = min_length

    # The annotations are strings so that this module can be imported
    # without ProcessingResult being in scope.
    def check(self, result: "ProcessingResult") -> "ProcessingResult":
        """Record quality findings in ``result.metadata`` and return ``result``.

        Sets ``metadata["quality_issues"]`` to the list of problems found
        (missing or too-short content, followed by any entries already in
        ``result.errors``) and ``metadata["quality_pass"]`` to ``True``
        only when that list is empty. ``result`` is modified in place.
        """
        issues = []
        if not result.content:
            issues.append("No content extracted")
        elif len(result.content) < self.min_length:
            issues.append("Content below minimum threshold")
        # Extraction errors always count as quality issues.
        issues.extend(result.errors)
        result.metadata["quality_pass"] = not issues
        result.metadata["quality_issues"] = issues
        return result
# pipeline/indexer.py
import sqlite3
class SearchIndexer:
    """Full-text index of processed documents, stored in a SQLite FTS5 table.

    Args:
        db_path: Path of the SQLite database file (or ``":memory:"``).
    """

    def __init__(self, db_path: str):
        # Local import: this module's import block only provides sqlite3.
        import threading

        # The watcher calls index() from timer threads, not from the thread
        # that created the connection. sqlite3 rejects that by default, so
        # allow cross-thread use and serialize every access with a lock.
        self._lock = threading.Lock()
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        # IF NOT EXISTS lets the pipeline reopen an existing database; a
        # bare CREATE fails on every run after the first.
        self.conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS documents USING fts5(path, content)"
        )
        self.conn.commit()

    def index(self, result: "ProcessingResult"):
        """Add ``result`` to the index, replacing any earlier entry for its path.

        Results whose status is not ``"success"`` or whose content is empty
        are ignored.
        """
        if result.status != "success" or not result.content:
            return
        # "with self.conn" commits on success and rolls back on error, so
        # the delete and the insert are applied together.
        with self._lock, self.conn:
            # A file modified after it was first indexed is processed
            # again; drop the stale row so searches do not return duplicates.
            self.conn.execute(
                "DELETE FROM documents WHERE path = ?", (result.path,)
            )
            self.conn.execute(
                "INSERT INTO documents (path, content) VALUES (?, ?)",
                (result.path, result.content),
            )

    def search(self, query: str, limit: int = 10) -> list:
        """Return up to ``limit`` ``(path, snippet)`` rows matching ``query``.

        ``query`` is passed to FTS5 ``MATCH`` unchanged. The snippet is taken
        from the content column, with matches wrapped in ``<mark>`` tags.
        """
        with self._lock:
            cursor = self.conn.execute(
                "SELECT path, snippet(documents, 1, '<mark>', '</mark>', '...', 32) "
                "FROM documents WHERE documents MATCH ? LIMIT ?",
                (query, limit),
            )
            return cursor.fetchall()

    def close(self):
        """Close the database connection."""
        with self._lock:
            self.conn.close()
# pipeline/watcher.py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
import time
class ProcessingHandler(FileSystemEventHandler):
    """Watchdog handler that runs new or changed files through the pipeline.

    Events are debounced per path: each created/modified event restarts
    a timer for that path, and the file is processed only when the timer
    expires without a further event.

    Args:
        extractor: Object with ``extract(path)`` returning a dict with
            ``"content"`` and ``"metadata"`` keys.
        quality_checker: Object with ``check(result)`` returning the result.
        indexer: Object with ``index(result)``.
        output_dir: Directory that receives one JSON file per processed
            document.
        stability_seconds: Seconds a path must stay quiet before it is
            processed. Defaults to 2.0.
    """

    def __init__(self, extractor, quality_checker, indexer, output_dir,
                 stability_seconds=2.0):
        # Local import: this module's import block does not provide Path.
        from pathlib import Path

        super().__init__()
        self.extractor = extractor
        self.quality_checker = quality_checker
        self.indexer = indexer
        self.output_dir = Path(output_dir)
        self.stability_seconds = stability_seconds
        self.pending = {}  # path -> Timer that has not fired yet
        self.lock = threading.Lock()  # guards self.pending

    def on_created(self, event):
        """Schedule processing for a newly created file."""
        if event.is_directory:
            return
        self._schedule_processing(event.src_path)

    def on_modified(self, event):
        """Schedule (or re-schedule) processing for a modified file."""
        if event.is_directory:
            return
        self._schedule_processing(event.src_path)

    def _schedule_processing(self, path):
        """Restart the debounce timer for ``path``."""
        with self.lock:
            previous = self.pending.get(path)
            if previous is not None:
                previous.cancel()
            timer = threading.Timer(self.stability_seconds, self._process, args=[path])
            self.pending[path] = timer
            timer.start()

    def _process(self, path):
        """Extract, check, index and persist one file.

        Extraction failures are recorded on the result (status
        ``"failed"`` plus the error message) rather than raised. The
        result is written to ``<output_dir>/<file stem>.json``.
        """
        # Local imports: none of these names is provided by this module's
        # import block.
        import json
        from pathlib import Path

        from pipeline import ProcessingResult

        with self.lock:
            # Timer runs its callback in the Timer thread itself, so
            # current_thread() is this run's own timer. Remove the entry
            # only if it is still ours: a newer timer may already have
            # replaced it, and popping that one would make it uncancellable.
            if self.pending.get(path) is threading.current_thread():
                del self.pending[path]

        result = ProcessingResult(path=path, status="pending")
        try:
            extraction = self.extractor.extract(path)
            result.content = extraction.get("content", "")
            result.metadata = extraction.get("metadata", {})
            result.status = "success"
        except Exception as e:
            # Thread boundary: one unreadable file must not stop the
            # pipeline, so record the failure on the result instead.
            result.status = "failed"
            result.errors.append(str(e))

        result = self.quality_checker.check(result)
        self.indexer.index(result)

        # NOTE: the name uses only the file stem, so "a.pdf" and "a.txt"
        # (or same-named files in different folders) share one output file.
        output_file = self.output_dir / f"{Path(path).stem}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result.to_dict(), f, indent=2)
Main Entry Point
# run.py
import sys
import yaml
from pathlib import Path
from pipeline.extractor import DocumentExtractor
from pipeline.quality import QualityChecker
from pipeline.indexer import SearchIndexer
from pipeline.watcher import ProcessingHandler
def main():
    """Load the YAML config, start watching ``watch_dir`` and run until Ctrl+C.

    The config path is taken from the first command-line argument and
    defaults to ``config/pipeline.yaml``. The keys ``watch_dir``,
    ``output_dir`` and ``index_db`` are required.
    """
    # Local imports: the module's import block provides neither name.
    import time

    from watchdog.observers import Observer

    config_path = sys.argv[1] if len(sys.argv) > 1 else "config/pipeline.yaml"
    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    Path(config["output_dir"]).mkdir(parents=True, exist_ok=True)
    # sqlite3 cannot create missing parent directories, so make sure the
    # folder for the index database exists before opening it.
    Path(config["index_db"]).parent.mkdir(parents=True, exist_ok=True)

    extractor = DocumentExtractor()
    quality_checker = QualityChecker()
    indexer = SearchIndexer(config["index_db"])
    try:
        handler = ProcessingHandler(
            extractor=extractor,
            quality_checker=quality_checker,
            indexer=indexer,
            output_dir=config["output_dir"],
        )
        observer = Observer()
        observer.schedule(handler, config["watch_dir"], recursive=True)
        observer.start()
        print(f"Watching {config['watch_dir']} for changes...")
        print("Press Ctrl+C to stop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass  # Ctrl+C is the normal way to stop; exit without a traceback
        finally:
            # Stop and join the observer before the index is closed, so no
            # new events are dispatched against a closed database.
            observer.stop()
            observer.join()
    finally:
        indexer.close()


if __name__ == "__main__":
    main()
Configuration
# config/pipeline.yaml
# Folder that run.py hands to the watchdog observer (watched recursively).
watch_dir: /documents/incoming
# Folder that receives one JSON result file per processed document;
# run.py creates it if it does not exist.
output_dir: /documents/processed
# Path of the SQLite database opened by SearchIndexer.
index_db: /documents/index/search.db
# NOTE(review): run.py as shown never reads this key; presumably meant as
# the watcher's settle delay (which is 2.0 s) - confirm.
stability_seconds: 2
# NOTE(review): not referenced by any code shown in this chapter - confirm
# whether retry logic is planned.
max_retries: 3
Testing the Pipeline
# tests/test_pipeline.py
import pytest
from pathlib import Path
import tempfile
import shutil
from pipeline.extractor import DocumentExtractor
from pipeline.quality import QualityChecker
from pipeline.indexer import SearchIndexer
class TestDocumentExtractor:
    """Tests for DocumentExtractor."""

    def setup_method(self):
        self.extractor = DocumentExtractor()

    def test_extract_pdf(self, tmp_path):
        """A generated one-page PDF is extracted with PDF metadata."""
        # Local import: the test module's import block does not include it.
        import pymupdf

        # Build a real one-page PDF so the test does not depend on a
        # fixture file being present.
        test_pdf = tmp_path / "sample.pdf"
        doc = pymupdf.open()  # no argument creates a new, empty PDF
        page = doc.new_page()
        page.insert_text((72, 72), "Hello from the test PDF")
        doc.save(str(test_pdf))
        doc.close()

        result = self.extractor.extract(str(test_pdf))
        assert "content" in result
        assert "Hello" in result["content"]
        assert result["metadata"]["format"] == "pdf"
        assert result["metadata"]["pages"] == 1
class TestQualityChecker:
    """Tests for QualityChecker."""

    def setup_method(self):
        self.checker = QualityChecker()

    def test_passes_valid_result(self):
        """Content at or above the 50-character minimum passes."""
        # Local import: the test module's import block does not include it.
        from pipeline import ProcessingResult

        # The checker flags content shorter than 50 characters, so the
        # sample has to be long enough to count as valid.
        content = "Valid content here, long enough to clear the minimum length check."
        assert len(content) >= 50
        result = ProcessingResult(path="test.pdf", status="success", content=content)
        checked = self.checker.check(result)
        assert checked.metadata["quality_pass"] is True

    def test_flags_empty_content(self):
        """Empty content fails the quality check."""
        from pipeline import ProcessingResult

        result = ProcessingResult(path="test.pdf", status="success", content="")
        checked = self.checker.check(result)
        assert checked.metadata["quality_pass"] is False
class TestSearchIndexer:
    """Tests for SearchIndexer against a temporary database file."""

    def setup_method(self):
        self.db = tempfile.NamedTemporaryFile(delete=False)
        # Only the file name is needed. Close the handle so it is not
        # leaked and so the file can be deleted on Windows, where an open
        # file cannot be unlinked.
        self.db.close()
        self.indexer = SearchIndexer(self.db.name)

    def teardown_method(self):
        self.indexer.close()
        Path(self.db.name).unlink()

    def test_index_and_search(self):
        """An indexed document is found by a word from its content."""
        # Local import: the test module's import block does not include it.
        from pipeline import ProcessingResult

        result = ProcessingResult(path="test.pdf", status="success", content="Hello world")
        self.indexer.index(result)
        results = self.indexer.search("Hello")
        assert len(results) == 1
EXERCISE
Extend the pipeline to support additional formats (DOCX, XLSX), implement batch processing mode for initial processing of existing documents, add a web interface for searching processed documents, and create a dashboard showing processing statistics.