RUNLOCALAIv38
Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.coIndependently operated
Document Processing with Local AI

18. Document Pipeline Project

Chapter 18 of 18 · 25 min
KEY INSIGHT

A complete document pipeline combines extraction, quality checks, indexing, and automation. Each component remains independent, testable, and configurable. Start simple, add monitoring, and expand features as requirements emerge.

This chapter builds a complete document processing pipeline combining techniques from all previous chapters. The pipeline monitors a folder, extracts content from multiple formats, performs quality checks, indexes results, and provides search.

Project Structure

document-pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── extractor.py
│   ├── quality.py
│   ├── indexer.py
│   └── watcher.py
├── config/
│   └── pipeline.yaml
├── tests/
│   └── test_pipeline.py
├── run.py
└── requirements.txt

Core Pipeline Implementation

# pipeline/__init__.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class ProcessingResult:
    """Outcome of processing one file; passed between pipeline stages."""
    path: str
    status: str  # "pending", "success", or "failed"
    content: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self):
        # Plain dict so the result can be written out with json.dump
        return {
            "path": self.path,
            "status": self.status,
            "content": self.content,
            "metadata": self.metadata,
            "errors": self.errors,
            "timestamp": self.timestamp
        }

# pipeline/extractor.py
from pathlib import Path

import pymupdf
import pytesseract
from PIL import Image


class DocumentExtractor:
    """Dispatch a file to a format-specific handler based on its extension."""

    def __init__(self):
        self.handlers = {
            "pdf": self._extract_pdf,
            "png": self._extract_image,
            "jpg": self._extract_image,
            "jpeg": self._extract_image,
            "txt": self._extract_text
        }

    def extract(self, path: str) -> dict:
        ext = Path(path).suffix.lower().lstrip(".")
        handler = self.handlers.get(ext, self._extract_unsupported)
        return handler(path)

    def _extract_pdf(self, path: str) -> dict:
        # Close the document when done; read the page count while it is open
        with pymupdf.open(path) as doc:
            content = "\n".join(page.get_text() for page in doc)
            pages = len(doc)
        return {"content": content, "metadata": {"pages": pages, "format": "pdf"}}

    def _extract_image(self, path: str) -> dict:
        # OCR the image; the context manager releases the file handle
        with Image.open(path) as img:
            content = pytesseract.image_to_string(img)
        return {"content": content, "metadata": {"format": "image"}}

    def _extract_text(self, path: str) -> dict:
        # errors="replace" keeps going on bytes that are not valid UTF-8
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
        return {"content": content, "metadata": {"format": "text"}}

    def _extract_unsupported(self, path: str) -> dict:
        # Empty content makes the quality check flag the file
        return {"content": "", "metadata": {"format": "unsupported"}}
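
# pipeline/quality.py
from pipeline import ProcessingResult


class QualityChecker:
    def __init__(self, min_length: int = 50):
        # Minimum number of characters for content to count as usable
        self.min_length = min_length

    def check(self, result: ProcessingResult) -> ProcessingResult:
        issues = []

        if not result.content:
            issues.append("No content extracted")
        elif len(result.content) < self.min_length:
            issues.append("Content below minimum threshold")

        # Extraction errors also count as quality issues
        if result.errors:
            issues.extend(result.errors)

        result.metadata["quality_pass"] = len(issues) == 0
        result.metadata["quality_issues"] = issues

        return result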
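
# pipeline/indexer.py
import sqlite3
import threading

from pipeline import ProcessingResult


class SearchIndexer:
    def __init__(self, db_path: str):
        # The watcher calls index() from timer threads, so allow cross-thread
        # use of the connection and serialize access with a lock.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()
        # IF NOT EXISTS lets the pipeline restart against an existing database
        self.conn.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS documents USING fts5(path, content)
        """)
        self.conn.commit()

    def index(self, result: ProcessingResult):
        if result.status == "success" and result.content:
            with self.lock:
                # Drop any earlier row so a re-processed file is not duplicated
                self.conn.execute(
                    "DELETE FROM documents WHERE path = ?", (result.path,)
                )
                self.conn.execute(
                    "INSERT INTO documents (path, content) VALUES (?, ?)",
                    (result.path, result.content)
                )
                self.conn.commit()

    def search(self, query: str, limit: int = 10) -> list:
        # Returns (path, highlighted snippet of the content column) tuples
        with self.lock:
            cursor = self.conn.execute(
                "SELECT path, snippet(documents, 1, '<mark>', '</mark>', '...', 32) "
                "FROM documents WHERE documents MATCH ? LIMIT ?",
                (query, limit)
            )
            return cursor.fetchall()

    def close(self):
        with self.lock:
            self.conn.close()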

# pipeline/watcher.py
import json
import threading
from pathlib import Path

from watchdog.events import FileSystemEventHandler

from pipeline import ProcessingResult


class ProcessingHandler(FileSystemEventHandler):
    def __init__(self, extractor, quality_checker, indexer, output_dir,
                 stability_seconds: float = 2.0):
        self.extractor = extractor
        self.quality_checker = quality_checker
        self.indexer = indexer
        self.output_dir = Path(output_dir)
        self.stability_seconds = stability_seconds
        self.pending = {}  # path -> timer waiting to process that path
        self.lock = threading.Lock()

    def on_created(self, event):
        if event.is_directory:
            return
        self._schedule_processing(event.src_path)

    def on_modified(self, event):
        if event.is_directory:
            return
        self._schedule_processing(event.src_path)

    def _schedule_processing(self, path):
        # Debounce: each new event for a path restarts its timer, so the file
        # is processed only after it has been quiet for stability_seconds.
        with self.lock:
            if path in self.pending:
                self.pending[path].cancel()
            timer = threading.Timer(self.stability_seconds, self._process, args=[path])
            self.pending[path] = timer
            timer.start()

    def _process(self, path):
        with self.lock:
            self.pending.pop(path, None)

        result = ProcessingResult(path=path, status="pending")

        try:
            extraction = self.extractor.extract(path)
            result.content = extraction.get("content", "")
            result.metadata = extraction.get("metadata", {})
            result.status = "success"
        except Exception as e:
            result.status = "failed"
            result.errors.append(str(e))

        result = self.quality_checker.check(result)
        self.indexer.index(result)

        # Write the full result next to the index as <stem>.json
        output_file = self.output_dir / f"{Path(path).stem}.json"
        with open(output_file, "w") as f:
            json.dump(result.to_dict(), f, indent=2)
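
The handler above debounces file events: every new event for a path cancels that path's pending timer and starts a fresh one, so a file that is still being written is processed once, after it goes quiet. The standalone sketch below isolates that pattern so it can be tried without watchdog. The `Debouncer` class, the short delays, and the `report.pdf` key are illustrative assumptions, not part of the chapter's code.

```python
import threading
import time


class Debouncer:
    """Call callback(key) once per key, after `delay` seconds with no new triggers."""

    def __init__(self, delay, callback):
        self.delay = delay
        self.callback = callback
        self.pending = {}  # key -> timer currently waiting for that key
        self.lock = threading.Lock()

    def trigger(self, key):
        with self.lock:
            old = self.pending.get(key)
            if old is not None:
                old.cancel()  # restart the quiet period for this key
            timer = threading.Timer(self.delay, self._fire, args=[key])
            timer.daemon = True
            self.pending[key] = timer
            timer.start()

    def _fire(self, key):
        with self.lock:
            self.pending.pop(key, None)
        self.callback(key)


calls = []
debouncer = Debouncer(0.3, calls.append)

# Simulate a burst of five "modified" events for the same file
for _ in range(5):
    debouncer.trigger("report.pdf")
    time.sleep(0.02)

time.sleep(0.8)  # wait for the quiet period to elapse
print(calls)
```

Five triggers in quick succession lead to a single callback. The trade-off is latency: nothing is processed until the delay has passed, so the delay should be long enough to cover a slow copy but short enough that results still feel prompt.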

Main Entry Point

# run.py
import sys
import time
from pathlib import Path

import yaml
from watchdog.observers import Observer

from pipeline.extractor import DocumentExtractor
from pipeline.quality import QualityChecker
from pipeline.indexer import SearchIndexer
from pipeline.watcher import ProcessingHandler

def main():
    config_path = sys.argv[1] if len(sys.argv) > 1 else "config/pipeline.yaml"

    with open(config_path) as f:
        config = yaml.safe_load(f)

    # Create the output and index directories before anything writes to them
    Path(config["output_dir"]).mkdir(parents=True, exist_ok=True)
    Path(config["index_db"]).parent.mkdir(parents=True, exist_ok=True)

    extractor = DocumentExtractor()
    quality_checker = QualityChecker()
    indexer = SearchIndexer(config["index_db"])

    handler = ProcessingHandler(
        extractor=extractor,
        quality_checker=quality_checker,
        indexer=indexer,
        output_dir=config["output_dir"]
    )

    observer = Observer()
    observer.schedule(handler, config["watch_dir"], recursive=True)
    observer.start()

    print(f"Watching {config['watch_dir']} for changes...")
    print("Press Ctrl+C to stop")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        # Stop the watcher thread first, then close the index it writes to
        observer.stop()
        observer.join()
        indexer.close()

if __name__ == "__main__":
    main()
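
The entry point above watches and indexes but never calls `SearchIndexer.search`, so the query side is worth trying on its own. The sketch below rebuilds the same `documents` FTS5 table in an in-memory database and runs two queries against it. The sample rows, the `quote_term` helper, and the `bm25` ordering are illustrative additions rather than part of the chapter's code, and the snippet assumes your Python's SQLite build includes FTS5.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE VIRTUAL TABLE documents USING fts5(path, content)")
conn.executemany(
    "INSERT INTO documents (path, content) VALUES (?, ?)",
    [
        ("invoices/a.pdf", "Invoice for GPU hardware, total due in 30 days"),
        ("notes/b.txt", "Meeting notes about local model benchmarks"),
    ],
)


def quote_term(text):
    """Wrap user input as an FTS5 string so characters like '-' are not parsed as syntax."""
    return '"' + text.replace('"', '""') + '"'


def search(conn, query, limit=10):
    # bm25() gives better matches a lower score, so ascending order ranks them first
    cursor = conn.execute(
        "SELECT path, snippet(documents, 1, '<mark>', '</mark>', '...', 32) "
        "FROM documents WHERE documents MATCH ? "
        "ORDER BY bm25(documents) LIMIT ?",
        (query, limit),
    )
    return cursor.fetchall()


hits = search(conn, "invoice")
phrase_hits = search(conn, quote_term("gpu-hardware"))

for path, snippet in hits:
    print(path, "|", snippet)
```

Matching is case-insensitive with the default tokenizer, so `invoice` finds `Invoice`. Quoting matters for raw user input: an unquoted `gpu-hardware` is not a valid bare FTS5 term, while the quoted form is tokenized into the phrase `gpu hardware`.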

Configuration

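# config/pipeline.yaml
watch_dir: /documents/incoming
output_dir: /documents/processed
index_db: /documents/index/search.db
# run.py as shown does not read the two settings below
stability_seconds: 2   # matches the 2.0 s debounce timer in watcher.py
max_retries: 3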
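
Because `run.py` indexes straight into the dictionary returned by `yaml.safe_load`, a missing or misspelled key only surfaces as a `KeyError` partway through startup. One possible way to fail earlier is to validate the parsed dictionary first, sketched below. `PipelineConfig` and `load_config` are hypothetical names, and the defaults simply mirror the values in the file above.

```python
from dataclasses import dataclass
from pathlib import Path

REQUIRED_KEYS = ("watch_dir", "output_dir", "index_db")


@dataclass(frozen=True)
class PipelineConfig:
    watch_dir: Path
    output_dir: Path
    index_db: Path
    stability_seconds: float = 2.0
    max_retries: int = 3


def load_config(raw: dict) -> PipelineConfig:
    """Validate a parsed YAML mapping and convert it to a typed config object."""
    missing = [key for key in REQUIRED_KEYS if key not in raw]
    if missing:
        raise ValueError(f"Missing config keys: {', '.join(missing)}")

    config = PipelineConfig(
        watch_dir=Path(raw["watch_dir"]),
        output_dir=Path(raw["output_dir"]),
        index_db=Path(raw["index_db"]),
        stability_seconds=float(raw.get("stability_seconds", 2.0)),
        max_retries=int(raw.get("max_retries", 3)),
    )
    if config.stability_seconds < 0:
        raise ValueError("stability_seconds must not be negative")
    if config.max_retries < 0:
        raise ValueError("max_retries must not be negative")
    return config


# The dict stands in for the result of yaml.safe_load on pipeline.yaml
config = load_config({
    "watch_dir": "/documents/incoming",
    "output_dir": "/documents/processed",
    "index_db": "/documents/index/search.db",
})
print(config.stability_seconds, config.max_retries)
```

Keeping validation separate from file loading means it can be unit tested with plain dictionaries, without touching the filesystem.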

Testing the Pipeline

# tests/test_pipeline.py
import tempfile
from pathlib import Path

import pymupdf

from pipeline import ProcessingResult
from pipeline.extractor import DocumentExtractor
from pipeline.quality import QualityChecker
from pipeline.indexer import SearchIndexer

class TestDocumentExtractor:
    def setup_method(self):
        self.extractor = DocumentExtractor()

    def test_extract_pdf(self, tmp_path):
        # Create a one-page test PDF with known text (one way to build a fixture)
        test_pdf = tmp_path / "sample.pdf"
        doc = pymupdf.open()
        page = doc.new_page()
        page.insert_text((72, 72), "Hello from the test PDF")
        doc.save(str(test_pdf))
        doc.close()

        result = self.extractor.extract(str(test_pdf))
        assert "content" in result
        assert result["metadata"]["format"] == "pdf"

class TestQualityChecker:
    def setup_method(self):
        self.checker = QualityChecker()

    def test_passes_valid_result(self):
        # Content must be at least 50 characters to pass the length check
        content = "Valid content here. " * 5
        result = ProcessingResult(path="test.pdf", status="success", content=content)
        checked = self.checker.check(result)
        assert checked.metadata["quality_pass"] is True

    def test_flags_empty_content(self):
        result = ProcessingResult(path="test.pdf", status="success", content="")
        checked = self.checker.check(result)
        assert checked.metadata["quality_pass"] is False

class TestSearchIndexer:
    def setup_method(self):
        # Close the handle right away; SQLite opens the file by name
        self.db = tempfile.NamedTemporaryFile(delete=False)
        self.db.close()
        self.indexer = SearchIndexer(self.db.name)

    def teardown_method(self):
        self.indexer.close()
        Path(self.db.name).unlink()

    def test_index_and_search(self):
        result = ProcessingResult(path="test.pdf", status="success", content="Hello world")
        self.indexer.index(result)
        results = self.indexer.search("Hello")
        assert len(results) == 1

EXERCISE

Extend the pipeline to support additional formats (DOCX, XLSX), implement batch processing mode for initial processing of existing documents, add a web interface for searching processed documents, and create a dashboard showing processing statistics.
