19. Monitoring RAG Quality
Chapter 19 of 22 · 20 min
Production RAG needs continuous monitoring. Retrieval quality directly impacts answer quality, so track retrieval metrics.
Core Retrieval Metrics
from dataclasses import dataclass
from typing import List
import numpy as np
@dataclass
class RetrievalMetrics:
    """Retrieval outcome for one query, with ranking-quality metrics.

    Relevance is binary: a retrieved doc is relevant iff it appears in
    ``expected_docs``. A relevant doc that is retrieved more than once is
    credited only at its first position, so every metric stays in [0, 1].
    """

    query: str
    expected_docs: List[str]  # Ground truth relevant docs
    retrieved_docs: List[str]  # Ranked results, best first
    latencies: dict  # Stage -> seconds

    # Cut-off used by the ``precision_at_k`` / ``recall_at_k`` properties.
    # Left unannotated on purpose so the dataclass does not treat it as a field.
    DEFAULT_K = 10

    def _hit_flags(self, k=None) -> List[bool]:
        """Return one flag per retrieved position within the top ``k``.

        A flag is True only the first time a relevant doc appears; repeats
        of the same doc are False. ``k=None`` covers the whole ranking.
        """
        relevant = set(self.expected_docs)
        seen = set()
        flags = []
        for doc in self.retrieved_docs[:k]:
            flags.append(doc in relevant and doc not in seen)
            seen.add(doc)
        return flags

    def precision_at(self, k: int) -> float:
        """Return the fraction of the top ``k`` slots holding a distinct relevant doc.

        The denominator is ``k`` itself, so a ranking shorter than ``k`` is
        penalised for the missing slots.

        Raises:
            ValueError: if ``k`` is not positive.
        """
        if k <= 0:
            raise ValueError("k must be a positive integer")
        return sum(self._hit_flags(k)) / k

    def recall_at(self, k: int) -> float:
        """Return the fraction of distinct expected docs found in the top ``k``.

        Returns 0.0 when there are no expected docs.

        Raises:
            ValueError: if ``k`` is not positive.
        """
        if k <= 0:
            raise ValueError("k must be a positive integer")
        relevant_count = len(set(self.expected_docs))
        if relevant_count == 0:
            return 0.0
        return sum(self._hit_flags(k)) / relevant_count

    # A property getter cannot receive arguments, so these two expose the
    # fixed default cut-off; use precision_at()/recall_at() for any other k.
    @property
    def precision_at_k(self) -> float:
        """Precision at the default cut-off (``DEFAULT_K`` = 10)."""
        return self.precision_at(self.DEFAULT_K)

    @property
    def recall_at_k(self) -> float:
        """Recall at the default cut-off (``DEFAULT_K`` = 10)."""
        return self.recall_at(self.DEFAULT_K)

    @property
    def mrr(self) -> float:
        """Reciprocal rank of the first relevant doc, or 0.0 if none is retrieved.

        This is the per-query value; averaging it over many queries yields
        the mean reciprocal rank.
        """
        for rank, is_hit in enumerate(self._hit_flags(), 1):
            if is_hit:
                return 1.0 / rank
        return 0.0

    @property
    def ndcg(self) -> float:
        """Normalized discounted cumulative gain with binary relevance.

        The ideal ranking places every distinct expected doc at the top, so
        retrieving fewer docs than expected caps the score below 1.0.
        Returns 0.0 when there are no expected docs.
        """
        dcg = sum(
            1.0 / np.log2(rank + 1)
            for rank, is_hit in enumerate(self._hit_flags(), 1)
            if is_hit
        )
        ideal_count = len(set(self.expected_docs))
        idcg = sum(1.0 / np.log2(rank + 1) for rank in range(1, ideal_count + 1))
        # float() strips the numpy scalar type picked up from np.log2.
        return float(dcg / idcg) if idcg > 0 else 0.0
Automated Quality Checks
class QualityMonitor:
    """Rolling store of per-query retrieval metrics with threshold alerts."""

    def __init__(self, threshold_precision: float = 0.6,
                 threshold_recall: float = 0.5,
                 max_history: int = 1000,
                 window: int = 100):
        """Configure thresholds and history sizes.

        Args:
            threshold_precision: Minimum acceptable average precision.
            threshold_recall: Minimum acceptable average recall.
            max_history: Number of most recent metrics kept in memory.
            window: Number of most recent metrics averaged by check_quality().

        Raises:
            ValueError: if ``max_history`` or ``window`` is not positive.
        """
        if max_history <= 0 or window <= 0:
            raise ValueError("max_history and window must be positive")
        self.thresholds = {"precision": threshold_precision,
                           "recall": threshold_recall}
        self.max_history = max_history
        self.window = window
        self.recent_metrics = []

    def record(self, metrics: "RetrievalMetrics") -> None:
        """Append one query's metrics, discarding the oldest beyond ``max_history``."""
        self.recent_metrics.append(metrics)
        overflow = len(self.recent_metrics) - self.max_history
        if overflow > 0:
            del self.recent_metrics[:overflow]

    def check_quality(self) -> dict:
        """Compare averages over the most recent ``window`` queries with the thresholds.

        Returns:
            A dict with ``avg_precision``, ``avg_recall``, ``avg_mrr``,
            ``alerts`` (list of messages) and ``status``. ``status`` is
            "degraded" when any alert fired and "healthy" otherwise. When
            nothing has been recorded yet the averages are None and
            ``status`` is "no_data".
        """
        recent = self.recent_metrics[-self.window:]
        if not recent:
            # np.mean([]) would return NaN with a RuntimeWarning, and NaN
            # compares False against every threshold, i.e. "healthy".
            return {
                "avg_precision": None,
                "avg_recall": None,
                "avg_mrr": None,
                "alerts": [],
                "status": "no_data"
            }

        avg_precision = float(np.mean([m.precision_at_k for m in recent]))
        avg_recall = float(np.mean([m.recall_at_k for m in recent]))
        avg_mrr = float(np.mean([m.mrr for m in recent]))

        alerts = []
        if avg_precision < self.thresholds["precision"]:
            alerts.append(f"Low precision: {avg_precision:.2f}")
        if avg_recall < self.thresholds["recall"]:
            alerts.append(f"Low recall: {avg_recall:.2f}")

        return {
            "avg_precision": avg_precision,
            "avg_recall": avg_recall,
            "avg_mrr": avg_mrr,
            "alerts": alerts,
            "status": "degraded" if alerts else "healthy"
        }

    def get_failed_queries(self) -> List[dict]:
        """Return the stored queries whose precision is below the precision threshold.

        Only precision is checked here; recall is not considered. Each entry
        holds the query, its expected and retrieved docs, and its precision.
        """
        limit = self.thresholds["precision"]
        failed = []
        for m in self.recent_metrics:
            precision = m.precision_at_k  # evaluate the property once
            if precision < limit:
                failed.append({
                    "query": m.query,
                    "expected": m.expected_docs,
                    "retrieved": m.retrieved_docs,
                    "precision": precision
                })
        return failed
Latency Monitoring
import prometheus_client as prom
class LatencyMonitor:
    """Export pipeline latencies as Prometheus histograms plus a P99 gauge."""

    def __init__(self, sample_window: int = 1000):
        """Create the Prometheus collectors and the rolling sample buffer.

        Args:
            sample_window: Number of most recent retrieval latencies kept
                for the P99 estimate.

        Raises:
            ValueError: if ``sample_window`` is not positive.
        """
        from collections import deque

        if sample_window <= 0:
            raise ValueError("sample_window must be positive")

        self.retrieval_histogram = prom.Histogram(
            'retrieval_latency_seconds',
            'Retrieval stage latency',
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
        )
        self.total_histogram = prom.Histogram(
            'total_latency_seconds',
            'Total pipeline latency',
            buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0]
        )
        # Gauge takes a documentation string as its second positional
        # argument; constructing it with the name alone raises TypeError.
        self.p99_gauge = prom.Gauge(
            'retrieval_p99_latency',
            'Approximate P99 retrieval latency over recent samples (seconds)'
        )
        # A Histogram object cannot be handed to np.percentile, so the raw
        # values are kept here in a bounded buffer.
        self._retrieval_samples = deque(maxlen=sample_window)

    def record(self, stage_latencies: dict, total_latency: float) -> None:
        """Record one request's latencies and refresh the P99 gauge.

        Args:
            stage_latencies: Mapping of stage name to seconds. Only the
                "embedding" entry is read, and it is optional.
            total_latency: End-to-end latency in seconds.
        """
        # NOTE(review): the "embedding" stage feeds the *retrieval* histogram;
        # confirm with callers whether the key should be "retrieval".
        if "embedding" in stage_latencies:
            latency = stage_latencies["embedding"]
            self.retrieval_histogram.observe(latency)
            self._retrieval_samples.append(latency)

        self.total_histogram.observe(total_latency)

        # P99 over the rolling window; skipped until a sample exists.
        if self._retrieval_samples:
            samples = np.fromiter(self._retrieval_samples, dtype=float)
            self.p99_gauge.set(float(np.percentile(samples, 99)))
EXERCISE
Implement a quality monitoring system that records retrieval metrics for every query, alerts when precision drops below 0.5, and logs failed queries for later review.