KEY INSIGHT
This capstone project integrates previous chapters into a production-ready MLOps pipeline. You'll implement drift detection, CI/CD, GitOps, monitoring, governance, and audit trails—a complete, operational MLOps system for local AI.
### Project Overview
Build a complete MLOps pipeline for a sentiment analysis model serving local inference requests. The system will:
1. **Detect data and model drift** in real-time during serving
2. **Automate training and evaluation** through pipeline as code
3. **Manage deployments** using GitOps principles
4. **Monitor performance** with alerts and dashboards
5. **Govern responsibly** with fairness evaluation and audit trails
6. **Retain accountability** through detailed logging
### Architecture
```
                     ┌─────────────────────────────┐
                     │       Git Repository        │
                     │     (Pipeline as Code)      │
                     │   (Model Configurations)    │
                     │   (Deployment Manifests)    │
                     └──────────────┬──────────────┘
                                    │
                     ┌──────────────▼──────────────┐
                     │       CI/CD Pipeline        │
                     │ (GitHub Actions/GitLab CI)  │
                     │  - Data Validation          │
                     │  - Model Training           │
                     │  - Evaluation               │
                     │  - Artifact Registration    │
                     └──────────────┬──────────────┘
                                    │
         ┌──────────────────────────┼──────────────────────────┐
         │                          │                          │
┌────────▼────────┐        ┌────────▼────────┐        ┌────────▼────────┐
│   Production    │        │     Staging     │        │   Development   │
│  (Edge Nodes)   │        │    (Staging)    │        │  (Experiment)   │
└────────┬────────┘        └─────────────────┘        └─────────────────┘
         │
┌────────▼────────┐
│   Monitoring    │
│      Stack      │
│ - Prometheus    │
│ - Grafana       │
│ - Alertmanager  │
│ - Audit Log DB  │
└─────────────────┘
```
### Implementation Steps
**Step 1: Data Validation Pipeline**
```python
# Python: data_validation.py
import pandas as pd
import numpy as np
import hashlib
from dataclasses import dataclass
from typing import Optional
from scipy.stats import ks_2samp
@dataclass
class ValidationResult:
    """Aggregate outcome of a single ``DataValidator.validate`` call."""

    # Overall verdict; ``DataValidator.validate`` sets it to False as soon as
    # any individual check fails.
    passed: bool
    # Number of checks that were actually executed.
    checks_performed: int
    # Number of executed checks that succeeded.
    checks_passed: int
    # One dict per failed check; each carries a "check" key naming its kind
    # ("schema", "distribution_drift" or "null_rate").
    failures: list[dict]


class DataValidator:
    """
    Validates training data against schema and distribution expectations.
    Intended to run as a gate inside the CI/CD pipeline.
    """

    def __init__(self, reference_stats: dict):
        """
        Args:
            reference_stats: Mapping of column name to a dict whose
                ``"samples"`` entry holds the reference values used for the
                two-sample Kolmogorov-Smirnov drift test.
        """
        self.reference_stats = reference_stats

    def validate(
        self,
        data: pd.DataFrame,
        schema: dict,
        drift_threshold: float = 0.1
    ) -> ValidationResult:
        """
        Run schema, distribution-drift and null-rate checks on ``data``.

        Args:
            data: Frame to validate.
            schema: Must contain ``"columns"`` (the exact expected column
                names); may contain ``"null_threshold"`` (maximum tolerated
                null fraction per column, default ``0.0``).
            drift_threshold: Maximum tolerated KS statistic per numeric
                column; anything strictly greater is reported as drift.

        Returns:
            A ``ValidationResult`` in which ``checks_performed`` counts only
            checks that were really executed, so a clean run always has
            ``checks_performed == checks_passed``.
        """
        results = ValidationResult(
            passed=True,
            checks_performed=0,
            checks_passed=0,
            failures=[]
        )

        # Schema validation: the column sets must match exactly.
        results.checks_performed += 1
        expected_columns = set(schema["columns"])
        actual_columns = set(data.columns)
        if expected_columns != actual_columns:
            results.passed = False
            results.failures.append({
                "check": "schema",
                "expected": list(expected_columns),
                "actual": list(actual_columns),
                "missing": list(expected_columns - actual_columns),
                "extra": list(actual_columns - expected_columns)
            })
        else:
            results.checks_passed += 1

        # Distribution validation
        for col, ref_stats in self.reference_stats.items():
            # The KS test only applies to numeric columns that are present.
            # Skipping *before* counting keeps checks_performed honest:
            # a column that is never tested must not look like a check that
            # ran and failed to pass.
            if col not in data.columns:
                continue
            if not pd.api.types.is_numeric_dtype(data[col]):
                continue

            results.checks_performed += 1
            observed = data[col].dropna()
            reference = ref_stats["samples"]

            # With an empty sample ks_2samp either raises or yields NaN
            # (depending on the SciPy version); NaN would compare as
            # "not greater than the threshold" and silently pass. Treat
            # "nothing to compare" as a failed check instead.
            if len(observed) == 0 or len(reference) == 0:
                results.passed = False
                results.failures.append({
                    "check": "distribution_drift",
                    "column": col,
                    "reason": "no samples available for comparison",
                    "threshold": drift_threshold
                })
                continue

            statistic, _ = ks_2samp(observed, reference)
            if statistic > drift_threshold:
                results.passed = False
                results.failures.append({
                    "check": "distribution_drift",
                    "column": col,
                    "ks_statistic": float(statistic),
                    "threshold": drift_threshold
                })
            else:
                results.checks_passed += 1

        # Null value checks
        null_threshold = schema.get("null_threshold", 0.0)
        for col in schema["columns"]:
            # Missing columns are already reported by the schema check.
            if col not in data.columns:
                continue
            results.checks_performed += 1
            null_rate = data[col].isna().mean()
            if null_rate > null_threshold:
                results.passed = False
                results.failures.append({
                    "check": "null_rate",
                    "column": col,
                    "null_rate": float(null_rate),
                    "threshold": null_threshold
                })
            else:
                results.checks_passed += 1

        return results
```
**Step 2: Training Pipeline**
```python
# Python: training_pipeline.py
import json
from pathlib import Path
from datetime import datetime
import hashlib
class TrainingPipeline:
    """
    Declarative training pipeline as code.
    Defines all stages, configurations, and validation gates.
    """

    def __init__(self, config: dict):
        """
        Args:
            config: Pipeline configuration. ``run`` reads the optional
                ``"evaluation_thresholds"`` entry (see ``_passes_thresholds``).
        """
        self.config = config
        self.git_info = self._capture_git_info()

    def _capture_git_info(self) -> dict:
        """
        Return the current git commit hash and branch name.

        Best effort: when git is not installed, the working directory is not
        a repository, or the command fails for any other reason, both values
        fall back to ``"unknown"`` instead of raising.
        """
        import subprocess

        def _git(*args: str) -> str:
            # check=True turns a non-zero exit (e.g. "not a git repository")
            # into CalledProcessError; without it a failure would silently
            # yield an empty string.
            completed = subprocess.run(
                ["git", *args],
                capture_output=True,
                text=True,
                check=True
            )
            return completed.stdout.strip()

        try:
            commit = _git("rev-parse", "HEAD")
            branch = _git("rev-parse", "--abbrev-ref", "HEAD")
        except (OSError, subprocess.CalledProcessError):
            # OSError covers a missing or non-executable git binary.
            return {"commit": "unknown", "branch": "unknown"}
        return {"commit": commit or "unknown", "branch": branch or "unknown"}

    def run(self) -> dict:
        """
        Execute training pipeline stages.
        Returns execution metadata for governance.

        The returned dict always contains ``execution_id``, ``pipeline``,
        ``git_info``, ``stages``, ``status`` (``"success"`` or ``"failed"``),
        ``start_time`` and ``end_time``. On success it also has ``metrics``
        and ``model_path``; on failure it has ``error`` with the exception
        text. Exceptions raised by a stage are recorded, not propagated.
        """
        started_at = datetime.now().isoformat()
        execution_id = hashlib.sha256(
            f"{started_at}{self.git_info}".encode()
        ).hexdigest()[:12]
        results = {
            "execution_id": execution_id,
            "pipeline": "sentiment_training",
            "git_info": self.git_info,
            "stages": {},
            "status": "running",
            "start_time": started_at
        }
        try:
            # Stage 1: Data extraction and validation
            data = self._extract_data()
            validator = self._validate_data(data)
            if not validator.passed:
                raise ValueError(f"Data validation failed: {validator.failures}")
            results["stages"]["validation"] = "passed"

            # Stage 2: Training
            model = self._train(data)
            results["stages"]["training"] = "completed"

            # Stage 3: Evaluation
            metrics = self._evaluate(model, data)
            results["stages"]["evaluation"] = metrics
            results["metrics"] = metrics
            if not self._passes_thresholds(metrics):
                raise ValueError(f"Evaluation thresholds not met: {metrics}")

            # Stage 4: Registration
            model_path = self._register_model(model, metrics)
            results["stages"]["registration"] = model_path
            results["model_path"] = model_path
            results["status"] = "success"
        except Exception as e:
            # Deliberately broad: this is the pipeline boundary, and any
            # stage failure must end up in the returned metadata.
            results["status"] = "failed"
            results["error"] = str(e)
        finally:
            results["end_time"] = datetime.now().isoformat()
        return results

    def _extract_data(self):
        """Placeholder for data extraction."""
        pass

    def _validate_data(self, data):
        """Placeholder for data validation."""
        pass

    def _train(self, data):
        """Placeholder for model training."""
        pass

    def _evaluate(self, model, data):
        """Placeholder for model evaluation."""
        pass

    def _register_model(self, model, metrics):
        """Placeholder for model registration."""
        pass

    def _passes_thresholds(self, metrics: dict) -> bool:
        """
        Check ``metrics`` against ``config["evaluation_thresholds"]``.

        Threshold keys encode the direction in their first character:
        ``">name"`` requires ``metrics["name"]`` to be strictly greater than
        the threshold, ``"<name"`` strictly smaller. A metric that is absent
        is treated as ``0`` for ``">"`` rules and as infinity for ``"<"``
        rules. Keys without either prefix are ignored.
        """
        thresholds = self.config.get("evaluation_thresholds", {})
        for rule, threshold in thresholds.items():
            direction, metric_name = rule[:1], rule[1:]
            if direction == ">":
                if metrics.get(metric_name, 0) <= threshold:
                    return False
            elif direction == "<":
                if metrics.get(metric_name, float('inf')) >= threshold:
                    return False
        return True
```
**Step 3: Monitoring and Alerting**
```yaml
# YAML: Alert rules for production monitoring
# File: monitoring/alerts/ml-sentiment-alerts.yaml
# Rule group with three alerts for the sentiment model: accuracy, input
# drift and latency.
groups:
  - name: ml_serving_alerts
    rules:
      # Fires when the share of feedback marked correct="true" stays below
      # 0.82 for two hours.
      # NOTE(review): `offset 1d` evaluates both series as they were 24 hours
      # ago, so the ratio is not the "Current accuracy" the description
      # reports. If inference_feedback is a counter, a windowed ratio such as
      # increase(...[1d]) is probably what is intended - confirm against the
      # metric definition before changing the expression.
      - alert: SentimentModelAccuracyDegraded
        expr: |
          (sum(inference_feedback{correct="true"} offset 1d)
          / sum(inference_feedback offset 1d))
          < 0.82
        for: 2h
        labels:
          severity: critical
          team: ml-ops
          model: sentiment-v2
        annotations:
          summary: "Sentiment model accuracy drops below production threshold"
          description: "Current accuracy {{ $value | printf \"%.2f\" }} < 0.82"

      # Fires when the 0.95-quantile series of the drift score stays above
      # 0.15 for 30 minutes.
      - alert: SentimentDataDrift
        expr: sentiment_input_drift_score{quantile="0.95"} > 0.15
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Significant input distribution drift for sentiment model"

      # Fires when the P99 inference latency, computed from histogram buckets
      # over a 5-minute rate window, stays above 0.5 s for five minutes.
      - alert: SentimentLatencySLOBreach
        expr: |
          histogram_quantile(0.99, rate(ml_inference_duration_seconds_bucket{model="sentiment"}[5m]))
          > 0.500
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "P99 latency exceeds 500ms SLO"
```
**Step 4: Governance Integration**
```python
# Python: governance.py - Complete governance workflow
from datetime import datetime
class GovernanceWorkflow:
    """
    Complete governance integration for ML pipeline.
    Enforces approval gates and builds audit trails.
    """

    # Largest absolute positive-rate difference the fairness gate accepts.
    MAX_FAIRNESS_DISPARITY = 0.15
    # Every one of these must be present in ``approvers`` before promotion.
    REQUIRED_APPROVERS = ("ml-ops-lead", "business-stakeholder")

    def __init__(self, audit_log, model_registry):
        """
        Args:
            audit_log: Object with an ``append`` method that receives one
                audit entry per approved promotion.
            model_registry: Stored on the instance; not used by
                ``approve_and_deploy``.
        """
        self.audit_log = audit_log
        self.model_registry = model_registry

    def approve_and_deploy(
        self,
        model_id: str,
        target_environment: str,
        approvers: list[str],
        fairness_results: dict
    ) -> dict:
        """
        Execute governance approval workflow.
        - Verify fairness criteria
        - Collect required approvals
        - Register approval in audit log

        This method only decides and records; it does not start a deployment
        itself, so the caller must act on an ``"approved"`` result.

        Args:
            model_id: Identifier written to the audit entry as resource id.
            target_environment: Stored in the audit entry details.
            approvers: Names of everyone who has approved the promotion.
            fairness_results: Must contain ``"max_positive_rate_diff"``; its
                absolute value is compared with ``MAX_FAIRNESS_DISPARITY``.

        Returns:
            ``{"status": "rejected", "reason": ...}`` when the fairness
            metric is missing or exceeds the limit,
            ``{"status": "pending_approval", "missing_approvers": [...]}``
            when a required approver is absent, otherwise
            ``{"status": "approved", "approved_by": ..., "audit_entry_id": ...}``.
        """
        # Fairness gate. Fail closed: a missing metric must not be read as
        # "no disparity", otherwise skipping the fairness evaluation would
        # be enough to pass the gate.
        raw_disparity = fairness_results.get("max_positive_rate_diff")
        if raw_disparity is None:
            return {
                "status": "rejected",
                "reason": "Fairness evaluation missing: max_positive_rate_diff not provided"
            }
        max_disparity = abs(raw_disparity)
        # Written as "not <=" so that NaN, which compares False both ways,
        # is rejected rather than waved through.
        if not max_disparity <= self.MAX_FAIRNESS_DISPARITY:
            return {
                "status": "rejected",
                "reason": f"Fairness threshold exceeded: disparity={max_disparity}"
            }

        # Approval gate
        missing_approvers = set(self.REQUIRED_APPROVERS) - set(approvers)
        if missing_approvers:
            return {
                "status": "pending_approval",
                # Sorted so the response is deterministic.
                "missing_approvers": sorted(missing_approvers)
            }

        # All gates passed: record the decision in the audit log.
        from uuid import uuid4

        # NOTE(review): AuditEntry is neither defined nor imported in this
        # snippet; it has to be provided by the surrounding module.
        # NOTE(review): the action is "promote_to_production" regardless of
        # target_environment; the real target is only in details.
        entry = AuditEntry(
            timestamp=datetime.now().isoformat(),
            entry_id=str(uuid4()),
            event_type="model_promotion",
            actor=", ".join(approvers),
            resource_type="model",
            resource_id=model_id,
            action="promote_to_production",
            details={
                "target_environment": target_environment,
                "approvers": approvers,
                "fairness_passed": True,
                "max_disparity": max_disparity
            }
        )
        self.audit_log.append(entry)
        return {
            "status": "approved",
            "approved_by": approvers,
            "audit_entry_id": entry.entry_id
        }
```
### Validation Checklist
Complete the following to validate your pipeline:
```
[ ] Pipeline executes data validation and fails on schema violations
[ ] Training produces model artifacts with versioned metadata
[ ] Evaluation metrics published to monitoring stack
[ ] Drift detection monitors and alerts on distribution shifts
[ ] Model promotion requires approval through governance workflow
[ ] Audit log records all model lifecycle events
[ ] Rollback procedure documented and tested
[ ] Monitoring dashboard shows live model metrics
[ ] Alerting triggers tested with synthetic failures
[ ] Fairness evaluation integrated into evaluation stage
```