14. Alerting Systems

Chapter 14 of 18 · 25 min

Effective alerting transforms monitoring data into actionable notifications. A well-designed system handles alert creation, routing, deduplication, and escalation without generating noise.

Alert Creation and Routing

# alerting_system.py
import json
from datetime import datetime, timedelta
from pathlib import Path
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import ollama

class Severity(Enum):
    """Urgency level carried by an alert.

    Each member's value is the lowercase string form of its name, so
    ``Severity("critical")`` looks a member up from that string and
    ``member.value`` gives the string back.
    """

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"

@dataclass
class Alert:
    """One fired alert together with its acknowledgement/resolution state.

    The first eight fields are required. The lifecycle fields default to
    "not acknowledged, nobody acknowledged it, not resolved".
    """

    # Identity and origin of the alert.
    alert_id: str
    source: str
    metric: str

    # Observed reading and the limit it was compared against.
    value: float
    threshold: float

    # Classification, human-readable text and creation time.
    severity: Severity
    message: str
    created_at: datetime

    # Lifecycle state; all start out "empty".
    acknowledged: bool = False
    acknowledged_by: Optional[str] = None
    resolved_at: Optional[datetime] = None

class AlertingSystem:
    def __init__(self, config_path: str = "alert_config.json"):
        self.config = self._load_config(config_path)
        self.alert_store = Path(self.config.get("alert_store", "alerts.jsonl"))
        self.active_alerts: dict[str, Alert] = {}
        self.suppression_rules = self._load_suppression_rules()
    
    def _load_config(self, path: str) -> dict:
        with open(path) as f:
            return json.load(f)
    
    def _load_suppression_rules(self) -> list[dict]:
        """Load alert suppression rules to prevent notification storms."""
        return self.config.get("suppression_rules", [])
    
    def evaluate_condition(self, metric: str, value: float, 
                          conditions: list[dict]) -> list[Alert]:
        """Evaluate metrics against alert conditions."""
        alerts = []
        
        for condition in conditions:
            if condition["metric"] != metric:
                continue
            
            # Check threshold
            operator = condition.get("operator", ">")
            threshold = condition["threshold"]
            
            triggered = False
            if operator == ">" and value > threshold:
                triggered = True
            elif operator == "<" and value < threshold:
                triggered = True
            elif operator == ">=" and value >= threshold:
                triggered = True
            elif operator == "<=" and value <= threshold:
                triggered = True
            elif operator == "==" and value == threshold:
                triggered = True
            
            if triggered:
                alert = self._create_alert(condition, value)
                
                # Check suppression
                if not self._is_suppressed(alert):
                    alerts.append(alert)
        
        return alerts
    
    def _create_alert(self, condition: dict, current_value: float) -> Alert:
        """Create a new alert with AI-generated message."""
        alert_id = f"{condition['metric']}_{int(datetime.now().timestamp())}"
        
        # Generate contextual message using AI
        message = self._generate_alert_message(condition, current_value)
        
        alert = Alert(
            alert_id=alert_id,
            source=condition.get("source", "unknown"),
            metric=condition["metric"],
            value=current_value,
            threshold=condition["threshold"],
            severity=Severity(condition.get("severity", "warning")),
            message=message,
            created_at=datetime.now()
        )
        
        return alert
    
    def _generate_alert_message(self, condition: dict, value: float) -> str:
        """Use local AI to generate contextual alert messages."""
        prompt = f"""Generate a concise alert message for:
Metric: {condition['metric']}
Current value: {value}
Threshold: {condition['threshold']}
Operator: {condition.get('operator', '>')}

Context: {condition.get('context', 'System metric')}

Return ONLY the alert message in <50 characters. Start with action verb."""
        
        response = ollama.chat(
            model=self.config.get("model", "llama3"),
            messages=[{"role": "user", "content": prompt}],
            options={"temperature": 0.3, "num_predict": 50}
        )
        
        return response["message"]["content"].strip()
    
    def _is_suppressed(self, alert: Alert) -> bool:
        """Check if alert matches suppression rules."""
        for rule in self.suppression_rules:
            if rule.get("metric") == alert.metric:
                # Check time-based suppression
                if "quiet_hours_start" in rule:
                    now = datetime.now().time()
                    start = datetime.strptime(rule["quiet_hours_start"], "%H:%M").time()
                    end = datetime.strptime(rule["quiet_hours_end"], "%H:%M").time()
                    if start <= now <= end:
                        return True
                
                # Check maintenance window
                if rule.get("maintenance_mode"):
                    return True
        
        return False
    
    def send_alert(self, alert: Alert):
        """Route alert to appropriate notification channels."""
        # Store alert
        self.active_alerts[alert.alert_id] = alert
        self._persist_alert(alert)
        
        # Determine routing
        channels = self._get_notification_channels(alert)
        
        for channel in channels:
            self._send_to_channel(alert, channel)
    
    def _get_notification_channels(self, alert: Alert) -> list[dict]:
        """Determine notification channels based on severity and rules."""
        channels = []
        
        # Always notify default channel
        if "default_channel" in self.config:
            channels.append(self.config["default_channel"])
        
        # Critical alerts get additional channels
        if alert.severity == Severity.CRITICAL:
            if "critical_channel" in self.config:
                channels.append(self.config["critical_channel"])
        
        # Metric-specific routing
        for routing_rule in self.config.get("routing_rules", []):
            if routing_rule.get("metric") == alert.metric:
                channels.append(routing_rule["channel"])
        
        return channels
    
    def _send_to_channel(self, alert: Alert, channel: dict):
        """Send alert to specific notification channel."""
        channel_type = channel.get("type")
        
        if channel_type == "webhook":
            self._send_webhook(alert, channel)
        elif channel_type == "email":
            self._send_email(alert, channel)
        elif channel_type == "pagerduty":
            self._send_pagerduty(alert, channel)
        elif channel_type == "slack":
            self._send_slack(alert, channel)
    
    def _send_webhook(self, alert: Alert, channel: dict):
        """Send alert to webhook endpoint."""
        import requests
        
        payload = {
            "alert_id": alert.alert_id,
            "severity": alert.severity.value,
            "metric": alert.metric,
            "value": alert.value,
            "message": alert.message,
            "timestamp": alert.created_at.isoformat()
        }
        
        try:
            response = requests.post(
                channel["url"],
                json=payload,
                timeout=10
            )
            response.raise_for_status()
        except Exception as e:
            print(f"Webhook failed: {e}")
    
    def _send_slack(self, alert: Alert, channel: dict):
        """Send alert to Slack."""
        import requests
        
        severity_emoji = {
            "critical": "🔴",
            "warning": "⚠️",
            "info": "ℹ️"
        }
        
        payload = {
            "channel": channel.get("channel", "#alerts"),
            "text": f"{severity_emoji.get(alert.severity.value, '⚠️')} *{alert.metric}*",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*{alert.severity.value.upper()}:* {alert.message}"
                    }
                },
                {
                    "type": "context",
                    "elements": [
                        {
                            "type": "mrkdwn",
                            "text": f"Value: {alert.value} | Threshold: {alert.threshold} | {alert.source}"
                        }
                    ]
                }
            ]
        }
        
        requests.post(channel["webhook_url"], json=payload)
    
    def _persist_alert(self, alert: Alert):
        """Store alert in persistent log."""
        with open(self.alert_store, "a") as f:
            f.write(json.dumps({
                "alert_id": alert.alert_id,
                "source": alert.source,
                "metric": alert.metric,
                "value": alert.value,
                "threshold": alert.threshold,
                "severity": alert.severity.value,
                "message": alert.message,
                "created_at": alert.created_at.isoformat(),
                "acknowledged": alert.acknowledged,
                "resolved_at": alert.resolved_at.isoformat() if alert.resolved_at else None
            }) + "\n")

Alert Deduplication and Correlation

    def deduplicate(self, new_alert: Alert) -> bool:
        """Check if alert is duplicate of existing active alert."""
        for existing in self.active_alerts.values():
            if (existing.metric == new_alert.metric and
                existing.severity == new_alert.severity):
                # Update existing instead of creating new
                existing.value = new_alert.value
                existing.created_at = datetime.now()
                return True
        
        return False
    
    def correlate_alerts(self) -> list[list[Alert]]:
        """Group related alerts that likely share a root cause."""
        correlations = []
        processed = set()
        
        for alert in self.active_alerts.values():
            if alert.alert_id in processed:
                continue
            
            related = [alert]
            processed.add(alert.alert_id)
            
            for other in self.active_alerts.values():
                if other.alert_id in processed:
                    continue
                
                if self._are_related(alert, other):
                    related.append(other)
                    processed.add(other.alert_id)
            
            if len(related) > 1:
                correlations.append(related)
        
        return correlations
    
    def _are_related(self, alert1: Alert, alert2: Alert) -> bool:
        """Determine if two alerts are related."""
        # Same source within time window
        if alert1.source == alert2.source:
            time_diff = abs((alert1.created_at - alert2.created_at).total_seconds())
            if time_diff < 300:  # 5 minutes
                return True
        
        # Use AI to determine semantic correlation
        if self.config.get("ai_correlation"):
            return self._ai_correlate(alert1, alert2)
        
        return False
    
    def _ai_correlate(self, alert1: Alert, alert2: Alert) -> bool:
        """Use AI to determine if alerts are related."""
        prompt = f"""Are these two alerts related? Reply YES or NO.

Alert 1: {alert1.message}
Alert 2: {alert2.message}
"""
        
        response = ollama.chat(
            model=self.config.get("model", "llama3"),
            messages=[{"role": "user", "content": prompt}],
            options={"temperature": 0, "num_predict": 3}
        )
        
        return "YES" in response["message"]["content"].upper()
EXERCISE

Create an alerting system that detects when error rates increase and automatically creates a PagerDuty incident for critical alerts, sends Slack notifications for warnings, and logs info-level alerts to a file.