RUNLOCALAI · v38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
RAG Evaluation and Metrics

17. Monitoring RAG Quality

Chapter 17 of 18 · 25 min
KEY INSIGHT

Production monitoring requires intentional sampling to balance observability with cost. Evaluating every query with LLM-based metrics is prohibitively expensive; sampling representative queries provides sufficient signal at sustainable cost.

Production monitoring differs from evaluation in that it measures live system behavior against continuously updated baselines. Alerting on quality drift enables rapid response before user experience degrades.
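
Sampling only saves money if the decision is made before any LLM-based scoring runs, so that unsampled queries are never evaluated at all. One way to do that is to hash a request identifier into a fraction and compare it with the sampling rate. The sketch below is illustrative only: `should_evaluate` and the `request_id` argument are hypothetical names, not part of this chapter's pipeline code.

```python
import hashlib


def should_evaluate(request_id: str, sampling_rate: float) -> bool:
    """Return True if this request falls inside the evaluation sample.

    Hypothetical helper: hashing the ID (instead of calling random.random())
    makes the decision reproducible for a given request.
    """
    if sampling_rate <= 0.0:
        return False
    if sampling_rate >= 1.0:
        return True
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a float between 0 and 1
    fraction = int.from_bytes(digest[:8], "big") / 2**64
    return fraction < sampling_rate


if __name__ == "__main__":
    ids = [f"req-{i}" for i in range(10_000)]
    sampled = sum(should_evaluate(i, 0.05) for i in ids)
    print(f"Evaluated {sampled} of {len(ids)} requests")
```

Because the decision depends only on the ID, a retried request lands in the same bucket, which keeps sampled traces consistent across services.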

Pipeline Instrumentation

import json
import logging
import random
import threading
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import List, Optional

logger = logging.getLogger(__name__)

@dataclass
class RAGMetrics:
    query: str
    latency_ms: float
    chunks_retrieved: int
    generated_answer: str
    retrieval_precision: Optional[float] = None
    generation_quality: Optional[float] = None
    timestamp: Optional[str] = None

    def __post_init__(self):
        # Stamp each record with the current UTC time unless one was supplied
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc).isoformat()

class RAGMonitor:
    """Monitor RAG pipeline in production."""
    
    def __init__(
        self,
        metrics_endpoint: str,
        sampling_rate: float = 1.0  # 1.0 records every call; lower it to sample
    ):
        self.endpoint = metrics_endpoint
        self.sampling_rate = sampling_rate
        self._buffer: List[RAGMetrics] = []
        self._lock = threading.Lock()
    
    def record(
        self,
        query: str,
        latency_ms: float,
        chunks_retrieved: int,
        generated_answer: str,
        eval_scores: Optional[dict] = None
    ):
        """Record metrics for a single RAG invocation."""
        if random.random() >= self.sampling_rate:
            return  # Sampled out
        
        scores = eval_scores or {}
        metrics = RAGMetrics(
            query=query,
            latency_ms=latency_ms,
            chunks_retrieved=chunks_retrieved,
            generated_answer=generated_answer,
            retrieval_precision=scores.get("precision"),
            generation_quality=scores.get("quality")
        )
        
        with self._lock:
            self._buffer.append(metrics)
            
            # Flush in batches of 100; the lock is already held here
            if len(self._buffer) >= 100:
                self._flush_buffer()
    
    def _flush_buffer(self):
        """Send buffered metrics to storage. The caller must hold self._lock."""
        if not self._buffer:
            return
        
        # In production, batch to your metrics backend.
        # asdict() serializes every RAGMetrics field, including the timestamp.
        payload = json.dumps([asdict(m) for m in self._buffer])
        
        # Example: send to data warehouse or monitoring system
        # requests.post(self.endpoint, data=payload)
        
        logger.info(f"Flushed {len(self._buffer)} metrics records")
        self._buffer.clear()
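
The `latency_ms` value passed to `record` has to be measured by the caller. A minimal sketch of one way to do that, assuming the pipeline exposes separate retrieval and generation callables: `answer_with_metrics`, `retrieve`, `generate`, and `record` are hypothetical names, and `record` stands in for `RAGMonitor.record`.

```python
import time
from typing import Callable, List


def answer_with_metrics(
    query: str,
    retrieve: Callable[[str], List[str]],
    generate: Callable[[str, List[str]], str],
    record: Callable[..., None],
) -> str:
    """Run one RAG call and report its latency and chunk count."""
    start = time.perf_counter()  # monotonic clock, suited to measuring durations
    chunks = retrieve(query)
    answer = generate(query, chunks)
    latency_ms = (time.perf_counter() - start) * 1000.0

    record(
        query=query,
        latency_ms=latency_ms,
        chunks_retrieved=len(chunks),
        generated_answer=answer,
    )
    return answer


if __name__ == "__main__":
    # Stand-in components so the sketch runs without a real pipeline
    def fake_retrieve(q: str) -> List[str]:
        return ["chunk A", "chunk B"]

    def fake_generate(q: str, chunks: List[str]) -> str:
        return f"Answer based on {len(chunks)} chunks"

    def print_record(**fields) -> None:
        print(fields)

    answer_with_metrics("What is RAG?", fake_retrieve, fake_generate, print_record)
```

Timing retrieval and generation together gives end-to-end latency; timing them separately would show which stage is responsible for a regression.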

Computing Aggregate Metrics

from datetime import timedelta
from typing import List

import pandas as pd

def compute_quality_dashboard(
    metrics_df: pd.DataFrame,
    time_window: timedelta = timedelta(hours=1)
) -> dict:
    """Compute dashboard metrics over a time window."""
    
    # Timestamps may be stored as ISO-8601 strings; parse them as UTC before comparing
    timestamps = pd.to_datetime(metrics_df["timestamp"], utc=True)
    cutoff = pd.Timestamp.now(tz="UTC") - time_window
    recent = metrics_df[timestamps > cutoff]
    
    if recent.empty:
        return {"error": "No data in window"}
    
    # Retrieval quality
    retrieval_scores = recent["retrieval_precision"].dropna()
    
    # Generation quality
    generation_scores = recent["generation_quality"].dropna()
    
    # Latency percentiles
    latencies = recent["latency_ms"]
    
    return {
        "window": str(time_window),
        "sample_count": len(recent),
        
        "retrieval": {
            "mean": retrieval_scores.mean() if not retrieval_scores.empty else None,
            "p5": retrieval_scores.quantile(0.05) if not retrieval_scores.empty else None,
            "p95": retrieval_scores.quantile(0.95) if not retrieval_scores.empty else None
        },
        
        "generation": {
            "mean": generation_scores.mean() if not generation_scores.empty else None,
            "p5": generation_scores.quantile(0.05) if not generation_scores.empty else None,
            "p95": generation_scores.quantile(0.95) if not generation_scores.empty else None
        },
        
        "latency_ms": {
            "mean": latencies.mean(),
            "p50": latencies.quantile(0.50),
            "p95": latencies.quantile(0.95),
            "p99": latencies.quantile(0.99)
        }
    }

def detect_quality_drift(
    metrics_df: pd.DataFrame,
    baseline_df: pd.DataFrame,
    alert_threshold_pct: float = 10.0
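) -> List[dict]:
    """Flag metrics whose mean dropped more than alert_threshold_pct below baseline.

    This is a simple threshold check on means, not a statistical significance test.
    """
    alerts = []
    
    for metric_col in ["retrieval_precision", "generation_quality"]:
        if metric_col not in metrics_df.columns or metric_col not in baseline_df.columns:
            continue
        
        recent_mean = metrics_df[metric_col].mean()
        baseline_mean = baseline_df[metric_col].mean()
        
        # Skip when either mean is undefined (NaN) or the baseline is not positive.
        # A recent mean of 0.0 is a valid (and severe) reading and must still alert.
        if pd.notna(baseline_mean) and pd.notna(recent_mean) and baseline_mean > 0:
            pct_change = (
                (baseline_mean - recent_mean) / baseline_mean * 100
            )
            
            if pct_change > alert_threshold_pct: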
                alerts.append({
                    "metric": metric_col,
                    "baseline": baseline_mean,
                    "current": recent_mean,
                    "degradation_pct": round(pct_change, 2),
                    "severity": "HIGH" if pct_change > 20 else "MEDIUM"
                })
    
    return alerts
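
The percentage threshold in `detect_quality_drift` compares means only, so a small or noisy window can trigger or miss an alert by chance. One option for checking whether a drop is larger than chance alone would explain is a one-sided permutation test on the raw scores. This is a sketch of that swapped-in technique, not something the chapter's code performs; `permutation_p_value` and its parameters are hypothetical, and the scores in the demo are made up.

```python
import random
from statistics import mean
from typing import Sequence


def permutation_p_value(
    baseline: Sequence[float],
    recent: Sequence[float],
    n_permutations: int = 2000,
    seed: int = 0,
) -> float:
    """One-sided permutation test: is the recent mean lower than the baseline mean?

    Returns the estimated probability of seeing a drop at least this large
    if both samples came from the same distribution.
    """
    observed = mean(baseline) - mean(recent)
    pooled = list(baseline) + list(recent)
    n_base = len(baseline)
    rng = random.Random(seed)  # fixed seed keeps the result reproducible

    at_least_as_extreme = 0
    for _ in range(n_permutations):
        rng.shuffle(pooled)
        diff = mean(pooled[:n_base]) - mean(pooled[n_base:])
        if diff >= observed:
            at_least_as_extreme += 1

    # Add-one correction so the estimate is never exactly zero
    return (at_least_as_extreme + 1) / (n_permutations + 1)


if __name__ == "__main__":
    baseline_scores = [0.82, 0.85, 0.80, 0.88, 0.84, 0.86, 0.83, 0.87]
    recent_scores = [0.70, 0.74, 0.69, 0.72, 0.75, 0.71, 0.73, 0.68]
    p = permutation_p_value(baseline_scores, recent_scores)
    print(f"p-value for degradation: {p:.4f}")
```

An alert could then require both conditions: a drop beyond the percentage threshold and a small p-value, which reduces false alarms from thin samples.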

Slack Alerting Integration

from typing import List

import requests

def send_monitoring_alert(
    alerts: List[dict],
    slack_webhook: str
):
    """Send quality drift alerts to Slack."""
    
    if not alerts:
        return
    
    message_lines = [
        ":warning: RAG Quality Drift Detected",
        f"*{len(alerts)} metric(s) degrading*",
        ""
    ]
    
    for alert in alerts:
        message_lines.append(
            f"`{alert['metric']}`: {alert['degradation_pct']}% drop\n"
            f"   Baseline: {alert['baseline']:.3f} -> Current: {alert['current']:.3f}"
        )
    
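    payload = {
        "text": "\n".join(message_lines),
        "attachments": [{
            "color": "danger",  # Slack attachment color; "danger" renders as red
            "fields": [
                {"title": a["metric"], "value": f"{a['degradation_pct']}%", "short": True}
                for a in alerts
            ]
        }]
    }
    
    # Time out rather than hang the monitoring job, and surface HTTP errors
    response = requests.post(slack_webhook, json=payload, timeout=10)
    response.raise_for_status()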

Rolling Baseline Updates

Baselines should update gradually to reflect legitimate quality improvements. The approach below replaces a stored baseline value with the recent mean only when enough samples have accumulated, their variance is low, and the mean beats the old baseline by a noise margin.

def update_rolling_baseline(
    current_metrics: pd.DataFrame,
    baseline_filepath: str,
    min_stable_samples: int = 1000
) -> bool:
    """Update baseline only if recent performance is consistently better."""
    import json
    
    # Load existing baseline
    with open(baseline_filepath) as f:
        baseline = json.load(f)
    
    # Require sufficient stable samples before updating
    if len(current_metrics) < min_stable_samples:
        return False
    
    # Check consistency of recent performance
    recent_std = current_metrics["generation_quality"].std()
    if recent_std > 0.1:  # High variance indicates instability
        return False
    
    # Update each metric whose improvement exceeds the noise margin
    updated = False
    for metric in ["retrieval_precision", "generation_quality"]:
        recent_mean = current_metrics[metric].mean()
        old_baseline = baseline.get(metric, 0)
        
        # Require at least a 2% gain over the stored baseline
        if recent_mean > old_baseline * 1.02:
            baseline[metric] = float(recent_mean)  # plain float for JSON serialization
            updated = True
    
    if updated:
        with open(baseline_filepath, "w") as f:
            json.dump(baseline, f, indent=2)
    
    return updated
EXERCISE

Instrument a running RAG pipeline with metrics collection. Compute hourly aggregates for one day, generate a baseline, and implement drift detection that alerts when any metric drops more than 10% below baseline.
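
A possible starting point for the hourly aggregation step, assuming the collected metrics sit in a DataFrame with the same column names as `RAGMetrics`. `hourly_aggregates` is a hypothetical helper, and the sample rows are invented purely to show the output shape.

```python
import pandas as pd


def hourly_aggregates(metrics_df: pd.DataFrame) -> pd.DataFrame:
    """Mean quality scores and p95 latency per hour."""
    df = metrics_df.copy()
    # Parse ISO-8601 strings (or existing datetimes) into UTC timestamps
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    hourly = df.set_index("timestamp").resample("1h")
    return pd.DataFrame({
        "sample_count": hourly["latency_ms"].count(),
        "retrieval_precision": hourly["retrieval_precision"].mean(),
        "generation_quality": hourly["generation_quality"].mean(),
        "latency_p95_ms": hourly["latency_ms"].quantile(0.95),
    })


if __name__ == "__main__":
    # Invented sample rows, for illustration only
    sample = pd.DataFrame({
        "timestamp": [
            "2026-01-01T10:05:00",
            "2026-01-01T10:40:00",
            "2026-01-01T11:15:00",
        ],
        "latency_ms": [100.0, 200.0, 300.0],
        "retrieval_precision": [0.8, 0.6, 0.9],
        "generation_quality": [0.7, 0.9, 0.5],
    })
    print(hourly_aggregates(sample))
```

Averaging the hourly rows from a known-good day is one simple way to produce the baseline values that drift detection compares against.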
