HOW-TO · SUP

How to build a document analysis agent pipeline

advanced35 minBy Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

Document processing libraries, LLM, document store

What this does

A document analysis pipeline ingests raw files, extracts structured data, generates summaries, and indexes content for retrieval. This guide builds a production-ready pipeline handling PDF, DOCX, and plain text using modular stages.

Steps

Step 1: Install dependencies

pip install pymupdf python-docx langchain-community pypdf rapidfuzz

pymupdf handles PDF parsing with layout preservation. python-docx parses Word documents. langchain-community provides document splitters and embedding integration. rapidfuzz enables fuzzy entity matching.

Step 2: Build the document parser stage

import fitz  # PyMuPDF
from docx import Document
from pathlib import Path
from typing import Dict, List

class DocumentParser:
    """Parse multiple file formats into raw text."""

    def parse(self, file_path: str) -> Dict:
        path = Path(file_path)
        ext = path.suffix.lower()
        if ext == ".pdf":
            return self._parse_pdf(file_path)
        elif ext == ".docx":
            return self._parse_docx(file_path)
        elif ext == ".txt":
            return self._parse_txt(file_path)
        else:
            raise ValueError(f"Unsupported format: {ext}")

    def _parse_pdf(self, path: str) -> Dict:
        doc = fitz.open(path)
        pages = []
        for page in doc:
            text = page.get_text()
            pages.append(text)
        return {
            "source": path,
            "format": "pdf",
            "pages": pages,
            "total_pages": len(pages),
            "full_text": "\n\n".join(pages)
        }

    def _parse_docx(self, path: str) -> Dict:
        doc = Document(path)
        paragraphs = [p.text for p in doc.paragraphs]
        return {
            "source": path,
            "format": "docx",
            "paragraphs": paragraphs,
            "full_text": "\n".join(paragraphs)
        }

    def _parse_txt(self, path: str) -> Dict:
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
        return {
            "source": path,
            "format": "txt",
            "full_text": text
        }

Step 3: Build the extraction engine

from rapidfuzz import fuzz
from datetime import datetime

class ExtractionEngine:
    """Extract structured data from parsed document text."""

    def extract(self, parsed_doc: Dict) -> Dict:
        text = parsed_doc["full_text"]
        entities = {
            "email_addresses": self._extract_emails(text),
            "dates": self._extract_dates(text),
            "amounts": self._extract_amounts(text),
            "names": self._extract_names(text)
        }
        return entities

    def _extract_emails(self, text: str) -> List[str]:
        import re
        return re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)

    def _extract_dates(self, text: str) -> List[str]:
        import re
        return re.findall(r"\d{1,2}[/-]\d{1,2}[/-]\d{2,4}", text)

    def _extract_amounts(self, text: str) -> List[str]:
        import re
        return re.findall(r"\$\s?[\d,]+\.?\d*", text)

    def _extract_names(self, text: str) -> List[str]:
        # Simple capitalized word extraction (use NER model for production)
        import re
        words = re.findall(r"\b[A-Z][a-z]+\s+[A-Z][a-z]+\b", text)
        return list(set(words))[:20]

Step 4: Build the summarization and indexing stages

from typing import Optional

class Summarizer:
    """Generate document summaries using an LLM."""

    def __init__(self, llm_client):
        self.llm = llm_client

    def summarize(self, text: str, max_words: int = 200) -> str:
        prompt = f"Summarize the following document in {max_words} words or fewer:\n\n{text[:4000]}"
        response = self.llm.invoke(prompt)
        return response.content if hasattr(response, "content") else str(response)

class DocumentIndexer:
    """Index parsed content into a document store."""

    def __init__(self, vector_store, embedder):
        self.store = vector_store
        self.embedder = embedder

    def index(self, parsed_doc: Dict, entities: Dict, summary: str, doc_id: str):
        chunks = self._chunk_text(parsed_doc["full_text"])
        for i, chunk in enumerate(chunks):
            embedding = self.embedder.embed(chunk)
            self.store.add(
                id=f"{doc_id}_chunk_{i}",
                text=chunk,
                embedding=embedding,
                metadata={
                    "source": parsed_doc["source"],
                    "entities": entities,
                    "summary": summary,
                    "chunk_index": i
                }
            )
        return len(chunks)

    def _chunk_text(self, text: str, chunk_size: int = 500) -> List[str]:
        sentences = text.replace("\n", " ").split(". ")
        chunks, current = [], ""
        for sentence in sentences:
            if len(current) + len(sentence) > chunk_size:
                if current:
                    chunks.append(current.strip())
                current = sentence
            else:
                current += ". " + sentence if current else sentence
        if current:
            chunks.append(current.strip())
        return chunks

Step 5: Assemble and run the pipeline

class DocumentAnalysisPipeline:
    """End-to-end pipeline for document analysis."""

    def __init__(self, llm_client=None, vector_store=None, embedder=None):
        self.parser = DocumentParser()
        self.extractor = ExtractionEngine()
        self.summarizer = Summarizer(llm_client) if llm_client else None
        self.indexer = DocumentIndexer(vector_store, embedder) if vector_store else None

    def run(self, file_path: str, doc_id: str = None) -> Dict:
        try:
            parsed = self.parser.parse(file_path)
            entities = self.extractor.extract(parsed)
            summary = self.summarizer.summarize(parsed["full_text"]) if self.summarizer else ""
            chunk_count = self.indexer.index(parsed, entities, summary, doc_id) if self.indexer else 0

            return {
                "doc_id": doc_id,
                "source": file_path,
                "format": parsed["format"],
                "entities": entities,
                "summary": summary,
                "chunks_indexed": chunk_count
            }
        except Exception as e:
            print(f"Pipeline error: {e}")
            return {"error": str(e), "source": file_path}

# Example usage with mock clients
class MockLLM:
    def invoke(self, prompt):
        class R:
            content = "Mock summary: " + prompt[:50] + "..."
        return R()

class MockEmbedder:
    def embed(self, text):
        return [0.1] * 384

class MockVectorStore:
    def __init__(self):
        self.docs = []
    def add(self, id, text, embedding, metadata):
        self.docs.append({"id": id, "metadata": metadata})

pipeline = DocumentAnalysisPipeline(
    llm_client=MockLLM(),
    vector_store=MockVectorStore(),
    embedder=MockEmbedder()
)

result = pipeline.run("sample.pdf", doc_id="doc_001")
print(result)

Verification

Expected output:

{
    "doc_id": "doc_001",
    "source": "sample.pdf",
    "format": "pdf",
    "entities": {
        "email_addresses": [...],
        "dates": [...],
        "amounts": [...],
        "names": [...]
    },
    "summary": "Mock summary: ...",
    "chunks_indexed": 5
}

Verify that: result["format"] matches the file extension; result["entities"] contains all four entity types; result["summary"] is a non-empty string; result["chunks_indexed"] is a positive integer.

Common failures

  1. PDF text extraction returning empty strings. Scanned PDFs contain no text layers-only images. Use OCR (Tesseract via pytesseract) or a vision-based LLM API to extract content from scanned documents.

  2. Memory exhaustion with large documents. Loading a 500-page PDF entirely into memory before chunking causes OOM on standard instances. Stream the PDF page-by-page and write chunks to disk or a queue before indexing.

  3. Inconsistent chunk boundaries splitting entities. Splitting mid-sentence breaks semantic coherence and can split a phone number or address across two chunks. Use sentence-aware chunking with overlap (50-100 token overlap between chunks) to preserve entity boundaries.

  • Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
  • Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.

Related guides

  • Build a Custom Prompt Template Library - Use structured prompt templates to standardize entity extraction instructions so the same extraction logic works across document types.
  • Implement Few-Shot Example Selection Dynamically - Inject domain-specific few-shot examples into the summarization prompt to improve accuracy on specialized document types like legal contracts or medical records.