RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /MLOps for Local AI
  6. /Ch. 24
MLOps for Local AI

24. MLOps Pipeline Project

Chapter 24 of 24 · 30 min
KEY INSIGHT

This capstone project integrates previous chapters into a production-ready MLOps pipeline. You'll implement drift detection, CI/CD, GitOps, monitoring, governance, and audit trails—a complete, operational MLOps system for local AI. ### Project Overview Build a complete MLOps pipeline for a sentiment analysis model serving local inference requests. The system will: 1. **Detect data and model drift** in real-time during serving 2. **Automate training and evaluation** through pipeline as code 3. **Manage deployments** using GitOps principles 4. **Monitor performance** with alerts and dashboards 5. **Govern responsibly** with fairness evaluation and audit trails 6. **Retain accountability** through detailed logging ### Architecture ``` ┌─────────────────────────────┐ │ Git Repository │ │ (Pipeline as Code) │ │ (Model Configurations) │ │ (Deployment Manifests) │ └──────────────┬──────────────┘ │ ┌──────────────▼──────────────┐ │ CI/CD Pipeline │ │ (GitHub Actions/GitLab CI) │ │ - Data Validation │ │ - Model Training │ │ - Evaluation │ │ - Artifact Registration │ └──────────────┬──────────────┘ │ ┌──────────────────────────┼──────────────────────────┐ │ │ │ ┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐ │ Production │ │ Staging │ │ Development │ │ (Edge Nodes) │ │ (Staging) │ │ (Experiment) │ └────────┬────────┘ └─────────────────┘ └─────────────────┘ │ ┌────────▼────────┐ │ Monitoring │ │ Stack │ │ - Prometheus │ │ - Grafana │ │ - Alertmanager │ │ - Audit Log DB │ └─────────────────┘ ``` ### Implementation Steps **Step 1: Data Validation Pipeline** ```python # Python: data_validation.py import pandas as pd import numpy as np import hashlib from dataclasses import dataclass from typing import Optional from scipy.stats import ks_2samp @dataclass class ValidationResult: passed: bool checks_performed: int checks_passed: int failures: list[dict] class DataValidator: """ Validates training data against schema and distribution expectations. Integrated into CI/CD pipeline. """ def __init__(self, reference_stats: dict): self.reference_stats = reference_stats def validate( self, data: pd.DataFrame, schema: dict, drift_threshold: float = 0.1 ) -> ValidationResult: results = ValidationResult( passed=True, checks_performed=0, checks_passed=0, failures=[] ) # Schema validation results.checks_performed += 1 expected_columns = set(schema["columns"]) actual_columns = set(data.columns) if expected_columns != actual_columns: results.passed = False results.failures.append({ "check": "schema", "expected": list(expected_columns), "actual": list(actual_columns), "missing": list(expected_columns - actual_columns), "extra": list(actual_columns - expected_columns) }) else: results.checks_passed += 1 # Distribution validation for col, ref_stats in self.reference_stats.items(): if col not in data.columns: continue results.checks_performed += 1 if pd.api.types.is_numeric_dtype(data[col]): statistic, p_value = ks_2samp( data[col].dropna(), ref_stats["samples"] ) if statistic > drift_threshold: results.passed = False results.failures.append({ "check": "distribution_drift", "column": col, "ks_statistic": float(statistic), "threshold": drift_threshold }) else: results.checks_passed += 1 # Null value checks null_threshold = schema.get("null_threshold", 0.0) for col in schema["columns"]: if col not in data.columns: continue results.checks_performed += 1 null_rate = data[col].isna().mean() if null_rate > null_threshold: results.passed = False results.failures.append({ "check": "null_rate", "column": col, "null_rate": float(null_rate), "threshold": null_threshold }) else: results.checks_passed += 1 return results ``` **Step 2: Training Pipeline** ```python # Python: training_pipeline.py import json from pathlib import Path from datetime import datetime import hashlib class TrainingPipeline: """ Declarative training pipeline as code. Defines all stages, configurations, and validation gates. """ def __init__(self, config: dict): self.config = config self.git_info = self._capture_git_info() def _capture_git_info(self) -> dict: import subprocess try: commit = subprocess.run( ["git", "rev-parse", "HEAD"], capture_output=True, text=True ).stdout.strip() branch = subprocess.run( ["git", "rev-parse", "--abbrev-ref", "HEAD"], capture_output=True, text=True ).stdout.strip() return {"commit": commit, "branch": branch} except: return {"commit": "unknown", "branch": "unknown"} def run(self) -> dict: """ Execute training pipeline stages. Returns execution metadata for governance. """ execution_id = hashlib.sha256( f"{datetime.now().isoformat()}{self.git_info}".encode() ).hexdigest()[:12] results = { "execution_id": execution_id, "pipeline": "sentiment_training", "git_info": self.git_info, "stages": {}, "status": "running", "start_time": datetime.now().isoformat() } try: # Stage 1: Data extraction and validation data = self._extract_data() validator = self._validate_data(data) if not validator.passed: raise ValueError(f"Data validation failed: {validator.failures}") results["stages"]["validation"] = "passed" # Stage 2: Training model = self._train(data) results["stages"]["training"] = "completed" # Stage 3: Evaluation metrics = self._evaluate(model, data) results["stages"]["evaluation"] = metrics results["metrics"] = metrics if not self._passes_thresholds(metrics): raise ValueError(f"Evaluation thresholds not met: {metrics}") # Stage 4: Registration model_path = self._register_model(model, metrics) results["stages"]["registration"] = model_path results["model_path"] = model_path results["status"] = "success" results["end_time"] = datetime.now().isoformat() except Exception as e: results["status"] = "failed" results["error"] = str(e) results["end_time"] = datetime.now().isoformat() return results def _extract_data(self): """Placeholder for data extraction.""" pass def _validate_data(self, data): """Placeholder for data validation.""" pass def _train(self, data): """Placeholder for model training.""" pass def _evaluate(self, model, data): """Placeholder for model evaluation.""" pass def _register_model(self, model, metrics): """Placeholder for model registration.""" pass def _passes_thresholds(self, metrics: dict) -> bool: thresholds = self.config.get("evaluation_thresholds", {}) for metric, threshold in thresholds.items(): if metric.startswith(">"): actual_metric = metric[1:] if metrics.get(actual_metric, 0) <= threshold: return False elif metric.startswith("<"): actual_metric = metric[1:] if metrics.get(actual_metric, float('inf')) >= threshold: return False return True ``` **Step 3: Monitoring and Alerting** ```yaml # YAML: Alert rules for production monitoring # File: monitoring/alerts/ml-sentiment-alerts.yaml groups: - name: ml_serving_alerts rules: - alert: SentimentModelAccuracyDegraded expr: | (sum(inference_feedback{correct="true"} offset 1d) / sum(inference_feedback offset 1d)) < 0.82 for: 2h labels: severity: critical team: ml-ops model: sentiment-v2 annotations: summary: "Sentiment model accuracy drops below production threshold" description: "Current accuracy {{ $value | printf \"%.2f\" }} < 0.82" - alert: SentimentDataDrift expr: sentiment_input_drift_score{quantile="0.95"} > 0.15 for: 30m labels: severity: warning annotations: summary: "Significant input distribution drift for sentiment model" - alert: SentimentLatencySLOBreach expr: | histogram_quantile(0.99, rate(ml_inference_duration_seconds_bucket{model="sentiment"}[5m])) > 0.500 for: 5m labels: severity: critical annotations: summary: "P99 latency exceeds 500ms SLO" ``` **Step 4: Governance Integration** ```python # Python: governance.py - Complete governance workflow from datetime import datetime class GovernanceWorkflow: """ Complete governance integration for ML pipeline. Enforces approval gates and builds audit trails. """ def __init__(self, audit_log, model_registry): self.audit_log = audit_log self.model_registry = model_registry def approve_and_deploy( self, model_id: str, target_environment: str, approvers: list[str], fairness_results: dict ) -> dict: """ Execute governance approval workflow. - Verify fairness criteria - Collect required approvals - Register approval in audit log - Trigger deployment """ from uuid import uuid4 # Fairness gate max_disparity = abs(fairness_results.get("max_positive_rate_diff", 0)) if max_disparity > 0.15: return { "status": "rejected", "reason": f"Fairness threshold exceeded: disparity={max_disparity}" } # Approval gate required_approvers = ["ml-ops-lead", "business-stakeholder"] missing_approvers = set(required_approvers) - set(approvers) if missing_approvers: return { "status": "pending_approval", "missing_approvers": list(missing_approvers) } # All gates passed—record in audit and deploy entry = AuditEntry( timestamp=datetime.now().isoformat(), entry_id=str(uuid4()), event_type="model_promotion", actor=", ".join(approvers), resource_type="model", resource_id=model_id, action="promote_to_production", details={ "target_environment": target_environment, "approvers": approvers, "fairness_passed": True, "max_disparity": max_disparity } ) self.audit_log.append(entry) return { "status": "approved", "approved_by": approvers, "audit_entry_id": entry.entry_id } ``` ### Validation Checklist Complete the following to validate your pipeline: ``` [ ] Pipeline executes data validation and fails on schema violations [ ] Training produces model artifacts with versioned metadata [ ] Evaluation metrics published to monitoring stack [ ] Drift detection monitors and alerts on distribution shifts [ ] Model promotion requires approval through governance workflow [ ] Audit log records all model lifecycle events [ ] Rollback procedure documented and tested [ ] Monitoring dashboard shows live model metrics [ ] Alerting triggers tested with synthetic failures [ ] Fairness evaluation integrated into evaluation stage ```

EXERCISE

Implement the complete MLOps pipeline as described. Begin with data validation, add training pipeline integration, build monitoring dashboards, and complete with governance workflow. Execute the full pipeline and verify each component functions correctly. Document your architecture and operational runbooks. This completes the operator track of Course A007: MLOps for Local AI. The material provides foundations for building production-ready ML systems that are observable, governed, and maintainable at scale.

← Chapter 23
Audit Trails
Course complete →
Browse all courses