06. Text Classification Pipelines
Production text classification systems require more than model inference—they demand dependable pipelines handling input validation, preprocessing, prediction formatting, and confidence scoring. Building maintainable pipelines supports iterative improvement and systematic evaluation.
Pipeline architecture for local LLM classification typically includes input sanitization, batch processing, caching layers, and result aggregation. Input validation reduces exposure to adversarial prompts and malformed inputs, although no sanitization step fully prevents prompt injection. Batch processing amortizes per-call inference overhead across multiple documents, improving throughput for bulk classification tasks.
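The sketch below shows one minimal way to batch requests. It assumes the model backend can accept a list of inputs in a single call. `chunked`, `classify_in_batches`, and the stand-in `fake_batch_model` are hypothetical names used only for illustration. A real `batch_fn` would build one prompt per text and submit the whole batch to the local runtime.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # final partial batch
        yield batch


def classify_in_batches(
    texts: List[str],
    batch_fn: Callable[[List[str]], List[R]],  # hypothetical: one model call per batch
    batch_size: int = 16,
) -> List[R]:
    """Call batch_fn once per batch and return results in input order."""
    results: List[R] = []
    for batch in chunked(texts, batch_size):
        outputs = batch_fn(batch)
        if len(outputs) != len(batch):
            raise ValueError("batch_fn must return one result per input")
        results.extend(outputs)
    return results


# Usage with a stand-in model that records the size of each call it receives
calls: List[int] = []


def fake_batch_model(batch: List[str]) -> List[str]:
    calls.append(len(batch))
    return ["long" if len(t) > 10 else "short" for t in batch]


labels = classify_in_batches(["hi", "a much longer document", "ok"] * 10,
                             fake_batch_model, batch_size=8)
```

With 30 inputs and `batch_size=8`, the stand-in model runs four times instead of thirty. The right batch size depends on available memory and on the runtime's batching support, so treat 16 as a placeholder.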
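```python
from dataclasses import dataclass, replace
from typing import Dict, List, Optional
import hashlib
import json
import time

@dataclass
class ClassificationResult:
    text_id: str
    predicted_labels: List[str]
    confidence_scores: Dict[str, float]
    processing_time_ms: float

class TextClassificationPipeline:
    MAX_CHARS = 10_000  # Enforce maximum input length

    def __init__(self, model_name: str, labels: List[str], cache_enabled: bool = True):
        self.model_name = model_name
        self.labels = labels
        self.cache_enabled = cache_enabled
        self.cache: Dict[str, ClassificationResult] = {}

    def _normalize_input(self, text: str) -> str:
        # Validate type, collapse whitespace, reject empty input, truncate
        if not isinstance(text, str):
            raise TypeError("text must be a string")
        text = ' '.join(text.split())
        if not text:
            raise ValueError("text is empty after normalization")
        return text[:self.MAX_CHARS]

    def _compute_hash(self, text: str) -> str:
        # Content hash of the normalized text, used as the cache key
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def classify(self, text: str, text_id: Optional[str] = None) -> ClassificationResult:
        start = time.perf_counter()
        normalized = self._normalize_input(text)
        cache_key = self._compute_hash(normalized)
        if self.cache_enabled and cache_key in self.cache:
            # Return a copy so callers never mutate the cached entry
            cached = self.cache[cache_key]
            return replace(cached, text_id=text_id or cached.text_id)
        prompt = self._build_classification_prompt(normalized)
        raw_response = self._inference(prompt)
        parsed = self._parse_response(raw_response)
        result = ClassificationResult(
            text_id=text_id or cache_key,
            predicted_labels=parsed['labels'],
            confidence_scores=parsed['scores'],
            processing_time_ms=(time.perf_counter() - start) * 1000,
        )
        if self.cache_enabled:
            self.cache[cache_key] = result
        return result

    def _build_classification_prompt(self, text: str) -> str:
        label_str = ', '.join(self.labels)
        return f"""Classify the following text into one or more categories.
Categories: {label_str}
Return as JSON with 'labels' (array) and 'scores' (object mapping each label to a confidence between 0 and 1).
Text: {text}"""

    def _inference(self, prompt: str) -> str:
        # Backend-specific: send the prompt to the local model and return its raw text
        raise NotImplementedError("connect this method to your local model runtime")

    def _parse_response(self, raw: str) -> dict:
        # Parse the model's JSON; drop any label outside the allowed set
        data = json.loads(raw)  # raises json.JSONDecodeError on malformed output
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object")
        labels = [l for l in data.get('labels', []) if l in self.labels]
        scores = {k: float(v) for k, v in data.get('scores', {}).items() if k in self.labels}
        return {'labels': labels, 'scores': scores}
```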
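The pipeline's cache is a plain dict that grows without bound, so a long-running service could eventually exhaust its memory. A size-bounded LRU cache is one way to cap it. The sketch below builds one on `collections.OrderedDict`; `LRUCache` and `max_entries` are hypothetical names. It could replace `self.cache` if dict indexing is swapped for `get`/`put` calls.

```python
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    """Cache that evicts the least recently used entry once it is full."""

    def __init__(self, max_entries: int = 10_000):
        if max_entries < 1:
            raise ValueError("max_entries must be >= 1")
        self.max_entries = max_entries
        self._data: "OrderedDict[K, V]" = OrderedDict()
        self.hits = 0    # useful as a cache-effectiveness metric
        self.misses = 0

    def get(self, key: K) -> Optional[V]:
        if key in self._data:
            self._data.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return self._data[key]
        self.misses += 1
        return None

    def put(self, key: K, value: V) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_entries:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)


# Usage: capacity 2, so inserting a third key evicts the least recently used one
cache = LRUCache(max_entries=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes most recently used
cache.put("c", 3)  # evicts "b"
```

`get` returns `None` on a miss, so this sketch assumes cached values are never `None`. That holds for `ClassificationResult` objects. When the cached unit is a pure function of its arguments, `functools.lru_cache` is a simpler alternative.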
Error handling within pipelines prevents cascading failures. Timeouts, retry logic with exponential backoff, and fallback responses maintain service availability when the model encounters problematic inputs. The circuit breaker pattern limits resource consumption during model degradation: after repeated failures it short-circuits further calls until a cooldown period elapses.
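The sketch below shows how a timeout, retries with exponential backoff and jitter, a circuit breaker, and a fallback response can compose around a single model call. All names (`CircuitBreaker`, `call_with_retry`, `with_timeout`, `resilient_call`) and all default values are illustrative assumptions, not the API of any particular library. To keep the example short, the retry loop retries on any exception. Production code would usually retry only transient errors such as timeouts.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

# Shared worker pool used to enforce per-call timeouts
_executor = ThreadPoolExecutor(max_workers=4)


class CircuitOpenError(RuntimeError):
    """Raised when the breaker is open and the call is skipped."""


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures; once
    `reset_after_s` seconds pass, one trial call is allowed (half-open)."""

    def __init__(self, failure_threshold: int = 5, reset_after_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, fn: Callable[[], T]) -> T:
        if self.opened_at is not None and time.monotonic() - self.opened_at < self.reset_after_s:
            raise CircuitOpenError("circuit open; skipping model call")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # open (or re-open) the circuit
            raise
        self.failures = 0  # a success closes the circuit
        self.opened_at = None
        return result


def with_timeout(fn: Callable[[], T], timeout_s: float) -> T:
    """Wait at most timeout_s for fn; raises concurrent.futures.TimeoutError on expiry."""
    return _executor.submit(fn).result(timeout=timeout_s)


def call_with_retry(fn: Callable[[], T], max_attempts: int = 3, base_delay_s: float = 0.5,
                    max_delay_s: float = 8.0, sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry fn; base delays 0.5s, 1s, 2s, ... capped at max_delay_s, scaled by jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            delay = min(max_delay_s, base_delay_s * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids synchronized retries
    raise AssertionError("unreachable")


def resilient_call(fn: Callable[[], T], breaker: CircuitBreaker, fallback: T,
                   timeout_s: float = 10.0, sleep: Callable[[float], None] = time.sleep) -> T:
    """Time out each attempt, retry the sequence, gate it with the breaker,
    and return `fallback` if the call still fails."""
    try:
        return breaker.call(
            lambda: call_with_retry(lambda: with_timeout(fn, timeout_s), sleep=sleep)
        )
    except Exception:
        return fallback
```

`Future.result(timeout=...)` stops waiting but cannot cancel a call that is already running in a worker thread. A hung model call therefore still occupies a worker until it returns. The breaker wraps the whole retry sequence, so one exhausted sequence counts as a single failure.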
Monitoring classification pipeline health requires tracking latency distributions, label distribution shifts, and prediction confidence variance. Sudden changes in the predicted label distribution should trigger drift detection, because they can indicate model degradation or adversarial inputs.
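The sketch below shows one minimal form of rolling-window monitoring. It assumes each prediction has one primary label and that a baseline sample of labels is available from a period known to be healthy. Label drift is measured with total variation distance, which is one reasonable choice among several; the population stability index and chi-squared tests are common alternatives. The monitor also reports nearest-rank latency percentiles and confidence variance. `PipelineMonitor`, the 0.2 threshold, and the window sizes are hypothetical and would need tuning against real traffic.

```python
from collections import Counter, deque
from statistics import pvariance
from typing import Deque, Dict, Iterable, List


def label_distribution(labels: Iterable[str]) -> Dict[str, float]:
    """Fraction of predictions assigned to each label."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {k: v / total for k, v in counts.items()} if total else {}


def total_variation(p: Dict[str, float], q: Dict[str, float]) -> float:
    """Total variation distance: 0 means identical, 1 means disjoint."""
    keys = set(p) | set(q)
    return 0.5 * sum(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in keys)


def percentile(values: List[float], pct: int) -> float:
    """Nearest-rank percentile; pct is an integer from 1 to 100."""
    if not values:
        return float("nan")
    s = sorted(values)
    rank = max(1, (pct * len(s) + 99) // 100)  # integer ceil(pct * n / 100)
    return s[rank - 1]


class PipelineMonitor:
    """Rolling-window health metrics for a classification pipeline."""

    def __init__(self, baseline_labels: Iterable[str], window: int = 500,
                 drift_threshold: float = 0.2, min_samples: int = 100):
        self.baseline = label_distribution(baseline_labels)
        self.labels: Deque[str] = deque(maxlen=window)
        self.latencies_ms: Deque[float] = deque(maxlen=window)
        self.confidences: Deque[float] = deque(maxlen=window)
        self.drift_threshold = drift_threshold
        self.min_samples = min_samples  # avoid alarms on tiny samples

    def record(self, top_label: str, confidence: float, latency_ms: float) -> None:
        self.labels.append(top_label)
        self.confidences.append(confidence)
        self.latencies_ms.append(latency_ms)

    def snapshot(self) -> Dict[str, object]:
        drift = total_variation(self.baseline, label_distribution(self.labels))
        latencies = list(self.latencies_ms)
        confidences = list(self.confidences)
        return {
            "n": len(self.labels),
            "drift_score": drift,
            "drifted": len(self.labels) >= self.min_samples and drift > self.drift_threshold,
            "p50_latency_ms": percentile(latencies, 50),
            "p95_latency_ms": percentile(latencies, 95),
            "confidence_variance": pvariance(confidences) if confidences else float("nan"),
        }
```

In use, the pipeline would call `record` after each `classify` call. `snapshot` would be exported periodically to whatever metrics system the service already uses.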
Design and implement a text classification pipeline with input validation, result caching, timeout handling, and metrics collection. Load test to identify bottlenecks.