14. Alerting Systems
Chapter 14 of 18 · 25 min
Effective alerting transforms monitoring data into actionable notifications. A well-designed system handles alert creation, routing, deduplication, and escalation without generating noise.
Alert Creation and Routing
# alerting_system.py
import json
from datetime import datetime, timedelta
from pathlib import Path
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import ollama
class Severity(Enum):
    """Severity levels an alert can carry.

    Each member's value is the lowercase string used when a severity is
    read from an alert condition or written to a notification payload.
    """

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"
@dataclass
class Alert:
    """One triggered alert together with its acknowledgement/resolution state."""

    # --- identity and origin ---
    alert_id: str
    source: str
    metric: str

    # --- observed reading and the limit it was compared against ---
    value: float
    threshold: float

    # --- classification and human-readable text ---
    severity: Severity
    message: str
    created_at: datetime

    # --- lifecycle; a freshly created alert is unacknowledged and unresolved ---
    acknowledged: bool = False
    acknowledged_by: Optional[str] = None
    resolved_at: Optional[datetime] = None
class AlertingSystem:
    """Creates, suppresses, stores and routes alerts driven by a JSON config."""

    def __init__(self, config_path: str = "alert_config.json"):
        """Load the configuration and initialise alert storage.

        Args:
            config_path: Path of the JSON configuration file.
        """
        self.config = self._load_config(config_path)

        # File that alerts are appended to; the config may override the default.
        store_location = self.config.get("alert_store", "alerts.jsonl")
        self.alert_store = Path(store_location)

        # Alerts currently tracked in memory.
        self.active_alerts: dict[str, Alert] = {}
        self.suppression_rules = self._load_suppression_rules()
def _load_config(self, path: str) -> dict:
with open(path) as f:
return json.load(f)
def _load_suppression_rules(self) -> list[dict]:
"""Load alert suppression rules to prevent notification storms."""
return self.config.get("suppression_rules", [])
def evaluate_condition(self, metric: str, value: float,
                       conditions: list[dict]) -> list[Alert]:
    """Check one metric reading against every matching alert condition.

    Args:
        metric: Name of the metric the reading belongs to.
        value: The observed reading.
        conditions: Condition dicts; each needs ``metric`` and ``threshold``
            and may carry ``operator`` (defaults to ``">"``).

    Returns:
        The alerts created for triggered conditions, excluding any that the
        suppression rules hold back.
    """
    # Supported comparison operators; any other operator never triggers.
    comparators = {
        ">": lambda observed, limit: observed > limit,
        "<": lambda observed, limit: observed < limit,
        ">=": lambda observed, limit: observed >= limit,
        "<=": lambda observed, limit: observed <= limit,
        "==": lambda observed, limit: observed == limit,
    }

    fired = []
    for rule in conditions:
        if rule["metric"] != metric:
            continue

        compare = comparators.get(rule.get("operator", ">"))
        limit = rule["threshold"]
        if compare is None or not compare(value, limit):
            continue

        candidate = self._create_alert(rule, value)
        if not self._is_suppressed(candidate):
            fired.append(candidate)
    return fired
def _create_alert(self, condition: dict, current_value: float) -> Alert:
    """Build an ``Alert`` for a triggered condition.

    Args:
        condition: The condition dict that triggered. ``metric`` and
            ``threshold`` are required; ``source`` defaults to ``"unknown"``
            and ``severity`` to ``"warning"``.
        current_value: The reading that tripped the condition.

    Returns:
        A new, unacknowledged ``Alert``.

    Raises:
        KeyError: If ``metric`` or ``threshold`` is missing from ``condition``.
        ValueError: If ``severity`` is not a valid ``Severity`` value.
    """
    import uuid  # local import, same pattern this file uses for `requests`

    # Take the timestamp once so the ID and created_at agree, and so that
    # created_at is not delayed by however long message generation takes.
    now = datetime.now()
    metric = condition["metric"]

    # A whole-second timestamp alone collides when the same metric fires twice
    # within a second; the random suffix keeps IDs unique.
    alert_id = f"{metric}_{int(now.timestamp())}_{uuid.uuid4().hex[:8]}"

    message = self._generate_alert_message(condition, current_value)

    return Alert(
        alert_id=alert_id,
        source=condition.get("source", "unknown"),
        metric=metric,
        value=current_value,
        threshold=condition["threshold"],
        severity=Severity(condition.get("severity", "warning")),
        message=message,
        created_at=now,
    )
def _generate_alert_message(self, condition: dict, value: float) -> str:
"""Use local AI to generate contextual alert messages."""
prompt = f"""Generate a concise alert message for:
Metric: {condition['metric']}
Current value: {value}
Threshold: {condition['threshold']}
Operator: {condition.get('operator', '>')}
Context: {condition.get('context', 'System metric')}
Return ONLY the alert message in <50 characters. Start with action verb."""
response = ollama.chat(
model=self.config.get("model", "llama3"),
messages=[{"role": "user", "content": prompt}],
options={"temperature": 0.3, "num_predict": 50}
)
return response["message"]["content"].strip()
def _is_suppressed(self, alert: Alert) -> bool:
"""Check if alert matches suppression rules."""
for rule in self.suppression_rules:
if rule.get("metric") == alert.metric:
# Check time-based suppression
if "quiet_hours_start" in rule:
now = datetime.now().time()
start = datetime.strptime(rule["quiet_hours_start"], "%H:%M").time()
end = datetime.strptime(rule["quiet_hours_end"], "%H:%M").time()
if start <= now <= end:
return True
# Check maintenance window
if rule.get("maintenance_mode"):
return True
return False
def send_alert(self, alert: Alert):
    """Record the alert and deliver it to every channel selected for it.

    The alert is registered in ``active_alerts``, appended to the persistent
    log, and then sent to each channel returned by
    ``_get_notification_channels``.

    Args:
        alert: The alert to store and deliver.
    """
    # Store alert
    self.active_alerts[alert.alert_id] = alert
    self._persist_alert(alert)

    for channel in self._get_notification_channels(alert):
        try:
            self._send_to_channel(alert, channel)
        except Exception as exc:
            # Isolate failures per channel: one broken or misconfigured
            # channel must not stop delivery to the remaining ones.
            # Only the channel type is reported, since the channel dict may
            # hold secrets such as webhook URLs.
            label = channel.get("type") if isinstance(channel, dict) else type(channel).__name__
            print(f"Notification failed for channel {label!r}: {exc}")
def _get_notification_channels(self, alert: Alert) -> list[dict]:
    """Collect the channels this alert should be delivered to.

    Order of the result: the configured ``default_channel`` (if any), then
    ``critical_channel`` for critical alerts (if configured), then the
    ``channel`` of every routing rule whose ``metric`` matches the alert.

    Args:
        alert: The alert being routed.

    Returns:
        Channel entries taken from the config, in the order above.
    """
    cfg = self.config
    targets = []

    if "default_channel" in cfg:
        targets.append(cfg["default_channel"])

    # Critical alerts fan out to one extra channel when it is configured.
    if alert.severity == Severity.CRITICAL and "critical_channel" in cfg:
        targets.append(cfg["critical_channel"])

    # Metric-specific routing
    targets.extend(
        rule["channel"]
        for rule in cfg.get("routing_rules", [])
        if rule.get("metric") == alert.metric
    )
    return targets
def _send_to_channel(self, alert: Alert, channel: dict):
"""Send alert to specific notification channel."""
channel_type = channel.get("type")
if channel_type == "webhook":
self._send_webhook(alert, channel)
elif channel_type == "email":
self._send_email(alert, channel)
elif channel_type == "pagerduty":
self._send_pagerduty(alert, channel)
elif channel_type == "slack":
self._send_slack(alert, channel)
def _send_webhook(self, alert: Alert, channel: dict):
    """POST the alert as JSON to the channel's ``url``.

    Delivery is best-effort: any error (missing ``url``, network failure,
    non-success HTTP status) is printed and not raised.

    Args:
        alert: The alert to deliver.
        channel: Channel config holding the target ``url``.
    """
    import requests

    body = dict(
        alert_id=alert.alert_id,
        severity=alert.severity.value,
        metric=alert.metric,
        value=alert.value,
        message=alert.message,
        timestamp=alert.created_at.isoformat(),
    )

    try:
        reply = requests.post(channel["url"], json=body, timeout=10)
        reply.raise_for_status()
    except Exception as e:
        print(f"Webhook failed: {e}")
def _send_slack(self, alert: Alert, channel: dict):
    """Post the alert to a Slack incoming webhook.

    Delivery is best-effort, matching ``_send_webhook``: any error (missing
    ``webhook_url``, network failure, non-success HTTP status) is printed and
    not raised.

    Args:
        alert: The alert to deliver.
        channel: Channel config holding ``webhook_url`` and optionally
            ``channel`` (defaults to ``#alerts``).
    """
    import requests

    severity_emoji = {
        "critical": "🔴",
        "warning": "⚠️",
        "info": "ℹ️"
    }
    level = alert.severity.value

    payload = {
        "channel": channel.get("channel", "#alerts"),
        "text": f"{severity_emoji.get(level, '⚠️')} *{alert.metric}*",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{level.upper()}:* {alert.message}"
                }
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": f"Value: {alert.value} | Threshold: {alert.threshold} | {alert.source}"
                    }
                ]
            }
        ]
    }

    try:
        # Without a timeout a stalled connection would block alerting forever.
        response = requests.post(channel["webhook_url"], json=payload, timeout=10)
        # Surface HTTP-level rejections instead of treating them as success.
        response.raise_for_status()
    except Exception as e:
        print(f"Slack notification failed: {e}")
def _persist_alert(self, alert: Alert):
"""Store alert in persistent log."""
with open(self.alert_store, "a") as f:
f.write(json.dumps({
"alert_id": alert.alert_id,
"source": alert.source,
"metric": alert.metric,
"value": alert.value,
"threshold": alert.threshold,
"severity": alert.severity.value,
"message": alert.message,
"created_at": alert.created_at.isoformat(),
"acknowledged": alert.acknowledged,
"resolved_at": alert.resolved_at.isoformat() if alert.resolved_at else None
}) + "\n")
Alert Deduplication and Correlation
def deduplicate(self, new_alert: Alert) -> bool:
    """Fold ``new_alert`` into a matching active alert, if there is one.

    Two alerts are duplicates when they share source, metric and severity.
    On a match the existing alert takes the new value and its ``created_at``
    is refreshed to the current time.

    Args:
        new_alert: The freshly created alert to check.

    Returns:
        ``True`` if an existing alert absorbed ``new_alert`` (the caller
        should not send it again), ``False`` otherwise.
    """
    for existing in self.active_alerts.values():
        # Source must match as well: the same metric firing on two different
        # sources is two separate problems, not a repeat of one.
        same_signal = (
            existing.source == new_alert.source
            and existing.metric == new_alert.metric
            and existing.severity == new_alert.severity
        )
        if same_signal:
            # Update existing instead of creating new
            existing.value = new_alert.value
            existing.created_at = datetime.now()
            return True
    return False
def correlate_alerts(self) -> list[list[Alert]]:
    """Group active alerts that ``_are_related`` links to a common seed alert.

    Each not-yet-grouped alert becomes a seed and collects every later,
    not-yet-grouped alert related to it. Alerts are compared with the seed
    only, not with other members of the group.

    Returns:
        Groups containing at least two alerts; alerts with no related
        partner are left out.
    """
    snapshot = list(self.active_alerts.values())
    grouped_ids = set()
    clusters = []

    for position, seed in enumerate(snapshot):
        if seed.alert_id in grouped_ids:
            continue
        grouped_ids.add(seed.alert_id)
        cluster = [seed]

        # Everything before `position` is already grouped or was a seed, so
        # only the alerts after it can still join this cluster.
        for candidate in snapshot[position + 1:]:
            if candidate.alert_id in grouped_ids:
                continue
            if self._are_related(seed, candidate):
                cluster.append(candidate)
                grouped_ids.add(candidate.alert_id)

        if len(cluster) > 1:
            clusters.append(cluster)

    return clusters
def _are_related(self, alert1: Alert, alert2: Alert) -> bool:
"""Determine if two alerts are related."""
# Same source within time window
if alert1.source == alert2.source:
time_diff = abs((alert1.created_at - alert2.created_at).total_seconds())
if time_diff < 300: # 5 minutes
return True
# Use AI to determine semantic correlation
if self.config.get("ai_correlation"):
return self._ai_correlate(alert1, alert2)
return False
def _ai_correlate(self, alert1: Alert, alert2: Alert) -> bool:
"""Use AI to determine if alerts are related."""
prompt = f"""Are these two alerts related? Reply YES or NO.
Alert 1: {alert1.message}
Alert 2: {alert2.message}
"""
response = ollama.chat(
model=self.config.get("model", "llama3"),
messages=[{"role": "user", "content": prompt}],
options={"temperature": 0, "num_predict": 3}
)
return "YES" in response["message"]["content"].upper()
EXERCISE
Create an alerting system that detects when error rates increase and automatically creates a PagerDuty incident for critical alerts, sends Slack notifications for warnings, and logs info-level alerts to a file.