13. Anomaly Detection

Chapter 13 of 18 · 20 min

Anomaly detection flags unusual observations that deviate from expected patterns. Local models enable private, customized anomaly detection without data leaving your infrastructure.

Z-Score Detection

Simple but effective for normally distributed data:

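import numpy as np
import pandas as pd

def zscore_anomaly(series, threshold=3):
    # Distance of each point from the mean, in sample standard deviations
    mean = series.mean()
    std = series.std()
    z_scores = (series - mean) / std
    # NaN z-scores (missing values, or std == 0) compare as False
    return np.abs(z_scores) > threshold

# df is assumed to be a DataFrame with a numeric 'metric' column
df['anomaly_zscore'] = zscore_anomaly(df['metric'], threshold=3)
anomalies_zscore = df[df['anomaly_zscore']]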
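
To see how the rule behaves, here is a small sketch on made-up data (the series below are hypothetical, not from the chapter's dataset). It also shows a caveat: with very few points a threshold of 3 can be unreachable, because the largest possible |z| in a sample of n points is (n - 1) / sqrt(n).

```python
import pandas as pd

def zscore_anomaly(series, threshold=3):
    """Flag points more than `threshold` sample standard deviations from the mean."""
    z_scores = (series - series.mean()) / series.std()
    return z_scores.abs() > threshold

# Hypothetical data: 99 readings near 10 plus a single spike at 50.
steady = pd.Series([9.0, 10.0, 11.0] * 33 + [50.0])
steady_flags = zscore_anomaly(steady)
print(steady[steady_flags])  # only the spike at position 99 is flagged

# With five points a similar spike goes undetected: the largest possible
# |z| for n points is (n - 1) / sqrt(n), about 1.79 for n = 5.
tiny = pd.Series([10.0, 10.0, 10.0, 10.0, 1000.0])
tiny_flags = zscore_anomaly(tiny)
print(tiny_flags.any())  # False
```

In practice this means the global Z-score is best suited to reasonably long series; for short ones, a lower threshold or a different rule may be needed.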

IQR-Based Detection

Robust to extreme values, because the quartiles barely move when a few outliers are present:

def iqr_anomaly(series, multiplier=1.5):
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    lower = q1 - multiplier * iqr
    upper = q3 + multiplier * iqr
    return (series < lower) | (series > upper)

df['anomaly_iqr'] = iqr_anomaly(df['metric'])
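
A worked example may make the fences concrete. The sketch below uses a hypothetical helper, `iqr_bounds`, and nine made-up readings; it also compares the result with a plain Z-score at threshold 3, which misses the same spike because the spike inflates the standard deviation.

```python
import pandas as pd

def iqr_bounds(series, multiplier=1.5):
    """Return the (lower, upper) fences used by the IQR rule."""
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    return q1 - multiplier * iqr, q3 + multiplier * iqr

# Hypothetical readings with one obvious spike.
readings = pd.Series([10, 12, 11, 13, 12, 11, 10, 12, 95])

lower, upper = iqr_bounds(readings)
iqr_flags = (readings < lower) | (readings > upper)
print(lower, upper)         # 9.5 13.5
print(readings[iqr_flags])  # the single reading of 95

# The same data with a global Z-score: the spike inflates the standard
# deviation, so no point reaches a threshold of 3.
z_scores = (readings - readings.mean()) / readings.std()
z_flags = z_scores.abs() > 3
print(z_flags.any())        # False
```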

Isolation Forest with Local Model

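Isolation Forest suits multivariate anomalies: it isolates points with random splits across all features, and unusual points need fewer splits to isolate: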

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

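# Prepare features; rows with missing values are dropped and stay unscored
features = df[['metric1', 'metric2', 'metric3']].dropna()
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)

# Train isolation forest (contamination = expected share of anomalies)
iso_forest = IsolationForest(contamination=0.01, random_state=42, n_estimators=100)
# Assign by index so the labels line up with df even after dropna()
df.loc[features.index, 'anomaly_iforest'] = iso_forest.fit_predict(scaled_features)

# -1 = anomaly, 1 = normal; rows dropped above are NaN and count as not anomalous
df['is_anomaly'] = df['anomaly_iforest'] == -1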
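
The -1/1 labels hide how unusual each row is. As a rough sketch on synthetic data (the `demo` frame and the planted outliers are made up for illustration), `score_samples` gives a continuous score that can be used to rank rows, with lower values meaning more abnormal.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
cols = ['metric1', 'metric2', 'metric3']

# Hypothetical data: 500 ordinary rows plus three planted outliers at the end.
normal_rows = rng.normal(loc=0.0, scale=1.0, size=(500, 3))
planted = np.array([[12.0, 12.0, 12.0],
                    [-12.0, 11.0, 12.0],
                    [12.0, -12.0, 11.0]])
demo = pd.DataFrame(np.vstack([normal_rows, planted]), columns=cols)

model = IsolationForest(contamination=0.01, random_state=42, n_estimators=100)
demo['label'] = model.fit_predict(demo[cols])
# score_samples: the lower the score, the more abnormal the row.
demo['score'] = model.score_samples(demo[cols])

# Rank rows from most to least anomalous.
ranked = demo.sort_values('score')
print(ranked.head(5))
```

Ranking by score is useful when you only have capacity to review a fixed number of cases, regardless of the `contamination` setting.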

Using Ollama for Contextual Anomaly Analysis

Use a local LLM to suggest plausible explanations for detected anomalies. Treat the output as hypotheses to verify, not as confirmed causes:

import ollama

def explain_anomaly(anomaly_data, context_data):
    prompt = f"""Analyze this anomalous data point and explain potential causes.
    
    Anomaly details:
    {anomaly_data.to_string()}
    
    Context (24h surrounding data):
    {context_data.describe()}
    
    Consider: external events, seasonal patterns, data quality issues.
    Provide 3 most likely explanations ranked by probability."""
    
    response = ollama.chat(model='llama3.2', messages=[
        {'role': 'user', 'content': prompt}
    ])
    return response['message']['content']

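# Explain the first three detected anomalies
# (assumes df has a sorted DatetimeIndex and a local Ollama server with llama3.2 pulled)
for idx in df[df['is_anomaly']].index[:3]: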
    anomaly_row = df.loc[idx]
    context = df.loc[idx-pd.Timedelta(hours=12):idx+pd.Timedelta(hours=12)]
    explanation = explain_anomaly(anomaly_row, context)
    print(f"Anomaly at {idx}:")
    print(explanation)
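
The loop above slices `df.loc[idx - 12h : idx + 12h]`, which only works on a sorted DatetimeIndex and includes the anomalous row in its own context. A minimal sketch of a helper that makes both assumptions explicit (`context_window` is a hypothetical name, and the hourly frame is made-up data):

```python
import pandas as pd

def context_window(frame, timestamp, hours=12):
    """Rows within +/- `hours` of `timestamp`, excluding the row at `timestamp`."""
    if not isinstance(frame.index, pd.DatetimeIndex):
        raise TypeError("context_window expects a DatetimeIndex")
    frame = frame.sort_index()  # label slicing by time needs a sorted index
    half = pd.Timedelta(hours=hours)
    window = frame.loc[timestamp - half:timestamp + half]
    # Drop the anomaly itself so it does not skew the summary statistics.
    return window.drop(index=timestamp, errors='ignore')

# Hypothetical hourly data covering three days.
hourly_index = pd.date_range('2024-01-01', periods=72, freq='h')
hourly = pd.DataFrame({'metric': range(72)}, index=hourly_index)

ctx = context_window(hourly, hourly_index[30])
print(len(ctx))  # 24 rows: 12 before and 12 after
print(ctx.index.min(), ctx.index.max())
```

The result can be passed as `context_data` to `explain_anomaly`. Whether to exclude the anomalous row is a design choice; excluding it keeps the summary focused on the surrounding baseline.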

Rolling Anomaly Detection

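A rolling baseline compares each point with recent history only, which helps when the mean or variance drifts over time:

def rolling_zscore(series, window=30, threshold=3):
    # Baseline statistics over the last `window` observations (current point included)
    rolling_mean = series.rolling(window, min_periods=window//2).mean()
    rolling_std = series.rolling(window, min_periods=window//2).std()
    z_scores = (series - rolling_mean) / rolling_std
    # Early points without enough history yield NaN and are not flagged
    return np.abs(z_scores) > threshold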

df['rolling_anomaly'] = rolling_zscore(df['metric'])
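
One subtlety: the rolling window includes the point being tested, so a spike inflates its own baseline. With a window of 10 or fewer points, |z| can never exceed 3. A possible variant, sketched here under the assumption that only past data should form the baseline, shifts the series by one step first (`lagged_rolling_zscore` is a hypothetical name, and the data is made up):

```python
import numpy as np
import pandas as pd

def rolling_zscore(series, window=30, threshold=3):
    """In-window version: the current point is part of its own baseline."""
    rolling_mean = series.rolling(window, min_periods=window // 2).mean()
    rolling_std = series.rolling(window, min_periods=window // 2).std()
    return ((series - rolling_mean) / rolling_std).abs() > threshold

def lagged_rolling_zscore(series, window=30, threshold=3):
    """Lagged version: the baseline uses only points before each observation."""
    history = series.shift(1)  # exclude the current point from its own baseline
    baseline_mean = history.rolling(window, min_periods=window // 2).mean()
    baseline_std = history.rolling(window, min_periods=window // 2).std()
    # A zero standard deviation would divide by zero; treat it as "no score".
    z_scores = (series - baseline_mean) / baseline_std.replace(0, np.nan)
    return z_scores.abs() > threshold

# Hypothetical series: a repeating 10-11-12-11 pattern with one spike at position 60.
values = [10.0, 11.0, 12.0, 11.0] * 25
values[60] = 40.0
signal = pd.Series(values)

in_window_flags = rolling_zscore(signal, window=10)
lagged_flags = lagged_rolling_zscore(signal, window=10)
print(in_window_flags.any())                   # False: the spike masks itself
print(list(lagged_flags[lagged_flags].index))  # [60]
```

The lagged version trades one property for another: it catches the spike with a short window, but the spike then sits in the baseline of the following points until it rolls out of the window.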

Visualizing Anomalies

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(df.index, df['metric'], label='Metric', alpha=0.7)
# Highlight points flagged by either detector
anomalies = df[df['is_anomaly'] | df['rolling_anomaly']]
ax.scatter(anomalies.index, anomalies['metric'], c='red', s=50, label='Anomaly', zorder=5)
# Global mean + 3 std, drawn for reference only; neither detector above uses this exact rule
ax.axhline(df['metric'].mean() + 3*df['metric'].std(), color='orange', linestyle='--', label='Mean + 3 std')
ax.legend()
plt.title('Anomaly Detection Results')
plt.show()

EXERCISE

Implement rolling Z-score anomaly detection with window=48h and threshold=2.5. Explain the top 3 anomalies using Ollama and save the results to CSV.