How to build a document analysis agent pipeline
Document processing libraries, LLM, document store
What this does
A document analysis pipeline ingests raw files, extracts structured data, generates summaries, and indexes content for retrieval. This guide builds a production-ready pipeline handling PDF, DOCX, and plain text using modular stages.
Steps
Step 1: Install dependencies
pip install pymupdf python-docx langchain-community pypdf rapidfuzz
pymupdf handles PDF parsing with layout preservation. python-docx parses Word documents. langchain-community provides document splitters and embedding integration. rapidfuzz enables fuzzy entity matching. pypdf is a pure-Python PDF library; the snippets in this guide do not import it, so it is optional.
Step 2: Build the document parser stage
import fitz # PyMuPDF
from docx import Document
from pathlib import Path
from typing import Dict, List
class DocumentParser:
    """Parse multiple file formats into raw text.

    Supported extensions are ``.pdf``, ``.docx`` and ``.txt`` (matched
    case-insensitively). Every result dict carries ``source``, ``format``
    and ``full_text``; PDF results add ``pages`` and ``total_pages``, DOCX
    results add ``paragraphs``.
    """

    def parse(self, file_path: str) -> Dict:
        """Dispatch ``file_path`` to the parser matching its extension.

        Args:
            file_path: Path of the document to read.

        Returns:
            The dict produced by the format-specific parser.

        Raises:
            ValueError: If the extension is not one of the supported formats.
        """
        ext = Path(file_path).suffix.lower()
        if ext == ".pdf":
            return self._parse_pdf(file_path)
        if ext == ".docx":
            return self._parse_docx(file_path)
        if ext == ".txt":
            return self._parse_txt(file_path)
        raise ValueError(f"Unsupported format: {ext}")

    def _parse_pdf(self, path: str) -> Dict:
        """Extract the text of every page of a PDF, one string per page."""
        # The context manager closes the document (and its file handle) even
        # if text extraction fails part-way through.
        with fitz.open(path) as doc:
            pages = [page.get_text() for page in doc]
        return {
            "source": path,
            "format": "pdf",
            "pages": pages,
            "total_pages": len(pages),
            "full_text": "\n\n".join(pages),
        }

    def _parse_docx(self, path: str) -> Dict:
        """Collect the text of each paragraph exposed by ``doc.paragraphs``."""
        doc = Document(path)
        paragraphs = [p.text for p in doc.paragraphs]
        return {
            "source": path,
            "format": "docx",
            "paragraphs": paragraphs,
            "full_text": "\n".join(paragraphs),
        }

    def _parse_txt(self, path: str) -> Dict:
        """Read a UTF-8 text file in one go."""
        # "utf-8-sig" decodes plain UTF-8 identically but also drops a leading
        # byte-order mark, which would otherwise leak into ``full_text``.
        with open(path, "r", encoding="utf-8-sig") as f:
            text = f.read()
        return {
            "source": path,
            "format": "txt",
            "full_text": text,
        }
Step 3: Build the extraction engine
import re
from datetime import datetime

from rapidfuzz import fuzz
class ExtractionEngine:
    """Extract structured data from parsed document text.

    All extractors are regular-expression heuristics. Patterns are compiled
    once at class-definition time instead of on every call.
    """

    _EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
    # Digit look-arounds stop the pattern from matching inside a longer digit
    # run (an ISO date such as 2023-01-15 used to be reported as "23-01-15");
    # the first alternative reports ISO dates in full instead.
    _DATE_RE = re.compile(
        r"(?<!\d)(?:\d{4}-\d{1,2}-\d{1,2}|\d{1,2}[/-]\d{1,2}[/-]\d{2,4})(?!\d)"
    )
    # Must start and end on a digit so sentence punctuation ("$100." or
    # "$5,000,") is not captured and a bare "$," is not an amount.
    _AMOUNT_RE = re.compile(r"\$\s?\d(?:[\d,]*\d)?(?:\.\d+)?")
    _NAME_RE = re.compile(r"\b[A-Z][a-z]+\s+[A-Z][a-z]+\b")
    _MAX_NAMES = 20

    def extract(self, parsed_doc: Dict) -> Dict:
        """Run every extractor over ``parsed_doc["full_text"]``.

        Returns:
            A dict with the keys ``email_addresses``, ``dates``, ``amounts``
            and ``names``, each mapping to a list of matched strings.
        """
        text = parsed_doc["full_text"]
        return {
            "email_addresses": self._extract_emails(text),
            "dates": self._extract_dates(text),
            "amounts": self._extract_amounts(text),
            "names": self._extract_names(text),
        }

    def _extract_emails(self, text: str) -> List[str]:
        """Return every e-mail-like token, in order of appearance."""
        return self._EMAIL_RE.findall(text)

    def _extract_dates(self, text: str) -> List[str]:
        """Return numeric dates (``D/M/Y``-style or ISO ``YYYY-MM-DD``)."""
        return self._DATE_RE.findall(text)

    def _extract_amounts(self, text: str) -> List[str]:
        """Return dollar amounts such as ``$1,250.50`` without trailing punctuation."""
        return self._AMOUNT_RE.findall(text)

    def _extract_names(self, text: str) -> List[str]:
        """Return up to 20 distinct capitalised word pairs, first-seen order.

        Simple capitalized word extraction (use NER model for production).
        """
        # dict.fromkeys de-duplicates while keeping insertion order; a set
        # would make the surviving 20 names vary between runs.
        unique = dict.fromkeys(self._NAME_RE.findall(text))
        return list(unique)[: self._MAX_NAMES]
Step 4: Build the summarization and indexing stages
from typing import Optional
class Summarizer:
    """Generate document summaries using an LLM.

    The client only needs an ``invoke(prompt)`` method. Its return value may
    be an object with a ``content`` attribute or anything convertible with
    ``str()``.
    """

    def __init__(self, llm_client):
        self.llm = llm_client

    def summarize(
        self,
        text: str,
        max_words: int = 200,
        *,
        max_chars: Optional[int] = 4000,
    ) -> str:
        """Ask the LLM for a summary of ``text``.

        Args:
            text: Document text to summarize.
            max_words: Word limit stated in the prompt.
            max_chars: Number of leading characters of ``text`` sent to the
                model. Defaults to the previously hard-coded 4000; pass
                ``None`` to send the whole text.

        Returns:
            ``response.content`` when the response has that attribute,
            otherwise ``str(response)``.
        """
        # Slicing with None keeps the full string, so no special case is needed.
        excerpt = text[:max_chars]
        prompt = f"Summarize the following document in {max_words} words or fewer:\n\n{excerpt}"
        response = self.llm.invoke(prompt)
        return response.content if hasattr(response, "content") else str(response)
class DocumentIndexer:
    """Index parsed content into a document store.

    Collaborators as used here: ``embedder.embed(text)`` is called once per
    chunk, and ``vector_store.add(id=..., text=..., embedding=...,
    metadata=...)`` is called once per chunk with the result.
    """

    def __init__(self, vector_store, embedder):
        self.store = vector_store
        self.embedder = embedder

    def index(self, parsed_doc: Dict, entities: Dict, summary: str, doc_id: str) -> int:
        """Chunk, embed and store ``parsed_doc["full_text"]``.

        Each chunk is stored under the id ``"{doc_id}_chunk_{i}"`` with the
        document source, entities, summary and chunk index as metadata.

        Returns:
            The number of chunks written (0 for documents with no text).
        """
        chunks = self._chunk_text(parsed_doc["full_text"])
        for i, chunk in enumerate(chunks):
            embedding = self.embedder.embed(chunk)
            self.store.add(
                id=f"{doc_id}_chunk_{i}",
                text=chunk,
                embedding=embedding,
                metadata={
                    "source": parsed_doc["source"],
                    "entities": entities,
                    "summary": summary,
                    "chunk_index": i,
                },
            )
        return len(chunks)

    def _chunk_text(self, text: str, chunk_size: int = 500) -> List[str]:
        """Greedily pack sentences into chunks of roughly ``chunk_size`` characters.

        Newlines are flattened to spaces and the text is split on ``". "``.
        A single sentence longer than ``chunk_size`` becomes its own
        oversized chunk. Whitespace-only text yields no chunks.
        """
        sentences = text.replace("\n", " ").split(". ")
        chunks: List[str] = []
        current = ""
        for sentence in sentences:
            if len(current) + len(sentence) > chunk_size:
                finished = current.strip()
                if finished:
                    # The split consumed the ". " that followed this sentence;
                    # put the period back so the chunk ends as the text did.
                    chunks.append(finished + ".")
                current = sentence
            else:
                current = f"{current}. {sentence}" if current else sentence
        # Check the stripped text: a whitespace-only remainder (e.g. a PDF
        # with no text layer) must not be indexed as an empty chunk.
        remainder = current.strip()
        if remainder:
            chunks.append(remainder)
        return chunks
Step 5: Assemble and run the pipeline
class DocumentAnalysisPipeline:
    """End-to-end pipeline for document analysis.

    Stages run in order: parse, extract entities, summarize (optional),
    index (optional). Summarization is enabled by passing ``llm_client``;
    indexing is enabled by passing ``vector_store``.
    """

    def __init__(self, llm_client=None, vector_store=None, embedder=None):
        self.parser = DocumentParser()
        self.extractor = ExtractionEngine()
        # Compare with None rather than relying on truthiness: a client that
        # defines __len__ or __bool__ (for instance a still-empty vector
        # store) is falsy and would silently switch its stage off.
        self.summarizer = Summarizer(llm_client) if llm_client is not None else None
        self.indexer = (
            DocumentIndexer(vector_store, embedder) if vector_store is not None else None
        )

    def run(self, file_path: str, doc_id: Optional[str] = None) -> Dict:
        """Process one document through every configured stage.

        Args:
            file_path: Path of the document to analyse.
            doc_id: Identifier used in the result and as the chunk-id prefix.
                When omitted, the file name without its extension is used so
                that chunk ids of different documents do not all start with
                ``"None"``.

        Returns:
            On success, a dict with ``doc_id``, ``source``, ``format``,
            ``entities``, ``summary`` and ``chunks_indexed``. On any failure,
            a dict with ``error`` and ``source``; the error is also printed.
        """
        try:
            if doc_id is None:
                doc_id = Path(file_path).stem
            parsed = self.parser.parse(file_path)
            entities = self.extractor.extract(parsed)
            summary = ""
            if self.summarizer is not None:
                summary = self.summarizer.summarize(parsed["full_text"])
            chunk_count = 0
            if self.indexer is not None:
                chunk_count = self.indexer.index(parsed, entities, summary, doc_id)
            return {
                "doc_id": doc_id,
                "source": file_path,
                "format": parsed["format"],
                "entities": entities,
                "summary": summary,
                "chunks_indexed": chunk_count,
            }
        except Exception as e:
            # Deliberate catch-all: the pipeline reports failures as data
            # instead of raising to the caller.
            print(f"Pipeline error: {e}")
            return {"error": str(e), "source": file_path}
# Example usage with mock clients
class MockLLM:
    """Stand-in LLM client that echoes the start of the prompt."""

    def invoke(self, prompt):
        """Return an object whose ``content`` quotes the first 50 prompt characters."""
        preview = prompt[:50]
        # Build the throwaway response class dynamically; ``content`` stays a
        # class attribute, exactly as with a nested class statement.
        response_type = type("R", (), {"content": "Mock summary: " + preview + "..."})
        return response_type()
class MockEmbedder:
    """Stand-in embedder that ignores its input."""

    def embed(self, text):
        """Return a constant 384-element vector of 0.1 values."""
        return [0.1 for _ in range(384)]
class MockVectorStore:
    """In-memory stand-in for a vector store; keeps ids and metadata only."""

    def __init__(self):
        self.docs = []

    def add(self, id, text, embedding, metadata):
        """Record the entry's id and metadata; text and embedding are discarded."""
        record = dict(id=id, metadata=metadata)
        self.docs.append(record)
# Wire the pipeline to the in-memory mocks so the example needs no external
# services; arguments are llm_client, vector_store, embedder in that order.
pipeline = DocumentAnalysisPipeline(MockLLM(), MockVectorStore(), MockEmbedder())
result = pipeline.run(file_path="sample.pdf", doc_id="doc_001")
print(result)
Verification
Expected output:
{
"doc_id": "doc_001",
"source": "sample.pdf",
"format": "pdf",
"entities": {
"email_addresses": [...],
"dates": [...],
"amounts": [...],
"names": [...]
},
"summary": "Mock summary: ...",
"chunks_indexed": 5
}
Verify that: result["format"] matches the file extension; result["entities"] contains all four entity types; result["summary"] is a non-empty string; result["chunks_indexed"] is a positive integer.
Common failures
PDF text extraction returning empty strings. Scanned PDFs contain no text layer—only images. Use OCR (Tesseract via pytesseract) or a vision-based LLM API to extract content from scanned documents.
Memory exhaustion with large documents. Loading a 500-page PDF entirely into memory before chunking can cause out-of-memory (OOM) errors on standard instances. Stream the PDF page-by-page and write chunks to disk or a queue before indexing.
Inconsistent chunk boundaries splitting entities. Splitting mid-sentence breaks semantic coherence and can split a phone number or address across two chunks. Use sentence-aware chunking with overlap (50-100 token overlap between chunks) to preserve entity boundaries.
- Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
- Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
Related guides
- Build a Custom Prompt Template Library - Use structured prompt templates to standardize entity extraction instructions so the same extraction logic works across document types.
- Implement Few-Shot Example Selection Dynamically - Inject domain-specific few-shot examples into the summarization prompt to improve accuracy on specialized document types like legal contracts or medical records.