16. Regression Testing
Regression testing in RAG systems must account for the non-deterministic nature of LLM generation. The same input can produce different outputs across runs, which complicates traditional unit test assertions.
### Handling Non-Deterministic Outputs
Compare outputs by semantic similarity instead of exact string matching. Generate several outputs per query and assert on aggregate statistics (for example, the mean pairwise similarity) rather than on any individual run.
import pytest
from ragas.metrics import answer_similarity
from semantic_text_similarity import SemanticComparer
from typing import List
class TestRAGRegression:
"""Regression tests accounting for generation variability."""
def test_answer_semantic_consistency(
self,
rag_pipeline,
regression_queries: List[str]
):
"""Answers should stay semantically consistent across code changes."""
comparer = SemanticComparer()
consistency_scores = []
for query in regression_queries:
# Generate multiple times accounting for temperature
outputs = [
rag_pipeline(query, generation_config={"temperature": 0.0})
for _ in range(3)
]
# Check pairwise similarity
pairwise_scores = [
comparer.compute(outputs[i], outputs[j])
for i in range(len(outputs))
for j in range(i+1, len(outputs))
]
avg_consistency = sum(pairwise_scores) / len(pairwise_scores)
consistency_scores.append(avg_consistency)
assert avg_consistency >= 0.85, \
f"Query '{query[:50]}...' shows inconsistent generation"
overall = sum(consistency_scores) / len(consistency_scores)
print(f"Consistency: {overall:.3f}")
def test_retrieval_stability(
self,
retrieval_pipeline,
stability_test_queries: List[str]
):
"""Retrieval should produce stable results across identical inputs."""
stability_scores = []
for query in stability_test_queries:
run1 = retrieval_pipeline(query, top_k=5)
run2 = retrieval_pipeline(query, top_k=5)
# Calculate Jaccard similarity of retrieved doc IDs
ids1 = {doc["id"] for doc in run1}
ids2 = {doc["id"] for doc in run2}
jaccard = len(ids1 & ids2) / len(ids1union(ids2)) if ids1_ids2 else 1.0
stability_scores.append(jaccard)
avg_stability = sum(stability_scores) / len(stability_scores)
assert avg_stability >= 0.95, \
f"Retrieval stability {avg_stability:.2f} below 0.95 threshold"
def test_no_harmful_regressions(
self,
rag_pipeline,
critical_queries: List[str],
baseline_metrics: dict
):
"""Critical query performance should not degrade."""
from ragas.metrics import faithfulness, answer_relevancy
from ragas import evaluate
from datasets import Dataset
eval_data = {
"user_input": critical_queries,
"retrieved_contexts": [], # Populated by pipeline
"response": [], # Generated by pipeline
"reference": []
}
dataset = Dataset.from_dict(eval_data)
scores = evaluate(
dataset,
metrics=[faithfulness, answer_relevancy],
raise_exceptions=False
)
for metric, baseline in baseline_metrics.items():
current = scores[metric].mean()
assert current >= baseline * 0.95, \
f"{metric} regressed: {current:.3f} vs baseline {baseline:.3f}"
### Baseline Management
```python
import json
from pathlib import Path
from datetime import datetime
class BaselineManager:
    """Manage regression baselines across versions.

    Each baseline is stored as ``<baseline_dir>/<name>.json`` and holds the
    metric values plus the ``commit`` and ``timestamp`` bookkeeping keys.
    """

    # Keys written by save_baseline that describe the baseline itself and
    # must never be compared as if they were metrics.
    _METADATA_KEYS = ("commit", "timestamp")

    def __init__(self, baseline_dir: str = "baselines"):
        """Create the manager and make sure ``baseline_dir`` exists."""
        self.baseline_dir = Path(baseline_dir)
        # parents=True so a nested path such as "ci/baselines" also works.
        self.baseline_dir.mkdir(parents=True, exist_ok=True)

    def save_baseline(
        self,
        name: str,
        metrics: dict,
        commit_hash: str
    ):
        """Save current metrics as a named baseline.

        Args:
            name: Baseline name; the file is written to ``<name>.json``.
            metrics: Metric values to store. A metric called ``commit`` or
                ``timestamp`` is overwritten by the bookkeeping keys.
            commit_hash: Identifier of the commit the metrics came from.
        """
        # Local import: the file-level import only provides ``datetime``.
        from datetime import timezone

        baseline = {
            **metrics,
            "commit": commit_hash,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        filepath = self.baseline_dir / f"{name}.json"
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(baseline, f, indent=2)
        print(f"Baseline '{name}' saved from {commit_hash}")

    def load_baseline(self, name: str) -> dict:
        """Load a named baseline.

        Raises:
            FileNotFoundError: If no baseline called ``name`` was saved.
        """
        filepath = self.baseline_dir / f"{name}.json"
        with open(filepath, encoding="utf-8") as f:
            return json.load(f)

    def compare_baseline(
        self,
        name: str,
        current_metrics: dict
    ) -> dict:
        """Compare current metrics against a saved baseline.

        Returns:
            A dict keyed by metric name for every metric present in both
            ``current_metrics`` and the baseline. Each value holds
            ``current``, ``baseline`` and ``change_pct`` (relative change in
            percent, rounded to two decimals). ``change_pct`` is ``None``
            when the baseline is 0 and the current value is not, because
            the relative change is undefined in that case.
        """
        baseline = self.load_baseline(name)
        comparison = {}
        for metric, current in current_metrics.items():
            if metric in self._METADATA_KEYS or metric not in baseline:
                continue
            baseline_val = baseline[metric]
            if baseline_val == 0:
                # Avoid ZeroDivisionError: 0 -> 0 is "no change", anything
                # else has no meaningful relative change.
                change_pct = 0.0 if current == 0 else None
            else:
                change = (current - baseline_val) / baseline_val
                change_pct = round(change * 100, 2)
            comparison[metric] = {
                "current": current,
                "baseline": baseline_val,
                "change_pct": change_pct
            }
        return comparison
```

### Critical Query Identification
Not all queries carry equal weight for regression testing. Identify critical queries based on query frequency, business impact, and historical failure patterns.
def identify_critical_queries(
    query_logs: List[dict],
    failure_history: List[dict],
    top_n: int = 100
) -> List[str]:
    """Identify queries that should always pass regression tests.

    Candidates are ranked in this order, de-duplicated, and cut to
    ``top_n`` entries:

    1. high-volume queries that contain a business-critical keyword,
    2. the remaining high-volume queries (most frequent first),
    3. historically failing queries, in order of first appearance.

    Args:
        query_logs: Log records; each must have a ``"query"`` key.
        failure_history: Failure records; each must have a ``"query"`` key.
        top_n: Maximum number of queries to return. Half of this budget
            (``top_n // 2``) is reserved for high-volume queries.

    Returns:
        At most ``top_n`` unique query strings in priority order; an empty
        list when ``top_n`` is not positive.
    """
    from collections import Counter

    if top_n <= 0:
        return []

    # High-volume queries, most frequent first
    frequency = Counter(item["query"] for item in query_logs)
    high_freq = [q for q, _ in frequency.most_common(top_n // 2)]

    # Business-critical keywords
    critical_keywords = {
        "pricing", "limit", "policy", "refund", "cancel",
        "downgrade", "error", "security", "permission"
    }
    critical = [
        query for query in high_freq
        if any(kw in query.lower() for kw in critical_keywords)
    ]

    # Historically failing queries
    failed_queries = (item["query"] for item in failure_history)

    # Combine priority sources. A set cannot be sliced and has no stable
    # order, so de-duplicate through a dict, which keeps insertion order.
    priority_queries = dict.fromkeys([*critical, *high_freq, *failed_queries])
    return list(priority_queries)[:top_n]
Instrument your RAG pipeline to log query frequency and failure occurrences for one week. Use this data to build a focused regression test suite of about 50 critical queries (for example, by calling `identify_critical_queries` with `top_n=50`) that reflects real production usage patterns.