17. Monitoring RAG Quality
Chapter 17 of 18 · 25 min
Production monitoring differs from evaluation in that it measures live system behavior against continuously updated baselines. Alerting on quality drift enables rapid response before user experience degrades.
Pipeline Instrumentation
import json
import logging
import threading
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import List, Optional
logger = logging.getLogger(__name__)
@dataclass
class RAGMetrics:
    """Measurements captured for one RAG request.

    The first four fields are always supplied by the caller. The two score
    fields stay ``None`` when no evaluation was run for the request, and
    ``timestamp`` is filled in at construction time unless one is given.
    """

    query: str
    latency_ms: float
    chunks_retrieved: int
    generated_answer: str
    retrieval_precision: Optional[float] = None
    generation_quality: Optional[float] = None
    timestamp: Optional[str] = None

    def __post_init__(self):
        """Stamp the record with the current UTC time when none was given."""
        if self.timestamp is not None:
            return
        # Naive (offset-free) ISO-8601 string expressed in UTC.
        self.timestamp = datetime.utcnow().isoformat()
class RAGMonitor:
    """Monitor RAG pipeline in production.

    Sampled invocations are appended to an in-memory buffer under a lock.
    Once the buffer holds ``_FLUSH_THRESHOLD`` records it is serialized to
    JSON, logged as flushed, and cleared.
    """

    # Number of buffered records that triggers an automatic flush.
    _FLUSH_THRESHOLD = 100

    def __init__(
        self,
        metrics_endpoint: str,
        sampling_rate: float = 1.0
    ):
        """Create a monitor.

        Args:
            metrics_endpoint: Destination for flushed batches. It is only
                stored here; the example flush does not send anything yet.
            sampling_rate: Fraction of ``record`` calls that are kept, from
                0.0 (keep none) to 1.0 (keep all).
        """
        self.endpoint = metrics_endpoint
        self.sampling_rate = sampling_rate
        self._buffer: List[RAGMetrics] = []
        self._lock = threading.Lock()

    def record(
        self,
        query: str,
        latency_ms: float,
        chunks_retrieved: int,
        generated_answer: str,
        eval_scores: Optional[dict] = None
    ):
        """Record metrics for a single RAG invocation.

        Args:
            query: The user query that was answered.
            latency_ms: End-to-end latency of the invocation.
            chunks_retrieved: Number of chunks returned by retrieval.
            generated_answer: The answer text produced by the pipeline.
            eval_scores: Optional mapping; its ``"precision"`` and
                ``"quality"`` entries (if present) are stored as the
                retrieval and generation scores.
        """
        import random

        # random.random() lies in [0.0, 1.0), so ">=" keeps every call at
        # rate 1.0 and reliably drops every call at rate 0.0.
        if random.random() >= self.sampling_rate:
            return  # Sampled out

        scores = eval_scores or {}
        metrics = RAGMetrics(
            query=query,
            latency_ms=latency_ms,
            chunks_retrieved=chunks_retrieved,
            generated_answer=generated_answer,
            retrieval_precision=scores.get("precision"),
            generation_quality=scores.get("quality"),
        )

        with self._lock:
            self._buffer.append(metrics)
            if len(self._buffer) >= self._FLUSH_THRESHOLD:
                self._flush_buffer()

    def _serialize_buffer(self) -> str:
        """Return the buffered records as a JSON array, one object per record."""
        return json.dumps([asdict(m) for m in self._buffer])

    def _flush_buffer(self):
        """Send buffered metrics to storage, then empty the buffer.

        This method does not acquire ``self._lock`` itself; ``record`` calls
        it while already holding the lock.
        """
        if not self._buffer:
            return

        # In production, batch to your metrics backend. The payload is built
        # here so the send below only needs to be uncommented.
        payload = self._serialize_buffer()  # noqa: F841

        # Example: send to data warehouse or monitoring system
        # requests.post(self.endpoint, data=payload)

        logger.info("Flushed %d metrics records", len(self._buffer))
        self._buffer.clear()
Computing Aggregate Metrics
from datetime import datetime, timedelta
import pandas as pd
def _summarize_scores(scores: pd.Series) -> dict:
    """Return mean, 5th and 95th percentile of *scores* (all ``None`` if empty)."""
    if scores.empty:
        return {"mean": None, "p5": None, "p95": None}
    return {
        "mean": float(scores.mean()),
        "p5": float(scores.quantile(0.05)),
        "p95": float(scores.quantile(0.95)),
    }


def compute_quality_dashboard(
    metrics_df: pd.DataFrame,
    time_window: timedelta = timedelta(hours=1)
) -> dict:
    """Compute dashboard metrics over a time window.

    Args:
        metrics_df: One row per recorded invocation with the columns
            ``timestamp``, ``retrieval_precision``, ``generation_quality``
            and ``latency_ms``. ``timestamp`` may hold ISO-8601 strings or
            datetimes; values without an offset are interpreted as UTC.
        time_window: How far back from the current time to look.

    Returns:
        ``{"error": "No data in window"}`` when no row falls inside the
        window. Otherwise a dict with the window, the sample count, score
        summaries for retrieval and generation (``None`` entries when a
        score column has no values), and latency mean/p50/p95/p99.
    """
    # Parse the column before comparing: a column of ISO strings cannot be
    # compared to a datetime directly. utc=True makes every value
    # timezone-aware so it is comparable with the aware cutoff below.
    timestamps = pd.to_datetime(metrics_df["timestamp"], utc=True)
    cutoff = pd.Timestamp.now(tz="UTC") - time_window
    recent = metrics_df[timestamps > cutoff]

    if recent.empty:
        return {"error": "No data in window"}

    latencies = recent["latency_ms"]

    return {
        "window": str(time_window),
        "sample_count": len(recent),
        # Rows without an evaluation score are excluded from the summaries.
        "retrieval": _summarize_scores(recent["retrieval_precision"].dropna()),
        "generation": _summarize_scores(recent["generation_quality"].dropna()),
        "latency_ms": {
            "mean": float(latencies.mean()),
            "p50": float(latencies.quantile(0.50)),
            "p95": float(latencies.quantile(0.95)),
            "p99": float(latencies.quantile(0.99)),
        },
    }
def detect_quality_drift(
    metrics_df: pd.DataFrame,
    baseline_df: pd.DataFrame,
    alert_threshold_pct: float = 10.0,
    high_severity_pct: float = 20.0
) -> List[dict]:
    """Detect quality degradation relative to a baseline.

    For each of ``retrieval_precision`` and ``generation_quality`` the mean
    of ``metrics_df`` is compared with the mean of ``baseline_df``. This is
    a plain relative-drop threshold, not a statistical significance test.

    Args:
        metrics_df: Recent measurements.
        baseline_df: Reference measurements.
        alert_threshold_pct: Minimum percentage drop below the baseline mean
            that produces an alert.
        high_severity_pct: Drops larger than this percentage are labelled
            ``"HIGH"``; other alerts are ``"MEDIUM"``.

    Returns:
        One dict per degraded metric with the keys ``metric``, ``baseline``,
        ``current``, ``degradation_pct`` and ``severity``. Empty when
        nothing dropped past the threshold.
    """
    alerts = []

    for metric_col in ("retrieval_precision", "generation_quality"):
        # A metric can only be compared when both frames carry it.
        if metric_col not in metrics_df.columns:
            continue
        if metric_col not in baseline_df.columns:
            continue

        recent_mean = metrics_df[metric_col].mean()
        baseline_mean = baseline_df[metric_col].mean()

        # NaN means one side has no usable values. Test for that explicitly
        # rather than by truthiness: a recent mean of exactly 0.0 is a total
        # collapse and must still raise an alert.
        if pd.isna(recent_mean) or pd.isna(baseline_mean):
            continue
        # A relative drop is undefined against a zero baseline.
        if baseline_mean == 0:
            continue

        pct_change = (baseline_mean - recent_mean) / baseline_mean * 100
        if pct_change > alert_threshold_pct:
            alerts.append({
                "metric": metric_col,
                "baseline": float(baseline_mean),
                "current": float(recent_mean),
                "degradation_pct": round(float(pct_change), 2),
                "severity": "HIGH" if pct_change > high_severity_pct else "MEDIUM",
            })

    return alerts
Slack Alerting Integration
import requests
def _build_alert_payload(alerts: List[dict]) -> dict:
    """Build the Slack webhook payload that describes *alerts*."""
    message_lines = [
        ":warning: RAG Quality Drift Detected",
        f"*{len(alerts)} metric(s) degrading*",
        ""
    ]
    for alert in alerts:
        message_lines.append(
            f"`{alert['metric']}`: {alert['degradation_pct']}% drop\n"
            f" Baseline: {alert['baseline']:.3f} -> Current: {alert['current']:.3f}"
        )

    return {
        "text": "\n".join(message_lines),
        "attachments": [{
            # "color" is the attachment key Slack uses for the side bar;
            # "danger" renders it red.
            "color": "danger",
            "fields": [
                {"title": a["metric"], "value": f"{a['degradation_pct']}%", "short": True}
                for a in alerts
            ]
        }]
    }


def send_monitoring_alert(
    alerts: List[dict],
    slack_webhook: str
):
    """Send quality drift alerts to Slack.

    Args:
        alerts: Alert dicts carrying the keys ``metric``, ``baseline``,
            ``current`` and ``degradation_pct``. Nothing is sent when the
            list is empty.
        slack_webhook: URL of the Slack incoming webhook to post to.

    Raises:
        requests.RequestException: If the request times out, cannot be
            delivered, or Slack answers with an error status.
    """
    if not alerts:
        return

    # Bound the call so an unreachable webhook cannot hang the monitoring
    # job, and surface rejected posts instead of dropping the alert silently.
    response = requests.post(
        slack_webhook,
        json=_build_alert_payload(alerts),
        timeout=10,
    )
    response.raise_for_status()
Rolling Baseline Updates
Baselines should update gradually to reflect legitimate quality improvements. A rolling window approach replaces old baselines with new measurements when performance improves.
def update_rolling_baseline(
    current_metrics: pd.DataFrame,
    baseline_filepath: str,
    min_stable_samples: int = 1000,
    max_quality_std: float = 0.1,
    min_improvement: float = 0.02
) -> bool:
    """Raise stored baselines for metrics that have stably improved.

    Each metric is judged independently: a metric's baseline is replaced by
    its recent mean when that mean beats the stored value by more than
    ``min_improvement``. Baselines are never lowered.

    Args:
        current_metrics: Recent measurements with the columns
            ``retrieval_precision`` and ``generation_quality``.
        baseline_filepath: JSON file mapping metric name to baseline value.
            A missing file is treated as an empty baseline so that the first
            baseline can be created.
        min_stable_samples: Minimum number of rows required before any
            update is considered.
        max_quality_std: Largest standard deviation of
            ``generation_quality`` still regarded as stable.
        min_improvement: Relative noise margin; 0.02 means the recent mean
            must exceed the old baseline by more than 2%.

    Returns:
        True if at least one baseline value was changed and written back.
    """
    import json

    # Cheap rejections first, before touching the filesystem.
    # Require sufficient stable samples before updating.
    if len(current_metrics) < min_stable_samples:
        return False

    # High variance indicates instability.
    recent_std = current_metrics["generation_quality"].std()
    if recent_std > max_quality_std:
        return False

    # Load existing baseline.
    try:
        with open(baseline_filepath, encoding="utf-8") as f:
            baseline = json.load(f)
    except FileNotFoundError:
        baseline = {}

    updated = False
    for metric in ("retrieval_precision", "generation_quality"):
        recent_mean = current_metrics[metric].mean()
        old_baseline = baseline.get(metric, 0)

        # Only update if improvement exceeds noise margin. A NaN mean
        # (column with no values) compares False and is left alone.
        if recent_mean > old_baseline * (1 + min_improvement):
            baseline[metric] = float(recent_mean)
            updated = True

    if updated:
        with open(baseline_filepath, "w", encoding="utf-8") as f:
            json.dump(baseline, f, indent=2)

    return updated
EXERCISE
Instrument a running RAG pipeline with metrics collection. Compute hourly aggregates for one day, generate a baseline, and implement drift detection that alerts when any metric drops more than 10% below baseline.