MLOps for Local AI
Learn MLOps for local AI through RunLocalAI's practical lens: MLOps, MLflow, CI/CD and drift, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.
- I004
Why this course matters
MLOps for Local AI is for operators making local AI reliable, measurable and cheaper to run. It connects MLOps, MLflow, CI/CD, drift and orchestration to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, what setting changes the result, and how do you verify the answer instead of trusting a demo?
What you will be able to do
By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.
How to use this course
Start at chapter one if the topic is new. If you already have a working stack, scan for chapters such as MLOps Overview, Experiment Tracking, MLflow Setup and MLflow Tracking Server and use those lessons as a quality-control pass before changing a workstation, team workflow or production-like local deployment.
- 01 MLOps Overview

MLOps for local AI faces unique constraints: no vendor-managed services, limited compute, and air-gapped environments. Your tooling must be lightweight, self-hosted, and operate without cloud dependencies.

The three pillars of MLOps (reproducibility, automation, and monitoring) map differently in local contexts. Reproducibility means tracking every experiment with its exact data snapshot and hyperparameters. Automation means orchestrating training pipelines that trigger on schedule or data arrival. Monitoring means watching model predictions and input distributions for drift, often with dashboards that run on local infrastructure.

Failure modes in local MLOps often stem from manual processes. If your "model deployment" is "someone SCPs a pickle file to a server," you've lost traceability. Every model should know: what data trained it, what code generated it, what validation it passed, and who approved it.

10 min
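The four traceability questions from the MLOps Overview chapter (what data, what code, what validation, who approved) can be written down as a small record stored next to the model file. The following is a minimal sketch using only the standard library; the `ModelProvenance` class, its field names, and the `.provenance.json` sidecar naming are illustrative assumptions, not part of MLflow or any other tool.

```python
import hashlib
import json
from dataclasses import asdict, dataclass
from pathlib import Path


def sha256_of_file(path: Path) -> str:
    """Hash a file in chunks so large artifacts do not need to fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


@dataclass
class ModelProvenance:
    """Hypothetical sidecar record answering the four traceability questions."""
    model_sha256: str        # which artifact this record describes
    data_sha256: str         # what data trained it
    code_version: str        # what code generated it (e.g. a git commit hash)
    validation_summary: str  # what validation it passed
    approved_by: str         # who approved it

    def write_sidecar(self, model_path: Path) -> Path:
        """Write the record as JSON next to the model file and return its path."""
        sidecar = model_path.with_name(model_path.name + ".provenance.json")
        sidecar.write_text(json.dumps(asdict(self), indent=2))
        return sidecar
```

Copying the model and its sidecar together keeps the audit information attached even when the transfer itself is still a manual step.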
- 02 Experiment Tracking

Parameters and metrics are the visible layer. The invisible layer is everything that produced them: your code, dependencies, and data. Capture enough context to reproduce a run without depending on institutional memory.

Parameters flow in both directions. Hyperparameters (learning rate, batch size) are inputs you control. Learned parameters (weights) are outputs you measure. But the real value emerges when you track all parameters: data paths, feature flags, random seeds, hardware configurations. A single run with a bad seed can produce wildly different results.

Artifacts are the outputs worth keeping: model binaries, processed datasets, visualizations, serialized preprocessors. MLflow and similar tools store artifacts in designated locations tied to the run, so a run ID is enough to retrieve them.

```python
# Minimal experiment tracking with MLflow
import mlflow
import mlflow.sklearn
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Placeholder data so the example runs end to end; swap in your own dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

mlflow.set_experiment("spam-classifier-v2")

with mlflow.start_run(run_name="baseline-rf"):
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("random_seed", 42)

    # Train
    model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate and log metrics
    preds = model.predict(X_test)
    accuracy = accuracy_score(y_test, preds)
    mlflow.log_metric("accuracy", accuracy)

    # Log model artifact
    mlflow.sklearn.log_model(model, "model")
```

This pattern (log params, train, evaluate, log metrics, save model) forms the foundation of every experiment tracking workflow.

15 min
- 03 MLflow Setup

The tracking URI determines where data goes, and `mlflow.set_tracking_uri()` controls it. Without explicit configuration, MLflow writes to `./mlruns` locally. Explicit configuration enables sharing across processes and future migration.

Configuration happens in code or environment variables:

```python
import os

# Option 1: Environment variable (set it before any MLflow call reads it)
os.environ["MLFLOW_TRACKING_URI"] = "sqlite:///mlflow.db"

# Option 2: Explicit in code
import mlflow

mlflow.set_tracking_uri("sqlite:///mlflow.db")
```

The SQLite backend stores experiments in a local file. This works for single machines but doesn't scale beyond one operator. For collaborative environments, a server-based approach (covered in the next chapter) is necessary.

Backend store configuration options:

| URI | Backend | Use Case |
|-----|---------|----------|
| `./mlruns` | Filesystem | Development, single user |
| `sqlite:///mlflow.db` | SQLite | Single machine, light load |
| `postgresql://host/db` | PostgreSQL | Multi-user, production |

MLflow also captures part of the execution environment. When you log a model, it records the inferred package requirements alongside the artifact, which helps you rebuild the software context later.

20 min
- 04 MLflow Tracking Server

The server doesn't run training; it receives logging calls. Your training code still runs locally and sends its results to the server. The client library handles batching and retry logic.

Connect clients to the server:

```python
import mlflow

# Point to your server
mlflow.set_tracking_uri("http://localhost:5000")

# All logging now goes to the server
mlflow.log_param("learning_rate", 0.01)
```

For remote servers (servers on other machines or containers), use the appropriate hostname or IP. For local containerized deployments, use `host.docker.internal` on Docker Desktop or the container's IP on Linux.

Authentication is not enabled by default on the MLflow server. For air-gapped local networks, this may be acceptable. For environments requiring auth, consider a reverse proxy (nginx with basic auth) or a managed MLflow offering that includes access control. Recent MLflow releases also ship an experimental basic-auth app (`mlflow server --app-name basic-auth`); check whether your installed version includes it.

```nginx
# Secure the server behind nginx with basic auth
# nginx.conf snippet
server {
    listen 443 ssl;
    server_name mlflow.local;
    # ssl_certificate and ssl_certificate_key are also required for "listen ... ssl";
    # their paths are site-specific and omitted here.

    auth_basic "MLflow Access";
    auth_basic_user_file /etc/nginx/.htpasswd;

    location / {
        proxy_pass http://localhost:5000;
    }
}
```

20 min
- 05 Model Registry

The registry enforces lifecycle stages. `Staging` is for testing before production. `Production` is for active serving. `Archived` retains models for rollback or audit without cluttering active lists. Newer MLflow releases deprecate stages in favor of model version aliases and tags, so check which mechanism your installed version recommends.

Query and transition models:

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()  # Uses the currently configured tracking URI

# List all versions of a model
versions = client.search_model_versions("name='spam-classifier'")
for v in versions:
    print(f"Version {v.version}, Stage: {v.current_stage}, Run: {v.run_id}")

# Transition to production
client.transition_model_version_stage(
    name="spam-classifier",
    version=3,
    stage="Production",
)

# Add description
client.update_model_version(
    name="spam-classifier",
    version=3,
    description="Updated with balanced class weights, +2% accuracy",
)
```

Model metadata matters. Store what the model does, who approved it, and what validation it passed. This turns the registry into an audit trail.

```python
# Annotate model with rich metadata (reuses `client` from the block above)
client.set_model_version_tag(
    name="spam-classifier",
    version=3,
    key="validation_report",
    value="Passed accuracy > 0.92, latency < 50ms",
)
client.set_model_version_tag(
    name="spam-classifier",
    version=3,
    key="approved_by",
    value="jane@ops-team",
)
```

20 min
- 06 Model Versioning

Every model artifact should be traceable to its inputs. Without this, you're deploying artifacts with unknown provenance, which is both a compliance risk and an operational risk.

MLflow captures run metadata automatically, but you can enrich it with external version references:

```python
import git  # Provided by the GitPython package
import mlflow

repo = git.Repo(search_parent_directories=True)
commit_hash = repo.head.commit.hexsha

with mlflow.start_run():
    mlflow.log_param("git_commit", commit_hash)
    # active_branch raises TypeError on a detached HEAD (common in CI checkouts)
    mlflow.log_param("git_branch", repo.active_branch.name)
    # Also log data version if using DVC
    mlflow.log_param("data_version", "v2.1")
    # ... training code ...
```

Comparing versions requires consistent metrics:

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()


def compare_versions(client, model_name, versions):
    """Compare metrics across model versions."""
    results = []
    for version in versions:
        run_id = client.get_model_version(model_name, str(version)).run_id
        run = client.get_run(run_id)
        # run.data.metrics already maps metric name -> latest value
        metrics = dict(run.data.metrics)
        results.append({"version": version, "metrics": metrics})
    return results


comparison = compare_versions(client, "spam-classifier", [1, 2, 3])
```

20 min
- 07 Pipeline Orchestration

Orchestration introduces operational complexity. Start with manual scripts; graduate to orchestration when the overhead of automation (configuration, monitoring, debugging) is less than the cost of manual errors.

The core concepts:

- **Task**: A single unit of work (run a script, call an API)
- **DAG**: The graph defining task dependencies
- **Trigger**: What initiates the pipeline (schedule, event, manual)
- **Executor**: Where and how tasks run (local, Kubernetes, Ray)
- **Operator**: The interface defining a task type

ML-specific patterns:

```yaml
# Pseudo-code for an ML pipeline
pipeline:
  - task: fetch_data
    trigger: schedule "@daily"
  - task: validate_data
    depends: [fetch_data]
    condition: "data quality score > 0.95"
  - task: train_model
    depends: [validate_data]
    resources: gpu=true, memory=16Gi
  - task: evaluate_model
    depends: [train_model]
    condition: "accuracy > 0.90 AND latency < 100ms"
  - task: deploy_model
    depends: [evaluate_model]
    approval: required
```

Conditional execution is critical. You don't deploy models that fail validation. You don't retrain if data hasn't changed. Orchestration makes these conditions explicit and automated.

15 min
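To make the task, DAG, and conditional-execution ideas from the Pipeline Orchestration chapter concrete, here is a minimal in-process runner built on the standard library's `graphlib`. It is a sketch under stated assumptions: `run_pipeline`, the task functions, and the hard-coded metric values are all hypothetical, and a real orchestrator adds scheduling, retries, and persistence on top of this core loop.

```python
from graphlib import TopologicalSorter
from typing import Callable


def run_pipeline(
    tasks: dict[str, Callable[[dict], bool]],
    depends: dict[str, list[str]],
) -> dict[str, str]:
    """Run tasks in dependency order.

    Each task receives a shared context dict and returns True when its
    condition passes. Tasks downstream of a failed or skipped task are skipped.
    """
    status: dict[str, str] = {}
    context: dict = {}
    graph = {name: depends.get(name, []) for name in tasks}
    for name in TopologicalSorter(graph).static_order():
        if any(status[dep] != "success" for dep in graph[name]):
            status[name] = "skipped"
            continue
        status[name] = "success" if tasks[name](context) else "failed"
    return status


# Hypothetical tasks with made-up numbers, mirroring the pipeline outline
def fetch_data(ctx: dict) -> bool:
    ctx["quality"] = 0.97
    return True


def validate_data(ctx: dict) -> bool:
    return ctx["quality"] > 0.95


def train_model(ctx: dict) -> bool:
    ctx["accuracy"] = 0.88
    return True


def evaluate_model(ctx: dict) -> bool:
    return ctx["accuracy"] > 0.90


def deploy_model(ctx: dict) -> bool:
    ctx["deployed"] = True
    return True


status = run_pipeline(
    tasks={
        "fetch_data": fetch_data,
        "validate_data": validate_data,
        "train_model": train_model,
        "evaluate_model": evaluate_model,
        "deploy_model": deploy_model,
    },
    depends={
        "validate_data": ["fetch_data"],
        "train_model": ["validate_data"],
        "evaluate_model": ["train_model"],
        "deploy_model": ["evaluate_model"],
    },
)
```

In this run the evaluation condition fails, so `deploy_model` is skipped rather than executed, which is the behavior the conditions in the outline are meant to guarantee.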
- 08 Airflow for AI

Airflow's strength is its Python-native DAG definition. If you can write a Python function, you can create an Airflow task. The downside: Airflow is designed for task orchestration, not data processing. Heavy data transformations belong in Spark or Ray; Airflow orchestrates calls to those systems.

Define a DAG for ML training:

```python
# dags/ml_pipeline.py
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def validate_dataset(date: str) -> None:
    """Placeholder: raise an exception if the dataset for `date` fails validation."""


def evaluate_candidate() -> None:
    """Placeholder: raise an exception if the trained model misses its thresholds."""


default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
}

with DAG(
    dag_id="daily-model-training",
    default_args=default_args,
    schedule_interval="0 2 * * *",  # 2 AM daily (Airflow 2.4+ also accepts `schedule=`)
    catchup=False,
) as dag:
    fetch_data = BashOperator(
        task_id="fetch_training_data",
        bash_command="python scripts/fetch_data.py --date {{ ds }}",
    )

    validate_data = PythonOperator(
        task_id="validate_dataset",
        python_callable=validate_dataset,
        op_kwargs={"date": "{{ ds }}"},
    )

    # Note: `env` replaces the inherited environment unless append_env=True is set
    train_model = BashOperator(
        task_id="train_model",
        bash_command="python scripts/train.py --date {{ ds }}",
        env={"MLFLOW_TRACKING_URI": "http://mlflow:5000"},
    )

    evaluate_model = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_candidate,
    )

    deploy_on_success = BashOperator(
        task_id="deploy_model",
        bash_command="python scripts/deploy.py",
        trigger_rule="all_success",  # The default: run only if every upstream task succeeded
    )

    fetch_data >> validate_data >> train_model >> evaluate_model >> deploy_on_success
```

The `{{ ds }}` syntax is Airflow's Jinja templating: at runtime it is replaced with the run's logical (execution) date in `YYYY-MM-DD` form.

15 min
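- 09 Prefect for AI

Prefect Cloud provides hosted orchestration with automatic retries and observability, but the open-source engine (called Orion in early 2.x releases) runs entirely locally. You get the developer experience without vendor lock-in.

Define the flow:

```python
from datetime import date as date_type
from typing import Optional

from prefect import flow

# fetch_data, validate_data, train_model and promote_model are assumed to be
# @task-decorated functions defined alongside this flow.


@flow(name="daily-ml-pipeline", log_prints=True)
def ml_pipeline(date: Optional[str] = None):
    if date is None:
        date = date_type.today().isoformat()

    path = fetch_data(date)
    validated = validate_data(path)
    run_id = train_model(date, validated)
    promote_model(run_id)


# Run locally
if __name__ == "__main__":
    ml_pipeline("2024-01-15")
```

Prefect also provides deployment to local infrastructure:

```bash
prefect work-pool create local-pool
prefect agent start --work-pool local-pool
```

Newer Prefect releases replace agents with workers (`prefect worker start --pool local-pool`), so use the command that matches your installed version.

20 min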
- 10 Model Validation Gates

Thresholds should reflect business requirements, not just model performance. Accuracy > 0.90 is a model requirement. Latency < 100ms might be a user experience requirement. Separate these concerns but enforce both.

Integrate gates into orchestration:

```python
# In a Prefect task
from prefect import task


@task
def gate_check(model_name: str, version: int) -> bool:
    # validation_gate is assumed to be a project-local module, not a published package
    from validation_gate import validate_model, thresholds

    result = validate_model(model_name, version, thresholds)
    if not result.passed:
        raise ValueError(f"Validation failed: {result.message}")
    return True
```

In Airflow:

```python
from airflow.operators.python import BranchPythonOperator

# gate_passed() is assumed to be defined elsewhere and to return a bool;
# the returned strings are the task IDs of the branch to follow.
check_gate = BranchPythonOperator(
    task_id="validate_model",
    python_callable=lambda: "deploy_model" if gate_passed() else "alert_failure",
)
```

20 min
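The gate examples in the Model Validation Gates chapter rely on a result object with `passed` and `message` fields. One way such a check could look is sketched below, keeping the model requirement and the user-experience requirement as separate threshold entries that are both enforced. `GateResult`, `THRESHOLDS`, and `check_thresholds` are hypothetical names chosen for illustration; the limits reuse the figures quoted in the chapter.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GateResult:
    passed: bool
    message: str


# Hypothetical thresholds: metric name -> (comparison, limit)
THRESHOLDS = {
    "accuracy": (">", 0.90),     # model requirement
    "latency_ms": ("<", 100.0),  # user-experience requirement
}


def check_thresholds(metrics: dict[str, float], thresholds=THRESHOLDS) -> GateResult:
    """Check every threshold and report all failures, not just the first."""
    failures = []
    for name, (comparison, limit) in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: metric missing")
        elif comparison == ">" and not value > limit:
            failures.append(f"{name}: {value} is not > {limit}")
        elif comparison == "<" and not value < limit:
            failures.append(f"{name}: {value} is not < {limit}")
    if failures:
        return GateResult(False, "; ".join(failures))
    return GateResult(True, "all thresholds met")
```

Treating a missing metric as a failure is a deliberate choice in this sketch: a gate that passes when a measurement was never taken offers little protection.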
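- 11 Data Validation

Schema validation catches structural problems. Distribution validation catches semantic problems. A dataset can have a perfect schema but shifted distributions that degrade model performance.

For distribution validation, compare the current data against a baseline:

```python
# Distribution drift detection
import pandas as pd
import scipy.stats as stats


def detect_drift(baseline_path: str, current_path: str, column: str, threshold: float = 0.05):
    baseline = pd.read_csv(baseline_path)[column]
    current = pd.read_csv(current_path)[column]

    # Chi-square test for categorical columns, KS test for continuous ones
    if not pd.api.types.is_numeric_dtype(baseline):
        observed = current.value_counts()
        baseline_counts = baseline.value_counts()
        # Compare over the union of categories seen in either dataset
        categories = observed.index.union(baseline_counts.index)
        observed = observed.reindex(categories, fill_value=0)
        # A small floor avoids zero expected counts for categories new to the baseline
        expected = baseline_counts.reindex(categories, fill_value=0) + 0.5
        # chisquare requires observed and expected counts to sum to the same total
        expected = expected * observed.sum() / expected.sum()
        stat, p_value = stats.chisquare(f_obs=observed, f_exp=expected)
    else:
        stat, p_value = stats.ks_2samp(baseline, current)

    drift_detected = bool(p_value < threshold)
    return {
        "drift_detected": drift_detected,
        "p_value": float(p_value),
        "statistic": float(stat),
    }
```

15 min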
- 12 Model Validation

Single-metric validation is insufficient. A model can have high accuracy but poor recall for minority classes. Thorough validation examines precision, recall, F1, ROC-AUC, and confusion matrices. Your threshold should reflect business costs of false positives vs. false negatives.

Fairness validation catches demographic bias:

```python
# Fairness validation
from sklearn.metrics import accuracy_score


def validate_fairness(model, X_test, y_test, sensitive_feature, threshold=0.1):
    """Check that accuracy is comparable across groups of a sensitive feature."""
    groups = X_test[sensitive_feature].unique()
    metrics_by_group = {}
    for group in groups:
        mask = X_test[sensitive_feature] == group
        y_pred = model.predict(X_test[mask])
        metrics_by_group[group] = accuracy_score(y_test[mask], y_pred)

    # Gap between the best-served and worst-served group
    max_diff = max(metrics_by_group.values()) - min(metrics_by_group.values())
    return {
        "metrics_by_group": metrics_by_group,
        "max_difference": max_diff,
        "fair": max_diff <= threshold,
    }
```

Regression model validation differs:

```python
# Regression validation
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


def validate_regression(model, X_test, y_test):
    y_pred = model.predict(X_test)
    return {
        "rmse": np.sqrt(mean_squared_error(y_test, y_pred)),
        "mae": mean_absolute_error(y_test, y_pred),
        "r2": r2_score(y_test, y_pred),
        # The small constant guards against division by zero when a target is 0
        "mape": np.mean(np.abs((y_test - y_pred) / (np.abs(y_test) + 1e-10))) * 100,
    }
```

20 min
- 13 Drift Detection

Drift detection is the practice of identifying when your model's operating environment diverges from its training conditions. In local AI deployments, where models serve specific user populations over extended periods, drift is not theoretical; it is inevitable.

### Understanding Drift in Context

Drift occurs when the statistical properties of your input data, output predictions, or the underlying problem itself change over time. Unlike cloud deployments where retraining pipelines can trigger automatically, local AI operators must build explicit observation into the serving stack.

Drift compounds silently. A model that performs adequately in month one may degrade to dangerous territory by month six without any external indication. Users adapt their queries, your data distribution shifts, or the real-world phenomenon you're predicting fundamentally changes.

### Detection Approaches

There are three primary drift detection approaches:

**Statistical tests** compare feature distributions between a reference period and current observations. The Kolmogorov-Smirnov test measures maximum distance between cumulative distribution functions. The Chi-squared test evaluates categorical feature shifts. These are lightweight to compute and suitable for deployment on edge devices.

**Distance-based methods** calculate divergence between probability distributions. KL divergence, Jensen-Shannon distance, and Wasserstein distance each offer different sensitivity profiles. They can be cheaper to compute than full statistical tests, but they require choosing appropriate thresholds empirically.

**Sequential methods** track performance metrics over time, treating drift detection as a change-point problem. The Page-Hinkley test and CUSUM (cumulative sum) detect statistically significant shifts in monitored statistics.

### Implementation Considerations

Local deployment constraints shape your drift detection architecture.
You cannot stream unbounded data to a central server for batch analysis. Instead, implement rolling window statistics computed on-device with lightweight reporting to a central dashboard.

```python
# Python: Rolling window drift detection using Wasserstein distance
from collections import deque

import numpy as np
from scipy.stats import wasserstein_distance


class RollingDriftDetector:
    def __init__(self, window_size: int = 1000, threshold: float = 0.15):
        self.reference_window = None
        self.current_window = deque(maxlen=window_size)  # Oldest samples drop off automatically
        self.window_size = window_size
        self.threshold = threshold  # Scale-dependent: tune per deployment
        self.drift_detected = False

    def add_sample(self, features: np.ndarray, prediction: float):
        """Add a sample from current serving traffic."""
        # Flatten the features and append the prediction as the last column
        sample = np.concatenate([np.ravel(features), [prediction]])
        self.current_window.append(sample)

    def set_reference(self, reference_data: list):
        """Set reference rows (same layout as add_sample) from training or last validation."""
        self.reference_window = np.asarray(reference_data, dtype=float)

    def check_drift(self) -> tuple[bool, float]:
        """Check if drift exceeds threshold. Returns (drifted, distance)."""
        if self.reference_window is None or len(self.current_window) < 100:
            return False, 0.0  # Insufficient data

        current = np.asarray(self.current_window)
        # Wasserstein distance compares 1-D samples, so evaluate each column
        # separately and report the largest per-column distance.
        distance = float(max(
            wasserstein_distance(current[:, j], self.reference_window[:, j])
            for j in range(current.shape[1])
        ))
        self.drift_detected = distance > self.threshold
        return self.drift_detected, distance
```

### Practical Limitations

Drift detection without ground truth labels is inherently limited. You detect distribution changes, not performance degradation. A drifted model might still perform acceptably, or a stable-looking distribution might mask catastrophic performance collapse. Pair statistical drift detection with user feedback mechanisms where possible.

15 min
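The sequential methods named in the Drift Detection chapter lend themselves to on-device use because they keep only a few running numbers. Below is a minimal sketch of a one-sided Page-Hinkley detector that flags an upward shift in the mean of a monitored value (for example a per-request error or a single feature). The class name and the default `delta`, `threshold`, and `min_samples` values are assumptions chosen for illustration and need tuning against real traffic.

```python
class PageHinkley:
    """One-sided Page-Hinkley test for an increase in the mean of a stream."""

    def __init__(self, delta: float = 0.005, threshold: float = 5.0, min_samples: int = 30):
        self.delta = delta              # Tolerated magnitude of change
        self.threshold = threshold      # Alarm level (lambda)
        self.min_samples = min_samples  # Warm-up before alarms are allowed
        self.count = 0
        self.mean = 0.0
        self.cumulative = 0.0           # Running sum of deviations from the mean
        self.minimum = 0.0              # Smallest cumulative sum seen so far

    def update(self, value: float) -> bool:
        """Feed one observation; return True when a shift is detected."""
        self.count += 1
        self.mean += (value - self.mean) / self.count
        self.cumulative += value - self.mean - self.delta
        self.minimum = min(self.minimum, self.cumulative)
        statistic = self.cumulative - self.minimum
        return self.count >= self.min_samples and statistic > self.threshold
```

A typical loop calls `update()` once per served request and, on the first `True`, raises an alert and starts a fresh detector so the next shift is measured against the new level.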
- 14Data DriftData drift occurs when the statistical distribution of your input features changes over time. Detecting data drift is the foundation of proactive model retraining—catching distribution shifts before they cascade into prediction quality degradation. ### The Mechanics of Feature Drift Every model makes an implicit assumption: the future will resemble the past. This assumption lives in the training data distribution. When users generate data that diverges from this distribution, predictions suffer. Consider a local AI system screening support tickets. Over months, your product evolves. New features attract different user segments. Query language shifts as cultural references change. The ticket categories distribution you trained on no longer matches reality. Your model increasingly sees out-of-distribution inputs it cannot reliably process. ### Detection Implementation Memory-efficient feature drift detection for edge deployment requires careful resource management: ```python # Python: Efficient feature drift detection for edge deployment from collections import deque import numpy as np from scipy.stats import ks_2samp class FeatureDriftDetector: """ Monitors individual feature distributions for statistically significant drift. Designed for resource-constrained edge deployment. 
""" def __init__(self, n_features: int, window_size: int = 500, alpha: float = 0.05): self.n_features = n_features self.window_size = window_size self.alpha = alpha # Significance level # Rolling buffers per feature (memory-efficient deque) self.buffers = [deque(maxlen=window_size) for _ in range(n_features)] self.reference_means = None self.reference_stds = None self.drift_counts = 0 def capture_baseline(self, baseline_data: np.ndarray): """Capture statistical baseline from training or known-good data.""" self.reference_means = np.mean(baseline_data, axis=0) self.reference_stds = np.std(baseline_data, axis=0) # Pre-populate buffers with baseline for warm start for i in range(min(len(baseline_data), self.window_size)): for feat_idx in range(self.n_features): self.buffers[feat_idx].append(baseline_data[i, feat_idx]) def ingest(self, features: np.ndarray): """Ingest a single observation's features.""" if features.shape[0] != self.n_features: raise ValueError(f"Expected {self.n_features} features, got {features.shape[0]}") for feat_idx, value in enumerate(features): self.buffers[feat_idx].append(value) def assess(self) -> dict: """ Assess drift across all features using KS test. Returns dict with drift status per feature and overall status. 
""" if self.reference_means is None: return {"drifted": False, "error": "No baseline established"} results = {"drifted": False, "features": {}} for feat_idx in range(self.n_features): feature_data = np.array(self.buffers[feat_idx]) # Compute current statistics current_mean = np.mean(feature_data) current_std = np.std(feature_data) # Normalize for KS test to handle scale differences normalized = (feature_data - current_mean) / (current_std + 1e-8) reference = (self.reference_means[feat_idx], self.reference_stds[feat_idx]) ref_normalized = (0, 1) # Standard normal for comparison # KS test against normal distribution # In practice, compare against stored reference samples # Simplified: compare Z-score locations drift_score = abs(current_mean - reference[0]) / (reference[1] + 1e-8) results["features"][feat_idx] = { "drifted": drift_score > 3.0, # 3-sigma rule "score": float(drift_score) } if drift_score > 3.0: results["drifted"] = True return results ``` ### Categorical Feature Drift Numerical features yield to statistical tests. Categorical features require different treatment. Monitor category frequency distributions, alert on emerging categories with zero training frequency, and track category elimination events. ```python # Python: Categorical distribution drift detection from collections import Counter import numpy as np def categorical_drift_score( current: Counter, reference: Counter, total_current: int, total_reference: int ) -> dict: """ Compute drift metrics for categorical features. Uses Total Variation Distance as primary metric. 
""" # Get union of all categories all_categories = set(current.keys()) | set(reference.keys()) # Compute probability distributions current_probs = {cat: current.get(cat, 0) / total_current for cat in all_categories} ref_probs = {cat: reference.get(cat, 0) / total_reference for cat in all_categories} # Total Variation Distance tvd = 0.5 * sum(abs(current_probs[cat] - ref_probs[cat]) for cat in all_categories) # Flag novel categories (in current but not in reference) novel = set(current.keys()) - set(reference.keys()) # Flag atrophied categories (in reference but not current) atrophied = set(reference.keys()) - set(current.keys()) return { "tvd": tvd, "max_tvd": 1.0, # Normalized scale "novel_categories": list(novel), "atrophied_categories": list(atrophied), "drifted": tvd > 0.1 or len(novel) > 0 or len(atrophied) > 0 } ``` ### Operational Implications Data drift detection without automated response is observation without action. Build alerting thresholds that trigger retraining workflows in your MLOps pipeline. Distinguish minor fluctuations (expected noise) from systematic shifts (requiring intervention).20 min
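One simple way to separate expected noise from a systematic shift, as the Operational Implications section asks, is to alert only when drift is flagged in several of the most recent assessments instead of on any single one. The sketch below assumes each periodic drift assessment produces a boolean; `DriftAlertPolicy` and its `required`/`out_of` parameters are hypothetical names, and the 3-of-5 default is an arbitrary starting point.

```python
from collections import deque


class DriftAlertPolicy:
    """Raise an alert when at least `required` of the last `out_of` checks drifted."""

    def __init__(self, required: int = 3, out_of: int = 5):
        if required > out_of:
            raise ValueError("required cannot exceed out_of")
        self.required = required
        self.history = deque(maxlen=out_of)  # Most recent assessment outcomes

    def observe(self, drifted: bool) -> bool:
        """Record one assessment result and return True if an alert should fire."""
        self.history.append(bool(drifted))
        return sum(self.history) >= self.required
```

Each scheduled assessment would pass its `drifted` flag to `observe()`, and only a `True` return would trigger the retraining workflow.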
- 15 Model Drift

Model drift, the decay of predictive performance over time, is distinct from data drift. Even with stable input distributions, model degradation occurs due to concept drift, adversarial adaptation, and stochastic degradation. Monitoring actual prediction quality is essential, not optional.

### Concept Drift Versus Data Drift

Data drift describes changes in input distributions. Concept drift describes changes in the relationship between inputs and outputs. Your model learns `P(Y|X)`. Concept drift occurs when the true underlying `P(Y|X)` evolves even if `P(X)` remains stable.

A spam classifier trained in 2024 learns patterns that worked against spammer techniques of 2023. By 2025, spammers adapt. Your model sees the same distribution of email features but must map them to different spam/no-spam labels. Concept drift is occurring despite stable input distributions.

### Measuring Model Performance Drift

Without ground truth labels, measuring model drift requires proxy signals:

```python
# Python: Proxy-based model drift detection
from collections import deque
from dataclasses import dataclass

import numpy as np


@dataclass
class PredictionRecord:
    """Compact record for prediction logging."""
    timestamp: int
    prediction: float
    confidence: float  # Model's confidence score
    input_hash: str    # For deduplication


class ModelDriftMonitor:
    """
    Monitors model behavior over time using proxy signals:
    confidence scores and the distribution of predictions.
    """

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.predictions = deque(maxlen=window_size)
        self.confidences = deque(maxlen=window_size)
        # In practice: you'd persist these windows to disk

    def record(self, prediction: float, confidence: float):
        """Record a prediction and its confidence score."""
        self.predictions.append(prediction)
        self.confidences.append(confidence)

    def compute_drift_metrics(self, reference_confidence: float) -> dict:
        """
        Compute drift indicators from prediction behavior,
        compared against a known-good reference confidence.
        """
        if len(self.confidences) < 100:
            return {"status": "insufficient_data"}

        recent_confidence = float(np.mean(self.confidences))
        # Positive when recent confidence has dropped below the reference
        confidence_shift = reference_confidence - recent_confidence

        # Summary statistics of the recent prediction distribution
        recent_mean = float(np.mean(self.predictions))
        recent_std = float(np.std(self.predictions))

        return {
            "confidence_degradation": confidence_shift,
            "confidence_degraded": confidence_shift > 0.1,
            "prediction_mean": recent_mean,
            "prediction_std": recent_std,
            "data_window": len(self.confidences),
        }
```

### Calibration Drift

Model calibration—how well confidence 
scores match actual accuracy—degrades over time even when raw accuracy remains stable. A well-calibrated model is one where 90% confidence corresponds to 90% actual accuracy. Over time, distribution shifts cause calibration to drift. Track calibration curves periodically. If your model's 0.9 confidence bucket corresponds to only 0.7 actual accuracy, user-facing confidence scores require post-hoc calibration adjustment. ### Ground Truth-Based Monitoring Where ground truth labels become available (user corrections, outcome events, delayed feedback), build explicit performance monitoring: ```python # Python: Delayed ground truth monitoring from dataclasses import dataclass from collections import deque from typing import Optional import numpy as np @dataclass class LabeledObservation: prediction: float ground_truth: float timestamp: int lag_days: int # days between prediction and label availability class GroundTruthMonitor: """ Monitor model performance where ground truth eventually becomes available. Example: recommendation model sees clicks days later; loan model sees defaults months later. 
""" def __init__(self, evaluation_window_min: int = 500): self.pending = deque() # (prediction, feature_vector, timestamp) self.ground_truth = deque(maxlen=10000) # Matched labeled observations self.evaluation_window_min = evaluation_window_min def store_pending(self, prediction: float, features: np.ndarray, timestamp: int): """Store prediction awaiting eventual ground truth.""" self.pending.append((prediction, features, timestamp)) def receive_ground_truth(self, features: np.ndarray, ground_truth: float): """Match ground truth to stored predictions and store for evaluation.""" # In production: implement proper matching (e.g., by feature similarity or ID) if self.pending: pending_item = self.pending.popleft() self.ground_truth.append(LabeledObservation( prediction=pending_item[0], ground_truth=ground_truth, timestamp=pending_item[2], lag_days=0 # Calculate from parsed timestamp )) def compute_metrics(self) -> dict: """Compute current performance metrics from labeled observations.""" if len(self.ground_truth) < self.evaluation_window_min: return {"status": "insufficient_labels", "n": len(self.ground_truth)} recent = list(self.ground_truth)[-self.evaluation_window_min:] predictions = np.array([o.prediction for o in recent]) truths = np.array([o.ground_truth for o in recent]) # Mean Absolute Error for regression mae = np.mean(np.abs(predictions - truths)) # For classification: accuracy, AUC, etc. # Simplified example: binary classification accuracy at 0.5 threshold binary_preds = (predictions > 0.5).astype(int) binary_truths = (truths > 0.5).astype(int) accuracy = np.mean(binary_preds == binary_truths) return { "mae": float(mae), "accuracy": float(accuracy), "n_evaluations": len(recent), "drifted": accuracy < 0.85 # Trigger threshold } ``` ### Response Strategies Model drift without response is expensive observation. Determine your drift response protocol: automatic retraining, rollback to simpler fallback model, or human review threshold escalation. 
The protocol depends on your operational context and acceptable downtime.20 min
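The calibration check described under Calibration Drift (does the 0.9 confidence bucket really correspond to about 0.9 accuracy?) can be computed from labeled observations with a few lines of standard-library Python. This is a sketch under stated assumptions: the function names and the choice of ten equal-width bins are illustrative, and the summary figure is the commonly used expected calibration error (count-weighted gap between confidence and accuracy).

```python
def calibration_table(confidences, correct, n_bins: int = 10) -> list[dict]:
    """Group predictions into equal-width confidence bins and compare to accuracy."""
    bins = [[] for _ in range(n_bins)]
    for confidence, is_correct in zip(confidences, correct):
        index = min(int(confidence * n_bins), n_bins - 1)  # 1.0 falls in the last bin
        bins[index].append((confidence, bool(is_correct)))

    table = []
    for index, members in enumerate(bins):
        if not members:
            continue  # Skip empty bins
        mean_confidence = sum(c for c, _ in members) / len(members)
        accuracy = sum(1 for _, ok in members if ok) / len(members)
        table.append({
            "bin": index,
            "confidence": mean_confidence,
            "accuracy": accuracy,
            "count": len(members),
        })
    return table


def expected_calibration_error(table: list[dict]) -> float:
    """Count-weighted average gap between mean confidence and accuracy per bin."""
    total = sum(row["count"] for row in table)
    if total == 0:
        return 0.0
    return sum(
        row["count"] / total * abs(row["confidence"] - row["accuracy"])
        for row in table
    )
```

Recomputing the table on each new batch of labeled observations and tracking the error over time gives an early signal that confidence scores need recalibration, even while raw accuracy looks stable.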
- 16 CI/CD for ML
Continuous Integration and Continuous Deployment principles apply to machine learning, but ML CI/CD differs fundamentally from software CI/CD. Model artifacts cannot be unit tested in isolation: they require evaluation against data, which makes your test suite dependent on distribution assumptions that may shift.

### The ML CI/CD Challenge

Traditional software CI/CD tests code behavior: function X with input Y produces output Z. The same inputs always produce the same outputs. ML models don't guarantee this. The same input can produce different outputs across model versions, training runs, or random seeds.

ML CI/CD must also evaluate whether a new model variant improves or degrades performance on relevant data. This requires maintaining evaluation datasets, defining acceptable performance bounds, and implementing comparison frameworks.

### Pipeline Architecture

```yaml
# YAML: ML CI/CD pipeline structure (example Airflow DAG concept)
# In practice: Airflow, Prefect, Metaflow, or similar orchestration
name: ml_model_training_pipeline
schedule: "0 2 * * *"  # Daily at 2 AM

stages:
  # Stage 1: Data validation
  - name: validate_data
    tasks:
      - check_schema:
          dataset: training_data
          expected_columns: [feature_1, feature_2, feature_3]
      - check_distribution:
          dataset: training_data
          reference: baseline_distribution.json
          threshold: 0.1  # Max KL divergence

  # Stage 2: Training
  - name: train_model
    tasks:
      - train:
          model_type: gradient_boosted
          hyperparameters: config/hyperparameters.yaml
          data: validated_training_data
      - register:
          artifact: trained_model_v{version}
          metrics: training_metrics.json

  # Stage 3: Evaluation
  - name: evaluate_model
    tasks:
      - unit_tests:
          runs: 50  # Fast smoke tests
          performance_threshold: 0.85
      - integration_tests:
          runs: full_validation_set
          performance_threshold: 0.82
          compared_to: production_model
      - shadow_tests:
          traffic_percentage: 5
          duration_hours: 48

  # Stage 4: Deployment
  - name: deploy_model
    condition: all_previous_stages_passed
    tasks:
      - promote:
          source: staging
          target: production
      - rollback_plan:
          previous_version: retained
          auto_rollback_threshold: "degradation > 0.05"
```

### Testing Levels for ML Systems

**Unit tests** verify that model components function correctly in isolation: preprocessing transformations, feature engineering logic, loss function implementations. These tests run quickly and catch obvious bugs.

**Model tests** verify that the trained model behaves correctly: output shapes are as expected, confidence scores are in valid ranges, batching produces consistent results. These tests are relatively fast (seconds to minutes).

**Performance tests** evaluate model quality and cost: accuracy, latency, throughput, memory usage. These slower tests run against the full validation set and must pass before deployment.

**Shadow tests** deploy a new model alongside the production model and send it a copy of a small share of live traffic, without returning its predictions to users. This lets you compare predictions and performance without live risk.

### Integration with Local Deployment

Local AI deployments complicate CI/CD because you may not have identical environments across development and production. A model trained on your build server must perform identically across your deployed edge devices.

```python
# Python: Environment consistency validation
import platform
import time
from importlib import metadata


class EnvironmentValidator:
    """
    Validates that training and serving environments are consistent.
    Critical for local AI deployment where hardware varies.
    """

    def __init__(self, expected_dependencies: list[str]):
        self.expected_dependencies = expected_dependencies

    def capture_environment_hash(self) -> dict:
        """Capture environment characteristics for comparison."""
        # Core library versions, read from the running interpreter's metadata
        deps = {}
        for dep in self.expected_dependencies:
            try:
                deps[dep] = metadata.version(dep)
            except metadata.PackageNotFoundError:
                deps[dep] = "not_found"

        return {
            "dependencies": deps,
            "python_version": platform.python_version(),
            # Hardware info (for non-trivial hardware dependencies)
            "cpu_info": self._get_cpu_info(),
            "cuda_available": self._check_cuda(),
            "timestamp": int(time.time()),
        }

    def validate_consistency(
        self, training_env: dict, serving_env: dict
    ) -> tuple[bool, list[str]]:
        """Compare environments, flagging inconsistencies."""
        issues = []

        # Check dependency alignment
        for dep, expected_version in training_env["dependencies"].items():
            serving_version = serving_env["dependencies"].get(dep, "not_found")
            if serving_version != expected_version:
                issues.append(
                    f"Dependency mismatch: {dep} "
                    f"expected {expected_version}, got {serving_version}"
                )

        # Critical checks based on your deployment requirements
        if training_env.get("cuda_available") != serving_env.get("cuda_available"):
            issues.append("CUDA availability mismatch between environments")

        return len(issues) == 0, issues

    def _get_cpu_info(self) -> str:
        # Linux exposes the CPU model in /proc/cpuinfo; fall back elsewhere
        try:
            with open("/proc/cpuinfo") as f:
                for line in f:
                    if line.lower().startswith("model name"):
                        return line.split(":", 1)[1].strip()
        except OSError:
            pass
        return platform.processor() or "unknown"

    def _check_cuda(self) -> bool:
        try:
            import torch
        except ImportError:
            return False
        return torch.cuda.is_available()
```

### CI/CD Anti-Patterns

Avoid testing models only against training data: performance on training data says nothing about generalization. Test against held-out validation data and ideally against production traffic patterns.

Avoid deploying without rollback capability. Auto-rollback on degradation is not optional for production ML systems. You will occasionally ship models that perform worse than their predecessors.

Avoid manual promotion gates as the default path. If humans must manually approve every deployment, approval becomes rubber-stamp behavior. Automate quality gates and route only the exceptions to human review, starting with automated notification.

20 min
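The automated quality gate recommended above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not a reference implementation: `GateDecision`, `evaluate_gate`, and `review_margin` are hypothetical names, and the default thresholds simply reuse the example values from the pipeline YAML (a 0.82 floor and a 0.05 degradation limit).

```python
from dataclasses import dataclass


@dataclass
class GateDecision:
    action: str  # "promote", "reject", or "review"
    reason: str


def evaluate_gate(
    candidate: float,
    production: float,
    min_score: float = 0.82,        # assumed absolute floor on the validation metric
    max_degradation: float = 0.05,  # assumed tolerated drop versus production
    review_margin: float = 0.01,    # assumed margin below which a human decides
) -> GateDecision:
    """Decide what to do with a candidate model from held-out validation scores."""
    if candidate < min_score:
        return GateDecision("reject", f"score {candidate:.3f} is below floor {min_score:.3f}")

    delta = candidate - production
    if delta < -max_degradation:
        return GateDecision("reject", f"degrades production by {-delta:.3f}")

    if delta < review_margin:
        # Not clearly better than production: route the exception to a human
        return GateDecision("review", f"improvement {delta:.3f} is within the review margin")

    return GateDecision("promote", f"improves production by {delta:.3f}")


if __name__ == "__main__":
    print(evaluate_gate(candidate=0.90, production=0.85))
```

Only the "review" outcome needs a person, which keeps human attention on ambiguous cases instead of every deployment.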
- 17 Pipeline as Code
Defining ML pipelines as source code ensures reproducibility, version control, and auditability. Pipeline as code treats data processing, feature engineering, training, and evaluation as declarative specifications rather than manual procedures.

### Why Pipeline as Code Matters

Manual pipeline execution creates invisible dependencies on operator knowledge. When the person who knows how to run training leaves, institutional knowledge leaves with them. Worse, manual execution is not reproducible: running the same steps with the same inputs does not guarantee identical outputs unless code, configuration, and data versions are pinned explicitly.

Pipeline as code encodes the complete workflow: data sources, preprocessing steps, model configuration, evaluation criteria, and deployment targets. Anyone with code access can inspect, reproduce, or modify the workflow.

### Declarative Pipeline Definition

```python
# Python: Declarative pipeline definition
# Using a conceptual framework (implementation depends on tooling)
from dataclasses import dataclass


@dataclass
class PipelineStage:
    name: str
    component: str  # Reference to component module
    config: dict
    dependencies: list[str]  # Stage names this depends on


@dataclass
class Pipeline:
    name: str
    version: str
    stages: list[PipelineStage]
    metadata: dict  # Owner, description, tags

    def render(self) -> dict:
        """Render pipeline definition to execution format."""
        return {
            "pipeline": self.name,
            "version": self.version,
            "stages": [
                {
                    "name": s.name,
                    "component": s.component,
                    "config": s.config,
                    "upstream": s.dependencies,
                }
                for s in self.stages
            ],
        }


# Example: Declarative training pipeline
training_pipeline = Pipeline(
    name="recommender_training",
    version="v3.2.1",
    stages=[
        PipelineStage(
            name="extract",
            component="data.extract.database",
            config={
                "query": "SELECT * FROM user_events WHERE date > :cutoff",
                "connection": "analytics_db",
                "batch_size": 10000,
            },
            dependencies=[],
        ),
        PipelineStage(
            name="transform_features",
            component="features.user_engagement",
            config={
                "window_days": 30,
                "aggregation": ["mean", "std", "count"],
            },
            dependencies=["extract"],
        ),
        PipelineStage(
            name="train",
            component="model.matrix_factorization",
            config={
                "factors": 64,
                "regularization": 0.01,
                "epochs": 50,
                "early_stopping_patience": 5,
            },
            dependencies=["transform_features"],
        ),
        PipelineStage(
            name="evaluate",
            component="evaluation.recommender",
            config={
                "metrics": ["recall@10", "ndcg@10", "coverage"],
                "test_split": 0.2,
                "thresholds": {"recall@10": 0.15, "ndcg@10": 0.12},
            },
            dependencies=["train"],
        ),
        PipelineStage(
            name="register",
            component="registry.register_model",
            config={
                "registry": "s3://models/acme/",
                "promote_if": "all_metrics_passed",
            },
            dependencies=["evaluate"],
        ),
    ],
    metadata={
        "owner": "recommendations-team",
        "description": "Collaborative filtering for content recommendations",
        "schedule": "0 3 * * *",
        "notify_on_failure": ["[email protected]"],
    },
)

# Render to execution format (e.g., Airflow, Kubeflow, etc.)
pipeline_def = training_pipeline.render()
print(pipeline_def)
```

### Config-Driven Variation

Effective pipeline as code supports configuration-driven execution without code modification. Training hyperparameters, data sources, and evaluation thresholds should be parameters, not hardcoded values.

```python
# Python: Configuration-driven pipeline execution
from dataclasses import dataclass
from pathlib import Path

import yaml


@dataclass
class PipelineConfig:
    """Configuration for pipeline execution."""
    name: str
    environment: str  # dev, staging, production
    data_source: str
    model_config: dict
    evaluation_config: dict
    deployment_config: dict

    @classmethod
    def from_yaml(cls, path: str) -> "PipelineConfig":
        """Load a complete configuration (every field present) from a YAML file."""
        with open(path) as f:
            config_dict = yaml.safe_load(f)
        return cls(**config_dict)

    @classmethod
    def for_environment(cls, base_config: str, env: str) -> "PipelineConfig":
        """Load base config, override with environment-specific settings."""
        with open(base_config) as f:
            base = yaml.safe_load(f) or {}

        # The override file sits next to the base file, e.g. config/base.production.yaml.
        # It may be partial, so it is merged as a plain dict instead of being
        # parsed into a full PipelineConfig.
        override_path = Path(base_config).with_suffix(f".{env}.yaml")
        overrides = {}
        if override_path.exists():
            with open(override_path) as f:
                overrides = yaml.safe_load(f) or {}

        def merged(section: str) -> dict:
            # Merge overrides (env values take precedence)
            return {**base.get(section, {}), **overrides.get(section, {})}

        return cls(
            name=base["name"],
            environment=env,
            data_source=overrides.get("data_source", base["data_source"]),
            model_config=merged("model_config"),
            evaluation_config=merged("evaluation_config"),
            deployment_config=merged("deployment_config"),
        )


# Example YAML configuration
# config/base.yaml
# name: sentiment_training
# data_source: s3://data/sentiment/normalized
# model_config:
#   architecture: transformer
#   max_length: 512
#   learning_rate: 0.0001
# evaluation_config:
#   test_size: 0.2
#   metrics: [accuracy, f1, latency_p95]
# deployment_config:
#   target: local_edge
#   replicas: 1
```

### Versioning Considerations

Every pipeline execution should be traceable to its source code version and configuration version. Store pipeline definitions in version control alongside model artifacts. Tag releases with semantic versioning that distinguishes breaking changes from incremental improvements.

```python
# Python: Pipeline execution metadata tracking
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ExecutionRecord:
    """Record of pipeline execution for auditability."""
    pipeline_version: str
    config_version: str
    execution_id: str  # Unique per execution
    timestamp: str
    git_commit: str
    triggered_by: str  # schedule, manual, api
    stages_completed: list[str]
    stages_failed: list[str]
    outputs: dict  # Artifact locations, metrics


def record_execution(pipeline: "Pipeline", execution_id: str) -> ExecutionRecord:
    """Create execution record for tracking.

    `pipeline` is an instance of the Pipeline dataclass from the
    declarative pipeline definition.
    """
    # Fails loudly if this is not run inside a git checkout
    git_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        capture_output=True, text=True, check=True,
    ).stdout.strip()

    return ExecutionRecord(
        pipeline_version=pipeline.version,
        config_version="N/A",  # Populate from loaded config
        execution_id=execution_id,
        timestamp=datetime.now(timezone.utc).isoformat(timespec="seconds"),
        git_commit=git_commit,
        triggered_by="unknown",  # Populate from execution context
        stages_completed=[],
        stages_failed=[],
        outputs={},
    )
```

25 min
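One way to give each execution a traceable configuration version is to fingerprint the loaded configuration. The sketch below is an assumption about how that could be done, not part of the course's tooling: `config_version` is a hypothetical helper, and truncating the SHA-256 digest to 12 characters is an arbitrary readability choice.

```python
import hashlib
import json


def config_version(config: dict) -> str:
    """Return a short, stable fingerprint for a configuration dictionary."""
    # Sorting keys and fixing separators makes the JSON text canonical, so two
    # configs with the same content hash identically regardless of key order.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


if __name__ == "__main__":
    base = {"model_config": {"learning_rate": 0.0001, "max_length": 512}}
    tuned = {"model_config": {"learning_rate": 0.0002, "max_length": 512}}
    print(config_version(base), config_version(tuned))
```

Storing this value next to the git commit lets you tell apart two runs of the same code that used different settings.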
- 18 GitOps for Models
GitOps extends version control practices to operational resources, including deployed models. The model versions declared in git become the source of truth, and deployment state is reconciled automatically from committed changes. Every model promotion or rollback traces to a git commit.

### GitOps Principles for ML

GitOps for models means three things. First, the model deployment is declared in code (which model version serves which environment). Second, the declared state is the actual deployed state, with no manual overrides. Third, all changes flow through git, so you can track who changed what and when.

In practice, this means your model registry is not a standalone system; it is an extension of your git workflow. Promotions are commits. Rollbacks are commits. The git history is your audit log.

### Implementation Pattern

```yaml
# YAML: GitOps model deployment manifest
# File: model-deployments.yaml (stored in git)
apiVersion: mlops/v1
kind: ModelDeployment
metadata:
  name: customer-segmentation
  namespace: production
spec:
  model:
    registry: s3://ml-models/company/
    name: customer-segmentation
    version: v2.4.1  # Reference to registered model version
  environment: production
  serving:
    replicas: 3
    resources:
      cpu: "2"
      memory: 4Gi
    inference_endpoint: /v1/predict/segment
  drift_config:
    monitor_interval_hours: 6
    alert_threshold: 0.12
    auto_rollback_threshold: 0.18
    reference_window_size: 1000
  promotion_criteria:
    shadow_test_passed: true
    evaluation_metrics:
      silhouette_score: ">0.45"
      inference_latency_p99: "<100ms"
    approval_required: true
    approvers:
      - ml-ops-lead
      - business-unit-lead
---
# A/B routing configuration
apiVersion: mlops/v1
kind: ModelRouting
metadata:
  name: customer-segmentation-ab
  namespace: production
spec:
  routes:
    - weight: 80
      model_version: v2.4.1
      name: primary
    - weight: 20
      model_version: v2.5.0-rc
      name: candidate
  traffic_splitting:
    method: header  # Route by the request header named below
    header_name: X-Deployment-Context
```

```python
# Python: GitOps model promotion workflow
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Callable


@dataclass
class ModelPromotion:
    """Represents a model promotion through environments."""
    model_name: str
    from_version: str
    to_version: str
    from_environment: str
    to_environment: str
    commit_sha: str
    approvers: list[str]

    def execute(self) -> dict:
        """Execute promotion as git commit."""
        # Update the deployment manifest for the target environment
        manifest_path = Path(f"deployments/{self.to_environment}/{self.model_name}.yaml")

        # Read, modify, write
        content = manifest_path.read_text()
        old, new = f"version: {self.from_version}", f"version: {self.to_version}"
        if old not in content:
            raise ValueError(f"{manifest_path} does not declare {old!r}")
        manifest_path.write_text(content.replace(old, new))

        # Stage and commit
        subprocess.run(["git", "add", str(manifest_path)], check=True)
        commit_message = "\n".join([
            f"Promotion: {self.model_name} {self.from_version} -> {self.to_version}",
            "",
            f"From: {self.from_environment}",
            f"To: {self.to_environment}",
            f"Approved by: {', '.join(self.approvers)}",
            f"Promotion ID: {self.commit_sha}",
        ])
        subprocess.run(
            ["git", "commit", "-m", commit_message],
            capture_output=True, text=True, check=True,
        )
        new_sha = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True, text=True, check=True,
        ).stdout.strip()

        # In GitOps: this commit triggers deployment reconciliation
        # ArgoCD, Flux, or equivalent watches git and reconciles cluster state
        return {
            "commit": new_sha,
            "manifest_updated": True,
            "reconciliation_triggered": True,
        }

    def rollback(self) -> dict:
        """Rollback to previous version as git revert.

        Assumes the promotion commit is still HEAD.
        """
        result = subprocess.run(
            ["git", "revert", "--no-edit", "HEAD"],
            capture_output=True, text=True, check=True,
        )
        return {"reverted": True, "commit": result.stdout}


# Promotion workflow with approval gates
def promote_model_with_approval(
    promotion: ModelPromotion,
    verify_evaluation_criteria: Callable[[str, str], bool],
    collect_approvals: Callable[[ModelPromotion, list[str]], set[str]],
) -> str:
    """
    Execute promotion workflow with required approval gates.

    The two callables are placeholders for your own evaluation store and
    approval system. Returns the SHA of the promotion commit.
    """
    # 1. Verify evaluation passed
    if not verify_evaluation_criteria(promotion.model_name, promotion.to_version):
        raise ValueError("Evaluation criteria not met")

    # 2-3. Propose the promotion and collect required approvals
    received = collect_approvals(promotion, promotion.approvers)

    # 4. Execute promotion (commit to git)
    if not set(promotion.approvers) <= set(received):
        raise PermissionError("Required approvals not collected")
    return promotion.execute()["commit"]
```

### Repository Structure

Effective GitOps for ML requires deliberate repository structure:

```bash
# bash: GitOps model repository structure
mkdir -p model-repo/{models,deployments,configs,workflows,evaluate}

# models/: Versioned model artifacts (large files via git-lfs)
# ├── customer-segmentation/
# │   └── v2.4.1/
# │       ├── model artifacts
# │       └── metadata.json
# └── text-classifier/
#     └── v1.0.0/

# deployments/: Environment-specific declarations
# ├── production/
# │   ├── customer-segmentation.yaml
# │   └── text-classifier.yaml
# └── staging/
#     └── customer-segmentation.yaml

# configs/: Hyperparameter and pipeline configurations
# ├── hyperparameters/
# │   ├── customer-segmentation/base.yaml
# │   └── customer-segmentation/production.yaml
# └── evaluation/
#     └── metrics.yaml

# workflows/: Pipeline definitions
# ├── training.yaml
# └── evaluation.yaml

# evaluate/: Evaluation datasets and test cases
# ├── datasets/
# └── test_cases/
```

### Automated Reconciliation

GitOps requires a reconciliation operator that continuously compares git-declared state with actual cluster state. When divergence is detected, the operator corrects production to match git.

For local AI deployments, this might mean reconciliation between your git repository and model serving endpoints on edge devices. Implement your own lightweight reconciliation loop or adapt existing GitOps tools for ML-specific resources.

25 min
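A lightweight reconciliation loop of the kind described above might look like the following sketch. Everything here is illustrative: `reconcile`, `reconcile_once`, and the three callables are hypothetical names, and both desired and actual state are simplified to a mapping from model name to version.

```python
from typing import Callable

Step = tuple[str, str, str]  # (action, model name, version)


def reconcile(desired: dict[str, str], actual: dict[str, str]) -> list[Step]:
    """Return the steps that would make `actual` match the git-declared `desired`."""
    steps: list[Step] = []
    for model, version in sorted(desired.items()):
        if model not in actual:
            steps.append(("deploy", model, version))
        elif actual[model] != version:
            steps.append(("update", model, version))
    # Anything running that git does not declare is removed
    for model in sorted(actual.keys() - desired.keys()):
        steps.append(("remove", model, actual[model]))
    return steps


def reconcile_once(
    read_desired: Callable[[], dict[str, str]],   # e.g. parse manifests from a git checkout
    read_actual: Callable[[], dict[str, str]],    # e.g. query each serving endpoint
    apply_step: Callable[[str, str, str], None],  # e.g. call your deployment tooling
) -> int:
    """Run a single reconciliation pass and return the number of corrections made."""
    plan = reconcile(read_desired(), read_actual())
    for action, model, version in plan:
        apply_step(action, model, version)
    return len(plan)


if __name__ == "__main__":
    declared = {"customer-segmentation": "v2.4.1", "text-classifier": "v1.0.0"}
    running = {"customer-segmentation": "v2.3.0", "legacy-model": "v0.9.0"}
    for step in reconcile(declared, running):
        print(step)
```

Running `reconcile_once` on a timer gives the continuous comparison; keeping the diff in a pure function makes it easy to test without touching any device.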
- 19 Infrastructure as Code
Infrastructure as Code (IaC) treats compute resources, networking, and serving configuration as versioned, declarative specifications. For local AI, IaC ensures consistent deployment across edge devices and reproducible environment reconstruction after failures.

### IaC for ML Serving

When you deploy models locally, "infrastructure" includes your serving hosts, model artifacts, runtime configuration, monitoring setup, and network access controls. IaC codifies these into version-controlled definitions.

Without IaC, each deployed instance accumulates manual configuration drift. When you need to replicate an environment (new edge node, disaster recovery, scaling), undocumented steps cause failures.

### Terraform for ML Infrastructure

```hcl
# HCL: Terraform configuration for local AI serving infrastructure
# File: main.tf
terraform {
  required_version = ">= 1.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = ">= 4.0"
    }
  }

  # Backend for state storage (local for simple deployments)
  backend "local" {
    path = "terraform.tfstate"
  }
}

# Variables for environment-specific configuration
variable "deployment_name" {
  description = "Name for this deployment"
  type        = string
  default     = "local-ai-production"
}

variable "model_versions" {
  description = "Map of model names to their current versions"
  type        = map(string)
  default = {
    sentiment   = "v2.1.3"
    nlp_parser  = "v1.5.0"
    recommender = "v3.2.1"
  }
}

variable "compute_resources" {
  description = "Compute allocation per model"
  type = map(object({
    cpu_cores   = number
    memory_gb   = number
    gpu_enabled = bool
  }))
  default = {
    sentiment   = { cpu_cores = 2, memory_gb = 4, gpu_enabled = false }
    nlp_parser  = { cpu_cores = 4, memory_gb = 8, gpu_enabled = true }
    recommender = { cpu_cores = 2, memory_gb = 4, gpu_enabled = false }
  }
}

# Model registry configuration
locals {
  model_registry_path = "s3://internal-models/mlops/"
  monitoring_endpoint = "http://monitoring.internal:9090"
}

# S3 bucket for model artifacts
resource "aws_s3_bucket" "model_artifacts" {
  bucket = "${var.deployment_name}-artifacts"

  tags = {
    Environment = var.deployment_name
    Component   = "model-storage"
    ManagedBy   = "terraform"
  }
}

# AWS provider v4+ configures versioning and lifecycle as separate resources
resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  rule {
    id     = "archive-noncurrent-versions"
    status = "Enabled"
    filter {}

    noncurrent_version_transition {
      noncurrent_days = 30
      storage_class   = "GLACIER"
    }
  }
}

# Compute instance for model serving (AWS is the example provider here;
# swap in the provider for your local VM or virtualization platform)
resource "aws_instance" "ml_serving" {
  for_each = var.model_versions

  ami                    = "ami-model-serving-v2" # Placeholder for a pre-built serving image
  instance_type          = var.compute_resources[each.key].gpu_enabled ? "g4dn.xlarge" : "c5.xlarge"
  vpc_security_group_ids = [aws_security_group.ml_serving.id]

  tags = {
    Name         = "${var.deployment_name}-${each.key}"
    Model        = each.key
    ModelVersion = each.value
    Environment  = var.deployment_name
    ManagedBy    = "terraform"
  }

  lifecycle {
    create_before_destroy = true
  }
}

# Security group for model serving
resource "aws_security_group" "ml_serving" {
  name        = "${var.deployment_name}-ml-serving"
  description = "Security group for ML model serving instances"

  ingress {
    from_port   = 8080 # Inference endpoint
    to_port     = 8080
    protocol    = "tcp"
    cidr_blocks = ["10.0.0.0/8"] # Internal network only
  }

  ingress {
    from_port   = 9090 # Metrics endpoint
    to_port     = 9090
    protocol    = "tcp"
    cidr_blocks = ["10.0.1.0/24"] # Monitoring network only
  }

  egress {
    from_port   = 443 # HTTPS outbound for model downloads
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    ManagedBy = "terraform"
  }
}

# Outputs for deployment information
output "model_endpoints" {
  description = "Inference endpoints for deployed models"
  value = {
    # Private addresses, because ingress is limited to the internal network
    for model, instance in aws_instance.ml_serving :
    model => "http://${instance.private_ip}:8080"
  }
}

output "deployment_status" {
  description = "Current deployment state summary"
  value = {
    deployment_name = var.deployment_name
    models          = var.model_versions
    instances       = length(aws_instance.ml_serving)
  }
}
```

### Ansible for Configuration Management

Terraform manages infrastructure creation. Ansible manages configuration on infrastructure that already exists. Use Ansible for installing dependencies, configuring the runtime environment, and deploying model artifacts.

```yaml
# YAML: Ansible playbook for ML model deployment
# File: deploy-models.yml
- name: Deploy ML Models to Serving Infrastructure
  hosts: ml_serving
  become: yes
  vars:
    model_registry: "s3://internal-models/mlops/"
    serving_config_dir: /etc/ml-serving
    model_versions:
      sentiment: "v2.1.3"
      nlp_parser: "v1.5.0"
      recommender: "v3.2.1"

  tasks:
    - name: Ensure serving directories exist
      file:
        path: "{{ serving_config_dir }}/{{ item }}"
        state: directory
        owner: ml-serving
        group: ml-serving
        mode: '0755'
      loop:
        - models
        - logs
        - config

    # The pip module creates the virtual environment if it does not exist yet
    - name: Create a virtual environment per model and install serving dependencies
      pip:
        virtualenv: "{{ serving_config_dir }}/venv/{{ item.service }}"
        virtualenv_command: python3 -m venv
        name: "{{ item.packages }}"
      loop:
        - { service: sentiment, packages: ['torch', 'fastapi', 'uvicorn'] }
        - { service: nlp_parser, packages: ['transformers', 'fastapi', 'uvicorn'] }
        - { service: recommender, packages: ['implicit', 'fastapi', 'uvicorn'] }

    - name: Download model artifacts from registry
      amazon.aws.s3_object:
        bucket: internal-models
        object: "mlops/{{ item.key }}/{{ item.value }}.tar.gz"
        dest: "{{ serving_config_dir }}/models/{{ item.key }}.tar.gz"
        mode: get
      loop: "{{ model_versions | dict2items }}"

    - name: Create extraction directory for each model
      file:
        path: "{{ serving_config_dir }}/models/{{ item }}"
        state: directory
        owner: ml-serving
        group: ml-serving
        mode: '0755'
      loop: "{{ model_versions.keys() | list }}"

    - name: Extract model artifacts
      unarchive:
        src: "{{ serving_config_dir }}/models/{{ item }}.tar.gz"
        dest: "{{ serving_config_dir }}/models/{{ item }}/"
        remote_src: yes
      loop: "{{ model_versions.keys() | list }}"

    - name: Configure model serving service
      template:
        src: "templates/ml-service-{{ item }}.service.j2"
        dest: "/etc/systemd/system/ml-{{ item }}.service"
        owner: root
        group: root
        mode: '0644'
      loop: "{{ model_versions.keys() | list }}"
      notify: reload systemd

    - name: Apply unit file changes before starting services
      meta: flush_handlers

    - name: Ensure ML serving services are started and enabled
      systemd:
        name: "ml-{{ item }}.service"
        state: started
        enabled: yes
      loop: "{{ model_versions.keys() | list }}"

  handlers:
    - name: reload systemd
      systemd:
        daemon_reload: yes
```

### State Management

IaC state tracks what you've created so subsequent runs know what exists. For local deployments, store IaC state centrally, alongside your infrastructure repository, not on individual devices.

State files contain sensitive information (IPs, resource identifiers). Store them encrypted or in access-controlled storage inside your network.

25 min
- 20 Ansible for AI
Ansible provides idempotent, repeatable configuration management for AI infrastructure. Its agentless architecture and declarative language make it suitable for managing ML serving environments across diverse hardware without requiring installed agents on target systems.

### Why Ansible for Local AI

Local AI deployments often span heterogeneous environments: different hardware generations, OS versions, and runtime configurations. Ansible's push-based model connects from a control node to targets via SSH, so managed hosts need only an SSH server and a Python interpreter.

Ansible tasks are designed to be idempotent: running the same task multiple times produces the same result as running it once. This property is essential for reliable infrastructure management where repeated execution is common.

### Inventory and Pattern-Based Management

```ini
# INI: Ansible inventory for local AI infrastructure
# File: inventory.ini

[ml_serving]
# Production inference servers
prod-inference-01 ansible_host=10.0.1.10 ansible_user=mlops
prod-inference-02 ansible_host=10.0.1.11 ansible_user=mlops
prod-inference-03 ansible_host=10.0.1.12 ansible_user=mlops

[ml_serving:vars]
ansible_python_interpreter=/usr/bin/python3
model_registry_base=s3://models/internal/
monitoring_enabled=true

[ml_training]
# Training and experiment servers
train-01 ansible_host=10.0.2.10 ansible_user=mlops
train-02 ansible_host=10.0.2.11 ansible_user=mlops

[edge_nodes]
# Edge deployment nodes with limited resources
edge-retail-01 ansible_host=192.168.1.101 ansible_user=pi
edge-retail-02 ansible_host=192.168.1.102 ansible_user=pi

[edge_nodes:vars]
# ARM-specific Python path
ansible_python_interpreter=/usr/bin/python3
model_registry_base=s3://models/internal-arm/

[ml_serving:children]
# Training servers inherit serving group vars
ml_training
```

### ML-Specific Ansible Roles

```yaml
# YAML: Ansible role for ML runtime installation
# File: roles/ml-runtime/tasks/main.yml
- name: Check for required architecture
  assert:
    that:
      - ansible_architecture in ['x86_64', 'aarch64', 'armv7l']
    fail_msg: "Unsupported architecture: {{ ansible_architecture }}"

- name: Install system dependencies (Debian/Ubuntu)
  apt:
    name:
      - python3
      - python3-pip
      - python3-venv
      - git
      - wget
      - curl
    state: present
    update_cache: yes
  when: ansible_os_family == "Debian"

- name: Install system dependencies (RHEL/CentOS)
  dnf:
    name:
      - python3
      - python3-pip
      - git
      - wget
      - curl
    state: present
  when: ansible_os_family == "RedHat"

- name: Create ML service user
  user:
    name: ml-service
    shell: /bin/false
    home: /opt/ml-service
    system: yes
    create_home: yes

- name: Install CUDA Toolkit (GPU nodes only)
  block:
    - name: Add NVIDIA repository
      get_url:
        # NVIDIA names repositories like ubuntu2204/x86_64; verify that the
        # rendered path exists for your distribution and architecture
        url: "https://developer.download.nvidia.com/compute/cuda/repos/{{ ansible_distribution | lower }}{{ ansible_distribution_version | replace('.', '') }}/{{ ansible_architecture }}/cuda-keyring_1.0-1_all.deb"
        dest: /tmp/cuda-keyring.deb

    - name: Install CUDA keyring
      apt:
        deb: /tmp/cuda-keyring.deb

    - name: Install CUDA runtime
      apt:
        name:
          - cuda-runtime-12-1
          - libcublas-12-1
        state: present
        update_cache: yes
  when:
    - ansible_os_family == "Debian"
    # `cuda` is a custom local fact you set on GPU nodes
    - (ansible_local | default({})).cuda | default(false)
```

### Model Deployment Playbook

```yaml
# YAML: Complete model deployment playbook
# File: deploy-ml-models.yml
---
- name: Deploy ML Models to Production
  hosts: ml_serving
  gather_facts: yes
  become: yes
  vars:
    model_artifact_path: /opt/models
    serving_port: 8080
    health_check_path: /health
    deployment_warmup_seconds: 30
    # model_versions is expected as a mapping of model name to a dict with
    # at least `version` and `model_path`, supplied via inventory or -e

  pre_tasks:
    - name: Verify model versions are specified
      assert:
        that:
          - model_versions is defined
          - model_versions | length > 0
        fail_msg: "model_versions variable must be defined"

    - name: Check connectivity to model registry
      uri:
        url: "https://models.internal/v1/health"
        method: GET
        status_code: 200
      register: registry_check
      failed_when: false

    - name: Ensure deployment is approved
      assert:
        that: deployment_approved | default(false) | bool
        fail_msg: "deployment_approved must be set to true"
      when: (enforcement_mode | default('strict')) == "strict"

  tasks:
    - name: Create model deployment directory
      file:
        path: "{{ model_artifact_path }}/{{ item.key }}"
        state: directory
        owner: ml-service
        group: ml-service
        mode: '0750'
      loop: "{{ model_versions | dict2items }}"

    - name: Download model artifacts
      amazon.aws.s3_object:
        bucket: mlops-models
        object: "{{ item.value.model_path }}"
        dest: "{{ model_artifact_path }}/{{ item.key }}/model.tar.gz"
        mode: get
        overwrite: different
      loop: "{{ model_versions | dict2items }}"
      register: download_results

    - name: Extract model artifacts
      unarchive:
        src: "{{ model_artifact_path }}/{{ item }}/model.tar.gz"
        dest: "{{ model_artifact_path }}/{{ item }}/"
        remote_src: yes
        creates: "{{ model_artifact_path }}/{{ item }}/model.pkl"
      loop: "{{ model_versions.keys() | list }}"

    - name: Generate serving configuration
      template:
        src: model_config.yaml.j2
        dest: "{{ model_artifact_path }}/{{ item }}/config.yaml"
      loop: "{{ model_versions.keys() | list }}"
      notify: restart model service

    - name: Verify model loads correctly
      command: python3 -c "import joblib; joblib.load('model.pkl')"
      args:
        chdir: "{{ model_artifact_path }}/{{ item }}"
      loop: "{{ model_versions.keys() | list }}"
      register: model_load_check
      failed_when: false
      changed_when: false

    - name: Assert model loading succeeded
      assert:
        that: item.rc == 0
        fail_msg: "Model {{ item.item }} failed to load"
      loop: "{{ model_load_check.results }}"
      when: model_load_check.results | length > 0

    - name: Enable and start model services
      systemd:
        name: "ml-model-{{ item.key }}.service"
        state: started
        enabled: yes
      loop: "{{ model_versions | dict2items }}"

    # Each model is assumed to listen on its own port: 8080, 8081, ...
    - name: Wait for model endpoints to become healthy
      uri:
        url: "http://localhost:{{ (serving_port | int) + idx }}{{ health_check_path }}"
        method: GET
        status_code: 200
      register: health_check
      until: health_check.status == 200
      retries: 10
      delay: 5
      loop: "{{ model_versions.keys() | list }}"
      loop_control:
        index_var: idx

    - name: Record deployment
      community.postgresql.postgresql_query:
        login_host: postgres.internal
        login_db: mlops
        login_user: ansible
        login_password: "{{ lookup('env', 'DB_PASSWORD') }}"
        query: |
          INSERT INTO deployment_log (model_name, version, host, deployed_at, deployed_by)
          VALUES (%s, %s, %s, NOW(), %s)
        positional_args:
          - "{{ item.key }}"
          - "{{ item.value.version }}"
          - "{{ inventory_hostname }}"
          - "{{ lookup('env', 'USER') }}"
      loop: "{{ model_versions | dict2items }}"

  handlers:
    - name: restart model service
      systemd:
        name: "ml-model-{{ item }}.service"
        state: restarted
      loop: "{{ model_versions.keys() | list }}"

    - name: reload systemd
      systemd:
        daemon_reload: yes

  post_tasks:
    - name: Verify all endpoints responding
      uri:
        url: "http://{{ ansible_host }}:8080/v1/models"
        method: GET
        status_code: 200
      register: endpoint_verify

    - name: Display deployment summary
      debug:
        msg: |
          Deployment complete for {{ inventory_hostname }}
          Models: {{ model_versions.keys() | list }}
          Updated artifacts: {{ download_results.results | selectattr('changed') | map(attribute='item.key') | list }}
```

### Testing Ansible Deployments

```yaml
# YAML: Ansible deployment testing with molecule
# File: molecule/default/molecule.yml
---
dependency:
  name: galaxy
driver:
  name: docker
platforms:
  - name: instance
    image: docker.io/pyiron/centos7-systemd-python
    pre_build_image: yes
    command: /usr/sbin/init
    privileged: true
provisioner:
  name: ansible
verifier:
  name: testinfra
  additional_files_or_dirs:
    - test_deploy.py
scenario:
  name: default
  test_sequence:
    - destroy
    - create
    - prepare
    - converge
    - verify  # Run test cases
    - destroy
```

30 min
- 21Monitoring and AlertingFull-spectrum monitoring for ML serving involves tracking infrastructure metrics, model performance metrics, and business outcome metrics. Effective alerting balances sensitivity (catching real issues) against noise (avoiding alert fatigue that causes teams to ignore warnings). ### Monitoring Architecture Local AI deployments require thoughtful monitoring architecture because you cannot rely on cloud-native observability platforms. Build a metrics collection stack appropriate to your infrastructure constraints. ```python # Python: Metrics collection infrastructure for ML serving from dataclasses import dataclass, field from typing import Optional import time import threading from collections import deque from enum import Enum class MetricType(Enum): COUNTER = "counter" # Monotonically increasing GAUGE = "gauge" # Point-in-time value HISTOGRAM = "histogram" # Distribution statistics SUMMARY = "summary" # Quantile-based statistics @dataclass class Metric: name: str value: float labels: dict timestamp: int metric_type: MetricType class LocalMetricsCollector: """ Lightweight metrics collector for ML serving. Stores metrics in memory with optional forwarding to Prometheus. 
""" def __init__(self, push_gateway_url: Optional[str] = None): self.metrics = {} # name -> deque of Metric self.push_gateway_url = push_gateway_url self.lock = threading.Lock() def record_counter(self, name: str, value: float = 1, **labels): self._record(MetricType.COUNTER, name, value, labels) def record_gauge(self, name: str, value: float, **labels): self._record(MetricType.GAUGE, name, value, labels) def record_histogram(self, name: str, value: float, **labels): self._record(MetricType.HISTOGRAM, name, value, labels) def _record(self, metric_type: MetricType, name: str, value: float, labels: dict): metric = Metric( name=name, value=value, labels=labels, timestamp=int(time.time()), metric_type=metric_type ) with self.lock: if name not in self.metrics: self.metrics[name] = deque(maxlen=10000) self.metrics[name].append(metric) def get_metrics(self, name: Optional[str] = None) -> list[Metric]: """Retrieve stored metrics.""" with self.lock: if name: return list(self.metrics.get(name, [])) return [m for metrics in self.metrics.values() for m in metrics] def compute_histogram_stats(self, name: str) -> dict: """Compute histogram statistics for bucketed metric.""" values = [m.value for m in self.metrics.get(name, [])] if not values: return {} sorted_values = sorted(values) n = len(sorted_values) return { "count": n, "sum": sum(values), "mean": sum(values) / n, "min": min(values), "max": max(values), "p50": sorted_values[int(n * 0.5)], "p95": sorted_values[int(n * 0.95)], "p99": sorted_values[int(n * 0.99)], } # Prometheus metrics exporter endpoint def prometheus_metrics_endpoint(collector: LocalMetricsCollector) -> str: """Format metrics for Prometheus scraping.""" metrics_output = [] for name, metrics in collector.metrics.items(): metric_types = {m.metric_type for m in metrics} for m in metrics: labels_str = ",".join( f'{k}="{v}"' for k, v in m.labels.items() ) if metric_types == {MetricType.GAUGE}: metric_type_str = "# TYPE {} gauge".format(name) elif metric_types == 
{MetricType.HISTOGRAM}: metric_type_str = "# TYPE {} histogram".format(name) else: metric_type_str = "# TYPE {} counter".format(name) if metric_type_str not in metrics_output: metrics_output.append(metric_type_str) metrics_output.append( "{}{{{}}} {}".format(name, labels_str, m.value) ) return "\n".join(metrics_output)

# Usage: Integrate into model serving
# from prometheus_client import Counter, Histogram, Gauge, start_http_server
#
# # Standard ML serving metrics
# inference_latency = Histogram(
#     'ml_inference_latency_seconds',
#     'Inference latency in seconds',
#     ['model_name', 'model_version']
# )
#
# prediction_confidence = Histogram(
#     'ml_prediction_confidence',
#     'Model prediction confidence scores',
#     ['model_name', 'model_version']
# )
#
# requests_total = Counter(
#     'ml_requests_total',
#     'Total inference requests',
#     ['model_name', 'model_version', 'status']
# )
```

### Alerts Configuration

```yaml
# YAML: Alert configuration for ML serving
# File: alerts/ml-serving-alerts.yaml
groups:
  - name: ml_serving_infrastructure
    interval: 30s
    rules:
      # Infrastructure alerts
      - alert: HighCPUUsage
        expr: cpu_usage_percent > 90
        for: 5m
        labels:
          severity: warning
          team: ml-ops
        annotations:
          summary: "High CPU usage on {{ $labels.instance }}"
          description: "CPU usage at {{ $value }}% for 5+ minutes"

      - alert: OutOfMemory
        expr: memory_usage_percent > 95
        for: 1m
        labels:
          severity: critical
          team: ml-ops
        annotations:
          summary: "Memory exhaustion imminent on {{ $labels.instance }}"

      - alert: ModelServiceDown
        expr: up{job="ml-serving"} == 0
        for: 1m
        labels:
          severity: critical
          team: ml-ops
        annotations:
          summary: "Model serving service down on {{ $labels.instance }}"

  - name: ml_model_performance
    interval: 60s
    rules:
      # Model performance alerts
      - alert: HighInferenceLatency
        # histogram_quantile needs the per-bucket rate, aggregated by "le"
        expr: |
          histogram_quantile(
            0.95,
            sum by (le, model_name) (rate(ml_inference_latency_seconds_bucket[5m]))
          ) > 2.0
        for: 10m
        labels:
          severity: warning
          team: ml-ops
        annotations:
          summary: "High inference latency for {{ $labels.model_name }}"
          description: "P95 latency at {{ $value }}s, threshold 2.0s"

      - alert: ModelConfidenceDrift
        # Mean confidence over the last hour vs. the same hour 7 days ago.
        # A histogram exposes _sum and _count; "offset" must follow the selector.
        expr: |
          (
            sum by (model_name) (rate(ml_prediction_confidence_sum{model_name="sentiment"}[1h]))
            / sum by (model_name) (rate(ml_prediction_confidence_count{model_name="sentiment"}[1h]))
          )
          <
          (
            sum by (model_name) (rate(ml_prediction_confidence_sum{model_name="sentiment"}[1h] offset 7d))
            / sum by (model_name) (rate(ml_prediction_confidence_count{model_name="sentiment"}[1h] offset 7d))
          )
        for: 24h
        labels:
          severity: warning
          team: ml-ops
        annotations:
          summary: "Prediction confidence drift detected for {{ $labels.model_name }}"

      - alert: ExtensiveDataDrift
        expr: model_drift_score > 0.15
        for: 1h
        labels:
          severity: warning
          team: ml-ops
        annotations:
          summary: "Significant data drift detected for {{ $labels.model_name }}"
          description: "Drift score {{ $value }} exceeds threshold 0.15"

      - alert: PredictionAccuracyDegradation
        # "by (service)" keeps the label that the annotations reference
        expr: |
          (
            sum by (service) (prediction_correct{service="sentiment"})
            / sum by (service) (prediction_total{service="sentiment"})
          ) < 0.80
        for: 1h
        labels:
          severity: critical
          team: ml-ops
        annotations:
          summary: "Model accuracy below threshold for {{ $labels.service }}"
          description: "Current accuracy {{ $value | printf \"%.2f\" }}, threshold 0.80"

  - name: ml_business_metrics
    interval: 300s
    rules:
      # Business outcome alerts
      - alert: LowUserSatisfaction
        expr: avg by (service) (user_feedback_score{service="inference"}) < 3.0
        for: 1d
        labels:
          severity: warning
          team: ml-ops
        annotations:
          summary: "User satisfaction score below target for {{ $labels.service }}"
```

### Dashboard Design

Effective ML monitoring dashboards show what is happening (current state), what happened (trends), and what matters (impact on outcomes). Design dashboards for different audiences: operators need system health; stakeholders need business impact.

Build dashboards around questions: Is the service up? Is it responding quickly? Are predictions accurate? Are users receiving value?

25 min
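One way to keep a dashboard anchored to those questions is to store each panel next to the question it answers and the audience it serves. The following is a minimal sketch, not a tested Grafana configuration: the `Panel` class, the audience tags and `panels_for` are hypothetical names, and the PromQL strings are illustrative queries built from metric names used in this chapter's alert rules.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Panel:
    question: str  # the question this panel answers
    audience: str  # hypothetical tag: "operator" or "stakeholder"
    query: str     # illustrative PromQL, reusing this chapter's metric names


PANELS = [
    Panel("Is the service up?", "operator", 'up{job="ml-serving"}'),
    Panel(
        "Is it responding quickly?",
        "operator",
        "histogram_quantile(0.95, sum by (le) "
        "(rate(ml_inference_latency_seconds_bucket[5m])))",
    ),
    Panel(
        "Are predictions accurate?",
        "stakeholder",
        "sum(prediction_correct) / sum(prediction_total)",
    ),
    Panel("Are users receiving value?", "stakeholder", "avg(user_feedback_score)"),
]


def panels_for(audience: str) -> list[Panel]:
    """Return the panels intended for one audience, in display order."""
    return [p for p in PANELS if p.audience == audience]


if __name__ == "__main__":
    for panel in panels_for("operator"):
        print(f"{panel.question:<30} {panel.query}")
```

A panel that cannot be phrased as a question is a candidate for removal, which is the practical value of keeping the question and the query in the same record.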
- 22 Model Governance
Model governance encompasses the policies, procedures, and controls that ensure models operate responsibly, legally, and ethically. Governance addresses fairness, explainability, bias detection, and compliance. It is not optional for production ML systems.

### Governance Framework Components

Effective model governance requires four interconnected components:

**Model inventory**: A detailed registry tracking every deployed model, its version, owner, training data provenance, evaluation metrics, and deployment status. You cannot govern what you cannot see.

**Lifecycle management**: Definitions of model stages (development, validation, staging, production, archived), promotion criteria for advancing between stages, and deprecation procedures for retiring models.

**Risk assessment**: Evaluation of model impact on individuals or groups, required safeguards based on risk level, and documented approval chains.

**Monitoring and audit**: Continuous surveillance for fairness metrics, drift detection, and audit trail maintenance for compliance.

### Model Registry Implementation

```python
# Python: Model registry with governance metadata
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import json
from pathlib import Path


class ModelStage(Enum):
    DEVELOPMENT = "development"
    VALIDATION = "validation"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"
    DEPRECATED = "deprecated"


class RiskLevel(Enum):
    LOW = "low"            # Minimal individual impact
    MEDIUM = "medium"      # Some individual impact, limited scope
    HIGH = "high"          # Significant individual impact
    CRITICAL = "critical"  # Legal, financial, or fundamental rights impact


@dataclass
class ModelMetadata:
    """Complete governance metadata for a model."""
    model_id: str
    name: str
    version: str
    model_type: str  # classifier, regressor, etc.

    # Ownership
    owner_team: str
    owner_contact: str
    data_scientist: str

    # Training provenance
    training_data_id: str
    training_data_version: str
    training_data_hash: str  # SHA256 of training data
    training_start_date: str
    training_end_date: str
    training_duration_hours: float

    # Configuration
    hyperparameters: dict
    features: list[str]
    feature_preprocessing: dict

    # Performance
    evaluation_metrics: dict  # metric_name -> value
    evaluation_date: str
    evaluation_dataset_id: str

    # Governance
    stage: ModelStage
    risk_level: RiskLevel
    regulatory_context: Optional[str]  # HIPAA, GDPR, etc.
    approval_status: str
    approved_by: list[str]
    approval_date: str

    # Fairness
    fairness_metrics: Optional[dict]
    protected_groups: Optional[list[str]]
    fairness_thresholds: Optional[dict]

    # Deployment
    deployment_date: Optional[str]
    serving_endpoint: Optional[str]
    traffic_percentage: float = 0.0

    def to_dict(self) -> dict:
        # Enums are stored by value so the result is JSON-serializable
        return {k: v if not isinstance(v, Enum) else v.value
                for k, v in self.__dict__.items()}


def _version_key(version: str) -> tuple:
    """Sort key so that "1.10" orders after "1.9"; non-numeric parts sort last."""
    return tuple(
        (0, int(part), "") if part.isdigit() else (1, 0, part)
        for part in version.split(".")
    )


class ModelRegistry:
    """
    File-based model registry with governance metadata.
    Production systems should use PostgreSQL or similar.
    """

    def __init__(self, registry_path: str):
        self.registry_path = Path(registry_path)
        self.registry_path.mkdir(parents=True, exist_ok=True)
        self.models_file = self.registry_path / "models.json"
        self._ensure_registry_file()

    def _ensure_registry_file(self):
        if not self.models_file.exists():
            self.models_file.write_text(json.dumps({"models": []}))

    def _load_registry(self) -> dict:
        return json.loads(self.models_file.read_text())

    def _save_registry(self, registry: dict):
        self.models_file.write_text(json.dumps(registry, indent=2))

    def register_model(self, metadata: ModelMetadata) -> str:
        """Register a new model version."""
        registry = self._load_registry()

        # Check for existing version
        existing = [m for m in registry["models"]
                    if m["model_id"] == metadata.model_id
                    and m["version"] == metadata.version]
        if existing:
            raise ValueError(
                f"Model {metadata.model_id} v{metadata.version} already registered"
            )

        # Add to registry
        registry["models"].append(metadata.to_dict())
        self._save_registry(registry)
        return metadata.model_id

    def list_models(self, stage: Optional[ModelStage] = None) -> list[dict]:
        """List all models, optionally filtered by stage."""
        registry = self._load_registry()
        models = registry["models"]
        if stage:
            models = [m for m in models if m["stage"] == stage.value]
        return models

    def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[dict]:
        """Retrieve model metadata (latest version if none is given)."""
        registry = self._load_registry()
        candidates = [m for m in registry["models"] if m["model_id"] == model_id]
        if not candidates:
            return None
        if version:
            matches = [m for m in candidates if m["version"] == version]
            return matches[0] if matches else None
        # Latest by numeric-aware version sort (a plain string sort puts "10" before "9")
        return max(candidates, key=lambda m: _version_key(m["version"]))

    def promote_model(
        self,
        model_id: str,
        target_stage: ModelStage,
        approved_by: list[str],
        version: Optional[str] = None,
    ) -> dict:
        """Promote one model version (latest if unspecified) with approval."""
        if not approved_by:
            raise ValueError("Promotion requires at least one approver")
        current = self.get_model(model_id, version)
        if current is None:
            raise KeyError(f"Model {model_id} (version={version}) not found")

        registry = self._load_registry()
        for model in registry["models"]:
            # Match on version too, so other versions keep their own stage
            if (model["model_id"] == model_id
                    and model["version"] == current["version"]):
                model["stage"] = target_stage.value
                model["approval_status"] = "approved"
                model["approved_by"] = approved_by
                model["approval_date"] = datetime.now().isoformat()
                break
        self._save_registry(registry)
        return self.get_model(model_id, current["version"])
```

### Fairness Evaluation

```python
# Python: Fairness evaluation for model governance
import numpy as np
from typing import Optional


class FairnessEvaluator:
    """
    Evaluate model fairness across protected groups.
    Required for responsible governance.
    """

    def __init__(self, protected_groups: list[str]):
        self.protected_groups = protected_groups

    def evaluate(
        self,
        predictions: np.ndarray,
        protected_attributes: dict[str, np.ndarray],
        outcome: Optional[np.ndarray] = None  # Ground truth for some analyses
    ) -> dict:
        """
        Compute fairness metrics across protected groups.
        Returns metrics for discrepancy detection.
        """
        metrics = {}
        for group in self.protected_groups:
            if group not in protected_attributes:
                continue
            # Membership is encoded as 1 (in group) / 0 (not in group)
            group_mask = protected_attributes[group] == 1
            non_group_mask = ~group_mask
            metrics[group] = self._compute_group_metrics(
                predictions, group_mask, non_group_mask, outcome
            )

        # Cross-group fairness summary
        metrics["disparity_summary"] = self._compute_disparity_summary(metrics)
        return metrics

    @staticmethod
    def _safe_mean(values: np.ndarray) -> float:
        """Mean that returns NaN, without a warning, for an empty slice."""
        return float(np.mean(values)) if values.size else float("nan")

    def _compute_group_metrics(
        self,
        predictions: np.ndarray,
        group_mask: np.ndarray,
        non_group_mask: np.ndarray,
        outcome: Optional[np.ndarray]
    ) -> dict:
        """Compute metrics for a single protected group."""
        group_preds = predictions[group_mask]
        non_group_preds = predictions[non_group_mask]

        metrics = {
            "positive_rate_difference": (
                self._safe_mean(group_preds) - self._safe_mean(non_group_preds)
            ),
            "selection_rate": self._safe_mean(group_preds),
            "sample_count": int(np.sum(group_mask)),
        }

        if outcome is not None:
            group_outcomes = outcome[group_mask]
            non_group_outcomes = outcome[non_group_mask]
            # Equal Opportunity: equal true positive rates
            group_tpr = self._safe_mean(group_preds[group_outcomes == 1])
            non_group_tpr = self._safe_mean(non_group_preds[non_group_outcomes == 1])
            metrics["equal_opportunity_diff"] = group_tpr - non_group_tpr

        return metrics

    def _compute_disparity_summary(self, metrics: dict) -> dict:
        """Summarize maximum disparities across groups."""
        positive_rate_diffs = [
            m["positive_rate_difference"]
            for m in metrics.values()
            if "positive_rate_difference" in m
            and not np.isnan(m["positive_rate_difference"])
        ]
        return {
            # default avoids a ValueError when no group could be evaluated
            "max_positive_rate_diff": float(
                max(positive_rate_diffs, key=abs, default=0.0)
            ),
            "groups_flagged": [
                group for group, m in metrics.items()
                if abs(m.get("positive_rate_difference", 0)) > 0.1
            ]
        }
```

### Governance Workflows

Governance is only effective if it is enforced through workflow. Every model promotion requires a documented approval chain. High-risk models require documented review by legal, compliance, and ethics stakeholders. Governance workflows must be integrated into your CI/CD pipeline so that a promotion cannot bypass them.

25 min
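The approval-chain rule above can be expressed as data that a CI/CD job checks before promotion. This is a minimal sketch under stated assumptions: the role names and the mapping from risk level to reviewers are hypothetical and should come from your own policy, and `RiskLevel` is redeclared here only so the example runs on its own.

```python
from enum import Enum


class RiskLevel(Enum):
    # Mirrors the registry's RiskLevel so this sketch is self-contained
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# Hypothetical policy: which roles must sign off at each risk level
BASE_REVIEWERS = {"ml-ops-lead", "business-stakeholder"}
HIGH_RISK_REVIEWERS = BASE_REVIEWERS | {"legal", "compliance", "ethics"}

REQUIRED_REVIEWERS = {
    RiskLevel.LOW: {"ml-ops-lead"},
    RiskLevel.MEDIUM: BASE_REVIEWERS,
    RiskLevel.HIGH: HIGH_RISK_REVIEWERS,
    RiskLevel.CRITICAL: HIGH_RISK_REVIEWERS,
}


def missing_approvals(risk: RiskLevel, approvals: set[str]) -> set[str]:
    """Roles that still need to approve before promotion is allowed."""
    return REQUIRED_REVIEWERS[risk] - approvals


def can_promote(risk: RiskLevel, approvals: set[str]) -> bool:
    """True only when every required role has approved."""
    return not missing_approvals(risk, approvals)


if __name__ == "__main__":
    collected = {"ml-ops-lead", "legal"}
    print(sorted(missing_approvals(RiskLevel.HIGH, collected)))
    print(can_promote(RiskLevel.LOW, {"ml-ops-lead"}))
```

A pipeline step could call `can_promote` and exit non-zero when it returns `False`, which turns the policy into a gate rather than a convention.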
- 23 Audit Trails
Audit trails provide forensic accountability for ML systems: what data was used, what models were trained, who approved deployment, and what predictions were made. Complete audit trails are non-negotiable for regulated industries and essential for post-incident investigation.

### Audit Trail Requirements

Audit trails serve multiple purposes: regulatory compliance, incident investigation, bias investigation, and operational improvement. Each purpose requires different data granularity.

**Regulatory compliance** typically requires immutable logs of data access, model versions served, and decision records for affected individuals. GDPR's rules on automated decision-making, often summarized as a "right to explanation", mean you need to know which model version made a specific prediction. Workloads covered by HIPAA need a record of which training datasets contained protected health information.

**Incident investigation** requires reconstructing the exact conditions at incident time: model version, input data distribution, serving configuration, and performance metrics.

**Bias investigation** requires tracking model behavior across demographic groups over time, so you can identify when discriminatory patterns emerged.
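To make the compliance and incident-investigation requirements concrete, the sketch below links each prediction to the model version and serving configuration that produced it. It is an in-memory illustration only: `DecisionRecord`, `DecisionIndex` and their fields are hypothetical names, and a real system would persist these records in the audit log itself.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DecisionRecord:
    request_id: str
    model_id: str
    model_version: str
    served_at: str      # ISO 8601 timestamp
    config_hash: str    # hash of the serving configuration in effect


class DecisionIndex:
    """Answers: which model version made this prediction?"""

    def __init__(self) -> None:
        self._by_request: dict[str, DecisionRecord] = {}

    def record(self, rec: DecisionRecord) -> None:
        # Refuse to overwrite: a decision record must never change after the fact
        if rec.request_id in self._by_request:
            raise ValueError(f"Decision {rec.request_id} already recorded")
        self._by_request[rec.request_id] = rec

    def which_model(self, request_id: str) -> Optional[str]:
        rec = self._by_request.get(request_id)
        return f"{rec.model_id}:{rec.model_version}" if rec else None


if __name__ == "__main__":
    index = DecisionIndex()
    index.record(DecisionRecord(
        "req-001", "sentiment", "2.1.0", "2024-06-01T12:00:00", "ab12cd34"
    ))
    print(index.which_model("req-001"))
```

The same record serves both purposes named above: the model version answers the compliance question, and the timestamp plus configuration hash let an investigator reconstruct serving conditions.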
### Audit Log Implementation

```python
# Python: Immutable audit logging for ML systems
import hashlib
import json
import sqlite3
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional
import threading


class AuditLogType:
    DATA_ACCESS = "data_access"
    MODEL_TRAINING = "model_training"
    MODEL_EVALUATION = "model_evaluation"
    MODEL_PROMOTION = "model_promotion"
    MODEL_DEPLOYMENT = "model_deployment"
    INFERENCE_REQUEST = "inference_request"
    INFERENCE_RESPONSE = "inference_response"
    CONFIGURATION_CHANGE = "configuration_change"


@dataclass
class AuditEntry:
    """Single audit log entry."""
    timestamp: str
    entry_id: str        # UUID or sequential
    event_type: str
    actor: str           # User, system process, automated pipeline
    resource_type: str   # model, dataset, configuration
    resource_id: str     # Versioned identifier
    action: str          # read, write, train, deploy
    details: dict        # Additional context
    checksum: str = ""   # Integrity verification; computed in __post_init__

    def __post_init__(self):
        # Auto-generate the checksum over every field except the checksum itself
        content_for_hash = json.dumps(
            {k: v for k, v in asdict(self).items() if k != "checksum"},
            sort_keys=True
        ).encode()
        self.checksum = hashlib.sha256(content_for_hash).hexdigest()[:16]


class ImmutableAuditLog:
    """
    Append-only audit log with integrity verification.
    Designed for compliance and forensic purposes.
    """

    # Column order matches the CREATE TABLE statement below
    _COLUMNS = ["entry_id", "timestamp", "event_type", "actor",
                "resource_type", "resource_id", "action", "details",
                "checksum", "written_at"]

    def __init__(self, db_path: str):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        """Initialize the schema and indexes. The class exposes no update or
        delete method; the database itself does not enforce append-only."""
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS audit_log (
                entry_id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                event_type TEXT NOT NULL,
                actor TEXT NOT NULL,
                resource_type TEXT NOT NULL,
                resource_id TEXT NOT NULL,
                action TEXT NOT NULL,
                details TEXT NOT NULL,
                checksum TEXT NOT NULL,
                written_at TEXT NOT NULL
            )
        """)
        # Create indexes for efficient querying
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_audit_time
            ON audit_log(timestamp)
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_audit_resource
            ON audit_log(resource_type, resource_id)
        """)
        # Enable WAL mode for concurrent reads during writes
        cursor.execute("PRAGMA journal_mode=WAL")
        conn.commit()
        conn.close()

    def append(self, entry: AuditEntry):
        """Append an audit entry. A duplicate entry_id is ignored, so the
        first write for a given id is the one that is kept."""
        with self._lock:
            conn = sqlite3.connect(str(self.db_path))
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT OR IGNORE INTO audit_log
                    (entry_id, timestamp, event_type, actor, resource_type,
                     resource_id, action, details, checksum, written_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    entry.entry_id,
                    entry.timestamp,
                    entry.event_type,
                    entry.actor,
                    entry.resource_type,
                    entry.resource_id,
                    entry.action,
                    json.dumps(entry.details),
                    entry.checksum,
                    datetime.now().isoformat()
                ))
                conn.commit()
            finally:
                conn.close()

    def query(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        event_type: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        limit: int = 1000
    ) -> list[dict]:
        """Query audit log entries, newest first."""
        conditions = []
        params = []
        if resource_type:
            conditions.append("resource_type = ?")
            params.append(resource_type)
        if resource_id:
            conditions.append("resource_id = ?")
            params.append(resource_id)
        if event_type:
            conditions.append("event_type = ?")
            params.append(event_type)
        if start_time:
            conditions.append("timestamp >= ?")
            params.append(start_time)
        if end_time:
            conditions.append("timestamp <= ?")
            params.append(end_time)

        query = f"SELECT {', '.join(self._COLUMNS)} FROM audit_log"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        # Bind the limit as a parameter instead of formatting it into the SQL
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(int(limit))

        conn = sqlite3.connect(str(self.db_path))
        try:
            rows = conn.execute(query, params).fetchall()
        finally:
            conn.close()
        return [dict(zip(self._COLUMNS, row)) for row in rows]

    def verify_integrity(self) -> tuple[bool, list[str]]:
        """Verify checksums for all entries."""
        conn = sqlite3.connect(str(self.db_path))
        try:
            rows = conn.execute(
                f"SELECT {', '.join(self._COLUMNS)} FROM audit_log"
            ).fetchall()
        finally:
            conn.close()

        corrupted = []
        for row in rows:
            record = dict(zip(self._COLUMNS, row))
            # Rebuilding the entry recomputes its checksum from the stored fields
            temp_entry = AuditEntry(
                timestamp=record["timestamp"],
                entry_id=record["entry_id"],
                event_type=record["event_type"],
                actor=record["actor"],
                resource_type=record["resource_type"],
                resource_id=record["resource_id"],
                action=record["action"],
                details=json.loads(record["details"]),
            )
            if temp_entry.checksum != record["checksum"]:
                corrupted.append(record["entry_id"])
        return len(corrupted) == 0, corrupted


# Usage: Audit logging in serving context
def log_inference_request(
    audit_log: ImmutableAuditLog,
    request_id: str,
    model_id: str,
    model_version: str,
    input_features: dict,
    actor: str = "user_system"
):
    """Log an inference request for audit purposes."""
    entry = AuditEntry(
        timestamp=datetime.now().isoformat(),
        entry_id=request_id,
        event_type=AuditLogType.INFERENCE_REQUEST,
        actor=actor,
        resource_type="model",
        resource_id=f"{model_id}:{model_version}",
        action="predict",
        details={
            # Do not log raw input: it may contain PII. Log a hash instead.
            "input_feature_hash": hashlib.sha256(
                json.dumps(input_features, sort_keys=True).encode()
            ).hexdigest()[:16]
        }
    )
    audit_log.append(entry)
```

### What Not to Log

Audit trails sometimes over-collect, logging sensitive information that itself becomes a liability. Do not log raw personal data in audit entries; log hashes or aggregate indicators that still allow correlation. Avoid logging information that would expose your model architecture or proprietary processing logic to potential attackers.

### Retention and Rotation

Audit logs grow without bound unless you manage them. Establish retention policies based on regulatory requirements: financial services may require 7+ years, while general applications may need only 90 days. Implement log rotation that archives entries before deleting them.

20 min
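A retention policy like the one described can be sketched as an archive-then-delete job. The example below is a minimal sketch, assuming the `audit_log` table and ISO 8601 `timestamp` column from the audit log implementation in this chapter; the function name `archive_and_prune`, the JSON Lines archive format and the `now` parameter are illustrative choices rather than part of the original design.

```python
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Optional


def archive_and_prune(
    db_path: str,
    archive_path: str,
    retention_days: int,
    now: Optional[datetime] = None,
) -> int:
    """Move entries older than the retention window to a JSON Lines archive.

    Returns the number of entries archived and removed.
    """
    now = now or datetime.now()
    # ISO 8601 strings in one consistent format compare correctly as text
    cutoff = (now - timedelta(days=retention_days)).isoformat()

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            "SELECT * FROM audit_log WHERE timestamp < ? ORDER BY timestamp",
            (cutoff,),
        ).fetchall()
        if not rows:
            return 0

        # Write the archive before deleting, so a crash cannot lose entries
        with open(archive_path, "a", encoding="utf-8") as fh:
            for row in rows:
                fh.write(json.dumps(dict(row)) + "\n")

        conn.execute("DELETE FROM audit_log WHERE timestamp < ?", (cutoff,))
        conn.commit()
        return len(rows)
    finally:
        conn.close()
```

Running this from a scheduled job with `retention_days` set per regulatory context keeps the live database small while the archive file preserves the full history. Because entries are deleted from the live table, integrity checks on archived entries would need to run against the archive instead.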
- 24 MLOps Pipeline Project
This capstone project integrates previous chapters into a production-ready MLOps pipeline. You'll implement drift detection, CI/CD, GitOps, monitoring, governance, and audit trails: a complete, operational MLOps system for local AI.

### Project Overview

Build a complete MLOps pipeline for a sentiment analysis model serving local inference requests. The system will:

1. **Detect data and model drift** in real time during serving
2. **Automate training and evaluation** through pipeline as code
3. **Manage deployments** using GitOps principles
4. **Monitor performance** with alerts and dashboards
5. **Govern responsibly** with fairness evaluation and audit trails
6. **Retain accountability** through detailed logging

### Architecture

```
                     ┌─────────────────────────────┐
                     │       Git Repository        │
                     │     (Pipeline as Code)      │
                     │   (Model Configurations)    │
                     │   (Deployment Manifests)    │
                     └──────────────┬──────────────┘
                                    │
                     ┌──────────────▼──────────────┐
                     │       CI/CD Pipeline        │
                     │ (GitHub Actions/GitLab CI)  │
                     │  - Data Validation          │
                     │  - Model Training           │
                     │  - Evaluation               │
                     │  - Artifact Registration    │
                     └──────────────┬──────────────┘
                                    │
         ┌──────────────────────────┼──────────────────────────┐
         │                          │                          │
┌────────▼────────┐        ┌────────▼────────┐        ┌────────▼────────┐
│   Production    │        │     Staging     │        │   Development   │
│  (Edge Nodes)   │        │    (Staging)    │        │  (Experiment)   │
└────────┬────────┘        └─────────────────┘        └─────────────────┘
         │
┌────────▼────────┐
│   Monitoring    │
│      Stack      │
│  - Prometheus   │
│  - Grafana      │
│  - Alertmanager │
│  - Audit Log DB │
└─────────────────┘
```

### Implementation Steps

**Step 1: Data Validation Pipeline**

```python
# Python: data_validation.py
import pandas as pd
from dataclasses import dataclass
from scipy.stats import ks_2samp


@dataclass
class ValidationResult:
    passed: bool
    checks_performed: int
    checks_passed: int
    failures: list[dict]


class DataValidator:
    """
    Validates training data against schema and distribution expectations.
    Integrated into CI/CD pipeline.
    """

    def __init__(self, reference_stats: dict):
        # reference_stats: column -> {"samples": reference values for that column}
        self.reference_stats = reference_stats

    def validate(
        self,
        data: pd.DataFrame,
        schema: dict,
        drift_threshold: float = 0.1
    ) -> ValidationResult:
        results = ValidationResult(
            passed=True,
            checks_performed=0,
            checks_passed=0,
            failures=[]
        )

        # Schema validation
        results.checks_performed += 1
        expected_columns = set(schema["columns"])
        actual_columns = set(data.columns)
        if expected_columns != actual_columns:
            results.passed = False
            results.failures.append({
                "check": "schema",
                "expected": list(expected_columns),
                "actual": list(actual_columns),
                "missing": list(expected_columns - actual_columns),
                "extra": list(actual_columns - expected_columns)
            })
        else:
            results.checks_passed += 1

        # Distribution validation
        for col, ref_stats in self.reference_stats.items():
            if col not in data.columns:
                continue
            results.checks_performed += 1
            if pd.api.types.is_numeric_dtype(data[col]):
                # Two-sample Kolmogorov-Smirnov test against reference samples;
                # the threshold applies to the KS statistic, not the p-value
                statistic, p_value = ks_2samp(
                    data[col].dropna(),
                    ref_stats["samples"]
                )
                if statistic > drift_threshold:
                    results.passed = False
                    results.failures.append({
                        "check": "distribution_drift",
                        "column": col,
                        "ks_statistic": float(statistic),
                        "threshold": drift_threshold
                    })
                else:
                    results.checks_passed += 1

        # Null value checks
        null_threshold = schema.get("null_threshold", 0.0)
        for col in schema["columns"]:
            if col not in data.columns:
                continue
            results.checks_performed += 1
            null_rate = data[col].isna().mean()
            if null_rate > null_threshold:
                results.passed = False
                results.failures.append({
                    "check": "null_rate",
                    "column": col,
                    "null_rate": float(null_rate),
                    "threshold": null_threshold
                })
            else:
                results.checks_passed += 1

        return results
```

**Step 2: Training Pipeline**

```python
# Python: training_pipeline.py
import hashlib
import subprocess
from datetime import datetime


class TrainingPipeline:
    """
    Declarative training pipeline as code.
    Defines all stages, configurations, and validation gates.
    """

    def __init__(self, config: dict):
        self.config = config
        self.git_info = self._capture_git_info()

    def _capture_git_info(self) -> dict:
        try:
            # check=True makes a non-repository directory fall through to "unknown"
            commit = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True, text=True, check=True
            ).stdout.strip()
            branch = subprocess.run(
                ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                capture_output=True, text=True, check=True
            ).stdout.strip()
            return {"commit": commit, "branch": branch}
        except (OSError, subprocess.SubprocessError):
            return {"commit": "unknown", "branch": "unknown"}

    def run(self) -> dict:
        """
        Execute training pipeline stages.
        Returns execution metadata for governance.
        """
        execution_id = hashlib.sha256(
            f"{datetime.now().isoformat()}{self.git_info}".encode()
        ).hexdigest()[:12]

        results = {
            "execution_id": execution_id,
            "pipeline": "sentiment_training",
            "git_info": self.git_info,
            "stages": {},
            "status": "running",
            "start_time": datetime.now().isoformat()
        }

        try:
            # Stage 1: Data extraction and validation
            data = self._extract_data()
            validation = self._validate_data(data)
            if not validation.passed:
                raise ValueError(f"Data validation failed: {validation.failures}")
            results["stages"]["validation"] = "passed"

            # Stage 2: Training
            model = self._train(data)
            results["stages"]["training"] = "completed"

            # Stage 3: Evaluation
            metrics = self._evaluate(model, data)
            results["stages"]["evaluation"] = metrics
            results["metrics"] = metrics
            if not self._passes_thresholds(metrics):
                raise ValueError(f"Evaluation thresholds not met: {metrics}")

            # Stage 4: Registration
            model_path = self._register_model(model, metrics)
            results["stages"]["registration"] = model_path
            results["model_path"] = model_path

            results["status"] = "success"
            results["end_time"] = datetime.now().isoformat()
        except Exception as e:
            results["status"] = "failed"
            results["error"] = str(e)
            results["end_time"] = datetime.now().isoformat()

        return results

    # The stage methods below are placeholders to implement for your project.
    # They raise instead of silently returning None.

    def _extract_data(self):
        """Placeholder for data extraction."""
        raise NotImplementedError("Implement data extraction")

    def _validate_data(self, data):
        """Placeholder for data validation (return a ValidationResult)."""
        raise NotImplementedError("Implement data validation")

    def _train(self, data):
        """Placeholder for model training."""
        raise NotImplementedError("Implement model training")

    def _evaluate(self, model, data):
        """Placeholder for model evaluation (return a dict of metrics)."""
        raise NotImplementedError("Implement model evaluation")

    def _register_model(self, model, metrics):
        """Placeholder for model registration (return the artifact path)."""
        raise NotImplementedError("Implement model registration")

    def _passes_thresholds(self, metrics: dict) -> bool:
        """Check metrics against config["evaluation_thresholds"].

        Keys carry the comparison as a prefix: ">accuracy" means the metric
        must exceed the threshold, "<latency" means it must stay below it.
        A missing metric fails its check.
        """
        thresholds = self.config.get("evaluation_thresholds", {})
        for metric, threshold in thresholds.items():
            if metric.startswith(">"):
                actual_metric = metric[1:]
                if metrics.get(actual_metric, 0) <= threshold:
                    return False
            elif metric.startswith("<"):
                actual_metric = metric[1:]
                if metrics.get(actual_metric, float('inf')) >= threshold:
                    return False
        return True
```

**Step 3: Monitoring and Alerting**

```yaml
# YAML: Alert rules for production monitoring
# File: monitoring/alerts/ml-sentiment-alerts.yaml
groups:
  - name: ml_serving_alerts
    rules:
      - alert: SentimentModelAccuracyDegraded
        expr: |
          (
            sum(inference_feedback{correct="true"} offset 1d)
            / sum(inference_feedback offset 1d)
          ) < 0.82
        for: 2h
        labels:
          severity: critical
          team: ml-ops
          model: sentiment-v2
        annotations:
          summary: "Sentiment model accuracy dropped below production threshold"
          description: "Current accuracy {{ $value | printf \"%.2f\" }} < 0.82"

      - alert: SentimentDataDrift
        expr: sentiment_input_drift_score{quantile="0.95"} > 0.15
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Significant input distribution drift for sentiment model"

      - alert: SentimentLatencySLOBreach
        expr: |
          histogram_quantile(
            0.99,
            rate(ml_inference_duration_seconds_bucket{model="sentiment"}[5m])
          ) > 0.500
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "P99 latency exceeds 500ms SLO"
```

**Step 4: Governance Integration**

```python
# Python: governance.py - Complete governance workflow
from datetime import datetime
from uuid import uuid4

# AuditEntry is the dataclass from the Audit Trails chapter.
# The module name "audit_log" is an assumption; adjust it to your layout.
from audit_log import AuditEntry


class GovernanceWorkflow:
    """
    Complete governance integration for ML pipeline.
    Enforces approval gates and builds audit trails.
    """

    def __init__(self, audit_log, model_registry):
        self.audit_log = audit_log
        self.model_registry = model_registry

    def approve_and_deploy(
        self,
        model_id: str,
        target_environment: str,
        approvers: list[str],
        fairness_results: dict
    ) -> dict:
        """
        Execute governance approval workflow.
        - Verify fairness criteria
        - Collect required approvals
        - Register approval in audit log
        - Return the decision so the caller can trigger deployment
        """
        # Fairness gate. Accept either the full FairnessEvaluator output
        # or just its "disparity_summary" section.
        summary = fairness_results.get("disparity_summary", fairness_results)
        max_disparity = abs(summary.get("max_positive_rate_diff", 0))
        if max_disparity > 0.15:
            return {
                "status": "rejected",
                "reason": f"Fairness threshold exceeded: disparity={max_disparity}"
            }

        # Approval gate
        required_approvers = ["ml-ops-lead", "business-stakeholder"]
        missing_approvers = set(required_approvers) - set(approvers)
        if missing_approvers:
            return {
                "status": "pending_approval",
                "missing_approvers": list(missing_approvers)
            }

        # All gates passed: record the approval in the audit log
        entry = AuditEntry(
            timestamp=datetime.now().isoformat(),
            entry_id=str(uuid4()),
            event_type="model_promotion",
            actor=", ".join(approvers),
            resource_type="model",
            resource_id=model_id,
            action="promote_to_production",
            details={
                "target_environment": target_environment,
                "approvers": approvers,
                "fairness_passed": True,
                "max_disparity": max_disparity
            },
            checksum=""  # recomputed by AuditEntry.__post_init__
        )
        self.audit_log.append(entry)

        return {
            "status": "approved",
            "approved_by": approvers,
            "audit_entry_id": entry.entry_id
        }
```

### Validation Checklist

Complete the following to validate your pipeline:

```
[ ] Pipeline executes data validation and fails on schema violations
[ ] Training produces model artifacts with versioned metadata
[ ] Evaluation metrics published to monitoring stack
[ ] Drift detection monitors and alerts on distribution shifts
[ ] Model promotion requires approval through governance workflow
[ ] Audit log records all model lifecycle events
[ ] Rollback procedure documented and tested
[ ] Monitoring dashboard shows live model metrics
[ ] Alerting triggers tested with synthetic failures
[ ] Fairness evaluation integrated into evaluation stage
```

30 min
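Several checklist items depend on gates that fail loudly when given bad input. As one way to test that with a synthetic failure, the sketch below is a standalone variant of the Step 2 evaluation gate. It keeps the same `>` / `<` key-prefix convention for thresholds, but it also reports which checks failed and rejects keys without a prefix; both of those behaviors, the function name `passes_thresholds` and the metric names are assumptions added for illustration.

```python
def passes_thresholds(metrics: dict, thresholds: dict) -> tuple[bool, list[str]]:
    """Evaluate metrics against prefixed thresholds.

    ">name" requires metrics["name"] > threshold.
    "<name" requires metrics["name"] < threshold.
    A missing metric counts as a failure.
    """
    failures = []
    for key, threshold in thresholds.items():
        op, name = key[:1], key[1:]
        value = metrics.get(name)
        if op == ">":
            ok = value is not None and value > threshold
        elif op == "<":
            ok = value is not None and value < threshold
        else:
            raise ValueError(f"Threshold key {key!r} must start with '>' or '<'")
        if not ok:
            failures.append(f"{name}: {value} (required {op} {threshold})")
    return (not failures, failures)


if __name__ == "__main__":
    # Hypothetical thresholds for the sentiment model
    thresholds = {">accuracy": 0.82, "<latency_p99_s": 0.5}

    # Synthetic failure: accuracy deliberately below the gate
    ok, failures = passes_thresholds(
        {"accuracy": 0.80, "latency_p99_s": 0.3}, thresholds
    )
    print(ok, failures)
```

Feeding a deliberately failing metric set through the gate in CI confirms that the pipeline stops before registration, which covers the "fails on violations" style of checklist item without waiting for a real regression.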