18. RAG Evaluation Dashboard Project
This chapter consolidates the evaluation techniques from previous chapters into a complete monitoring dashboard. The dashboard visualizes retrieval quality, generation quality, and latency across deployment versions, enabling quick diagnosis of issues.
Project Structure
rag_eval_dashboard/
├── app.py # Streamlit application
├── pipeline/
│ ├── __init__.py
│ ├── retrieval.py # Retrieval component
│ ├── generation.py # Generation component
│ └── metrics.py # Metrics instrumentation
├── evaluation/
│ ├── __init__.py
│ ├── ragas_eval.py # RAGAS integration
│ ├── synthetic.py # Synthetic data generation
│ └── annotation.py # Human annotation tools
├── monitoring/
│ ├── __init__.py
│ ├── collector.py # Metrics collector
│ ├── drift_detector.py # Drift detection
│ └── alerting.py # Alert integration
├── data/
│ ├── test_set.jsonl # Evaluation test set
│ ├── baselines/ # Saved baselines
│ └── production_logs/ # Live metrics storage
├── scripts/
│ ├── run_evaluation.py # CLI evaluation runner
│ └── export_baseline.py # Baseline export utility
└── tests/
    ├── test_retrieval.py
    ├── test_generation.py
    └── test_integration.py
Streamlit Dashboard Application
# app.py
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
import json
# Configure the page before any other Streamlit element is emitted.
st.set_page_config(layout="wide", page_title="RAG Evaluation Dashboard")
st.title("RAG Quality Monitoring Dashboard")

# Sidebar controls
# Each selectable label maps to the look-back period it stands for; the
# dict's insertion order is also the order the options are offered in.
window_map = {
    "Last Hour": timedelta(hours=1),
    "Last 24 Hours": timedelta(hours=24),
    "Last 7 Days": timedelta(days=7),
    "Last 30 Days": timedelta(days=30),
}
st.sidebar.header("Configuration")
time_window = st.sidebar.selectbox("Time Window", list(window_map))
# Load data
# A TTL is required here: the cache key is only ``window``, so without one the
# first result (and the cutoff computed at that moment) would be served for
# the lifetime of the app and the dashboard would never show new rows.
@st.cache_data(ttl=60)
def load_metrics_data(window: timedelta) -> pd.DataFrame:
    """Load logged metrics and keep only rows newer than ``now - window``.

    Args:
        window: Look-back period; rows whose ``timestamp`` is older than
            the current UTC time minus this period are dropped.

    Returns:
        The filtered metrics frame, or an empty ``DataFrame`` when the log
        file does not exist or contains no data.
    """
    # In production, query from data warehouse
    # This example uses a local CSV for demonstration
    try:
        df = pd.read_csv("data/metrics_log.csv")
    except (FileNotFoundError, pd.errors.EmptyDataError):
        return pd.DataFrame()

    # Normalise to naive UTC: naive values are taken as UTC (as before),
    # and offset-aware values are converted instead of raising when compared
    # against the naive cutoff.
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True).dt.tz_localize(None)
    cutoff = pd.Timestamp.now(tz="UTC").tz_localize(None) - window
    return df[df["timestamp"] > cutoff]
# Load the rows for the selected window; halt the script run early when there
# is nothing to display so the sections below can assume a non-empty frame.
metrics_df = load_metrics_data(window_map[time_window])
if metrics_df.empty:
    st.warning("No data available for the selected time window")
    st.stop()

# Top-level metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
    avg_precision = metrics_df["retrieval_precision"].mean()
    st.metric(
        "Context Precision",
        f"{avg_precision:.3f}" if pd.notna(avg_precision) else "N/A",
        delta=None,
    )
with col2:
    avg_relevancy = metrics_df["generation_relevancy"].mean()
    st.metric(
        "Answer Relevancy",
        f"{avg_relevancy:.3f}" if pd.notna(avg_relevancy) else "N/A",
    )
with col3:
    avg_faithfulness = metrics_df["faithfulness"].mean()
    st.metric(
        "Faithfulness",
        f"{avg_faithfulness:.3f}" if pd.notna(avg_faithfulness) else "N/A",
    )
with col4:
    avg_latency = metrics_df["latency_ms"].mean()
    p95_latency = metrics_df["latency_ms"].quantile(0.95)
    if pd.notna(p95_latency):
        st.metric(
            "P95 Latency",
            f"{p95_latency:.0f}ms",
            # Compare the displayed P95 (not the mean) to the 1s target.
            delta=round(float(p95_latency - 1000)),
            # Higher latency is worse, so an increase must render as red.
            delta_color="inverse",
            help=f"Mean latency: {avg_latency:.0f}ms",
        )
    else:
        # All latency values are missing; avoid rendering "nanms".
        st.metric("P95 Latency", "N/A")
# Detailed charts
st.header("Quality Metrics Over Time")
retrieval_tab, generation_tab, latency_tab = st.tabs(
    ["Retrieval Quality", "Generation Quality", "Latency"]
)

# Retrieval: precision trend with a dashed red reference line at 0.80.
with retrieval_tab:
    precision_fig = px.line(
        data_frame=metrics_df,
        x="timestamp",
        y="retrieval_precision",
        title="Context Precision Over Time",
    )
    precision_fig.add_hline(
        y=0.80,
        line_color="red",
        line_dash="dash",
        annotation_text="Target",
    )
    precision_fig.update_layout(height=400)
    st.plotly_chart(precision_fig, use_container_width=True)

# Generation: both generation-side scores drawn on one time axis.
with generation_tab:
    generation_series = ["generation_relevancy", "faithfulness"]
    generation_fig = px.line(
        data_frame=metrics_df,
        x="timestamp",
        y=generation_series,
        title="Generation Quality Metrics",
    )
    generation_fig.update_layout(height=400)
    st.plotly_chart(generation_fig, use_container_width=True)

# Latency: distribution over the selected window rather than a trend.
with latency_tab:
    latency_fig = px.histogram(
        data_frame=metrics_df,
        x="latency_ms",
        nbins=50,
        title="Latency Distribution",
    )
    latency_fig.update_layout(height=400)
    st.plotly_chart(latency_fig, use_container_width=True)
# Version comparison
st.header("Performance by Deployment Version")
# The table is only rendered when the log carries a "version" column.
if "version" in metrics_df.columns:
    # Mean of every quality score per version; latency additionally gets a
    # row count so each version's sample size is visible.
    per_version_aggs = {
        "retrieval_precision": "mean",
        "generation_relevancy": "mean",
        "faithfulness": "mean",
        "latency_ms": ["mean", "count"],
    }
    grouped_by_version = metrics_df.groupby("version")
    version_metrics = grouped_by_version.agg(per_version_aggs).round(3)
    st.dataframe(version_metrics)
# Drift alerts
st.header("Drift Alerts")

# Relative change versus baseline that counts as a degradation (10%).
DRIFT_TOLERANCE = 0.10
# Metrics where a larger value is worse; for these a degradation is an
# increase over the baseline rather than a drop below it.
LOWER_IS_BETTER = {"latency_ms"}

# Keep the try body to the file read so errors raised while computing the
# alerts are not mistaken for a missing baseline.
try:
    with open("data/baselines/latest.json", encoding="utf-8") as f:
        baseline = json.load(f)
except FileNotFoundError:
    st.info("No baseline data available. Run evaluation first to establish baselines.")
except json.JSONDecodeError:
    st.warning("Baseline file data/baselines/latest.json is not valid JSON.")
else:
    drift_alerts = []
    for metric, baseline_val in baseline.items():
        # NOTE(review): the baseline file may carry non-metric metadata (the
        # export step is given a commit id) - confirm its schema. Anything
        # that is not a plain number is skipped instead of raising TypeError.
        if isinstance(baseline_val, bool) or not isinstance(baseline_val, (int, float)):
            continue
        if metric not in metrics_df.columns or baseline_val <= 0:
            continue
        current = metrics_df[metric].mean()
        if pd.isna(current):
            continue
        # Positive value means "worse than baseline" for either direction.
        if metric in LOWER_IS_BETTER:
            degradation = (current - baseline_val) / baseline_val
        else:
            degradation = (baseline_val - current) / baseline_val
        if degradation > DRIFT_TOLERANCE:
            drift_alerts.append({
                "metric": metric,
                "baseline": baseline_val,
                "current": current,
                "degradation_pct": round(degradation * 100, 2),
            })
    if drift_alerts:
        st.error(f"{len(drift_alerts)} quality degradation detected")
        st.dataframe(pd.DataFrame(drift_alerts))
    else:
        st.success("No significant drift detected")
# Query examples
# The rows shown are the 10 highest-latency queries in the selected window,
# so the heading says that rather than "recent".
st.header("Slowest Queries in Selected Window")
example_columns = [
    "timestamp", "query", "latency_ms", "retrieval_precision", "faithfulness"
]
# Show whichever of the expected columns the log actually contains instead of
# failing the whole page with a KeyError when an optional one is absent.
available_columns = [c for c in example_columns if c in metrics_df.columns]
sample_queries = metrics_df.nlargest(10, "latency_ms")[available_columns]
st.dataframe(sample_queries, use_container_width=True)
Evaluation Runner Script
# scripts/run_evaluation.py
import argparse
import json
import logging
from pathlib import Path
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall
)
from datasets import Dataset
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main():
    """Run the RAGAS metrics over a JSONL test set and write the scores.

    Command-line arguments:
        --dataset: Path to a JSONL file with one example object per line.
        --output: Path of the JSON results file (default ``results.json``).
        --model: Requested judge model (currently not forwarded, see below).

    Side effects:
        Writes the results JSON to ``--output`` and prints a score summary.
        Exits via ``parser.error`` when the dataset contains no examples.
    """
    from datetime import datetime, timezone

    parser = argparse.ArgumentParser(description="Run RAG evaluation")
    parser.add_argument("--dataset", required=True, help="Test dataset path")
    parser.add_argument("--output", default="results.json", help="Output file")
    parser.add_argument("--model", default="gpt-4o", help="Judge model")
    args = parser.parse_args()

    # NOTE(review): --model is parsed but evaluate() below is called without
    # an llm argument, so the flag has no effect. Wiring it needs an LLM
    # wrapper this script does not import; surface that instead of silently
    # ignoring the user's choice.
    logger.warning(
        "--model=%s is not forwarded to evaluate(); the RAGAS default judge is used",
        args.model,
    )

    # Load test dataset
    logger.info("Loading dataset from %s", args.dataset)
    with open(args.dataset, encoding="utf-8") as f:
        # Skip blank lines (e.g. a trailing newline) which json.loads rejects.
        examples = [json.loads(line) for line in f if line.strip()]
    if not examples:
        parser.error(f"no examples found in {args.dataset}")

    # Convert to HuggingFace dataset format
    eval_dataset = Dataset.from_list(examples)

    # Run evaluation
    logger.info("Running RAGAS evaluation")
    result = evaluate(
        eval_dataset,
        metrics=[
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
        ],
        raise_exceptions=False,
    )

    # Output results
    scores = dict(result.items())
    results_dict = {
        "metrics": scores,
        "mean_scores": {name: float(value) for name, value in scores.items()},
        # Kept as-is for existing consumers: this is the dataset file's
        # modification time (epoch seconds as a string), not the run time.
        "timestamp": str(Path(args.dataset).stat().st_mtime),
        # When this evaluation actually ran, ISO 8601 in UTC.
        "evaluated_at": datetime.now(timezone.utc).isoformat(),
    }
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results_dict, f, indent=2)
    logger.info("Results written to %s", args.output)

    # Print summary
    print("\n=== Evaluation Results ===")
    for metric, score in scores.items():
        print(f"{metric}: {score:.3f}")


if __name__ == "__main__":
    main()
Running the Complete Project
# Run every command from the project root (rag_eval_dashboard/) so that the
# evaluation package and the relative data/ and scripts/ paths resolve.

# Install dependencies
pip install streamlit plotly ragas datasets pandas scikit-learn langchain-openai

# Generate test dataset
# json must be imported inside the snippet; without it json.dumps raises
# NameError after the dataset has already been generated.
python -c "
import json
from evaluation.synthetic import generate_test_dataset
dataset = generate_test_dataset(
    corpus_path='data/corpus.json',
    count=200,
    domain='technical_documentation'
)
with open('data/test_set.jsonl', 'w') as f:
    for item in dataset:
        f.write(json.dumps(item) + '\n')
"

# Run evaluation
python scripts/run_evaluation.py \
    --dataset data/test_set.jsonl \
    --output results.json

# Establish baseline
python scripts/export_baseline.py \
    --results results.json \
    --name latest \
    --commit "$(git rev-parse HEAD)"

# Start dashboard
streamlit run app.py --server.port 8501
Exercise Deliverables
Build a complete evaluation pipeline that produces output matching this rubric:
| Component | Requirement |
|---|---|
| Test Dataset | 100 diverse queries with ground truth answers |
| Evaluation | RAGAS metrics plus custom hallucination detection |
| Dashboard | 3+ metric visualizations over time with baselines |
| Alerts | Drift detection with a 10% threshold and Slack integration |
| CI/CD | Automated pipeline with threshold enforcement on PR |
The dashboard should surface at minimum: context precision trend, answer faithfulness distribution, and latency histogram with percentile markers. All metrics should be queryable for arbitrary time windows.
Complete the full dashboard implementation with all components listed in the project structure. Generate 100 synthetic test queries for your domain, run the evaluation, establish baselines, and verify the dashboard displays realistic data patterns.