13. Anomaly Detection
Chapter 13 of 18 · 20 min
Anomaly detection flags unusual observations that deviate from expected patterns. Local models enable private, customized anomaly detection without data leaving your infrastructure.
Z-Score Detection
Simple but effective for normally distributed data:
import numpy as np
import pandas as pd

def zscore_anomaly(series, threshold=3):
    # Flag points more than `threshold` standard deviations from the mean
    mean = series.mean()
    std = series.std()
    z_scores = (series - mean) / std
    return np.abs(z_scores) > threshold

# df is assumed to be a DataFrame with a numeric 'metric' column
df['anomaly_zscore'] = zscore_anomaly(df['metric'], threshold=3)
anomalies_zscore = df[df['anomaly_zscore']]
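To try the function without a real dataset, here is a minimal sketch on synthetic data. The hourly index, the sine-shaped baseline, and the spike injected at position 50 are all assumptions made for illustration, not part of the chapter's dataset.

```python
import numpy as np
import pandas as pd

def zscore_anomaly(series, threshold=3):
    # Same logic as above: distance from the mean in units of standard deviation
    z_scores = (series - series.mean()) / series.std()
    return z_scores.abs() > threshold

# Synthetic hourly metric (illustrative assumption): smooth baseline around 100
index = pd.date_range("2024-01-01", periods=200, freq=pd.Timedelta(hours=1))
values = 100 + 5 * np.sin(np.linspace(0, 8 * np.pi, 200))
values[50] = 160.0  # injected spike

demo = pd.DataFrame({"metric": values}, index=index)
demo["anomaly_zscore"] = zscore_anomaly(demo["metric"])

# Only the injected spike lies more than 3 standard deviations from the mean
flagged = demo[demo["anomaly_zscore"]]
print(flagged)
```

The baseline never strays more than 5 units from 100, so the spike is the only point that crosses the threshold.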
IQR-Based Detection
Robust to outliers, because quartiles are barely affected by extreme values:
def iqr_anomaly(series, multiplier=1.5):
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    lower = q1 - multiplier * iqr
    upper = q3 + multiplier * iqr
    return (series < lower) | (series > upper)

df['anomaly_iqr'] = iqr_anomaly(df['metric'])
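The difference in robustness shows up clearly on small samples. A single extreme value inflates the standard deviation it is measured against: with the sample standard deviation, no point among n values can have a Z-score above (n - 1) / sqrt(n), which is about 2.85 for n = 10. The sketch below uses ten made-up readings to compare the two detectors.

```python
import pandas as pd

def zscore_anomaly(series, threshold=3):
    z_scores = (series - series.mean()) / series.std()
    return z_scores.abs() > threshold

def iqr_anomaly(series, multiplier=1.5):
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return (series < q1 - multiplier * iqr) | (series > q3 + multiplier * iqr)

# Ten made-up readings; the last one is an obvious outlier
sample = pd.Series([10, 11, 12, 9, 10, 11, 10, 9, 12, 1000], dtype=float)

z_flags = zscore_anomaly(sample)
iqr_flags = iqr_anomaly(sample)

print("z-score flags:", z_flags[z_flags].index.tolist())    # z-score flags: []
print("IQR flags:", iqr_flags[iqr_flags].index.tolist())    # IQR flags: [9]
```

The Z-score misses the value 1000 because that same value drags the mean and standard deviation upward, while the quartiles stay near 10 to 12.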
Isolation Forest with Local Model
Isolation Forest excels at multivariate anomalies:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Prepare features (rows with missing values are dropped)
features = df[['metric1', 'metric2', 'metric3']].dropna()
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)

# Train isolation forest
iso_forest = IsolationForest(contamination=0.01, random_state=42, n_estimators=100)

# Assign by index so labels stay aligned if dropna() removed rows
# -1 = anomaly, 1 = normal
df.loc[features.index, 'anomaly_iforest'] = iso_forest.fit_predict(scaled_features)
df['is_anomaly'] = df['anomaly_iforest'] == -1
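Besides the -1/1 labels, a fitted Isolation Forest also exposes a continuous score through `score_samples`, where lower values mean more anomalous. That is useful for ranking anomalies rather than only flagging them. The following is a minimal sketch on synthetic data; the two-column cluster and the injected point at (8, 8) are assumptions for illustration.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Synthetic data (illustrative assumption): a tight 2-D cluster plus one distant point
rng = np.random.default_rng(0)
demo = pd.DataFrame(rng.normal(0, 1, size=(300, 2)), columns=["metric1", "metric2"])
demo.loc[300] = [8.0, 8.0]  # injected outlier

scaled = StandardScaler().fit_transform(demo[["metric1", "metric2"]])
forest = IsolationForest(contamination=0.01, random_state=42, n_estimators=100)
forest.fit(scaled)

# Lower score = easier to isolate = more anomalous
demo["score"] = forest.score_samples(scaled)
demo["is_anomaly"] = forest.predict(scaled) == -1

# Rank the most anomalous rows first
print(demo.nsmallest(3, "score"))
```

Sorting by score gives a natural way to pick which anomalies to investigate first when `contamination` flags more rows than you can review.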
Using Ollama for Contextual Anomaly Analysis
Use local LLMs to explain why anomalies occurred:
import ollama
import pandas as pd

def explain_anomaly(anomaly_data, context_data):
    # Build a prompt from the anomalous row and summary statistics of nearby data
    prompt = f"""Analyze this anomalous data point and explain potential causes.
Anomaly details:
{anomaly_data.to_string()}
Context (24h surrounding data):
{context_data.describe().to_string()}
Consider: external events, seasonal patterns, data quality issues.
Provide the 3 most likely explanations ranked by probability."""
    response = ollama.chat(model='llama3.2', messages=[
        {'role': 'user', 'content': prompt}
    ])
    return response['message']['content']

# Explain detected anomalies (assumes df has a sorted DatetimeIndex)
for idx in df[df['is_anomaly']].index[:3]:
    anomaly_row = df.loc[idx]
    context = df.loc[idx - pd.Timedelta(hours=12):idx + pd.Timedelta(hours=12)]
    explanation = explain_anomaly(anomaly_row, context)
    print(f"Anomaly at {idx}:")
    print(explanation)
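The 24-hour context slice in the loop only works when the DataFrame has a sorted DatetimeIndex, and label-based slices include both endpoints. Here is a minimal sketch of that behavior on made-up hourly data; `context_window` is a hypothetical helper introduced for illustration, and no model is called.

```python
import pandas as pd

def context_window(frame, timestamp, hours=12):
    """Return rows within +/- `hours` of `timestamp` (hypothetical helper).

    Assumes `frame` has a sorted DatetimeIndex; label slices include both ends.
    """
    delta = pd.Timedelta(hours=hours)
    return frame.loc[timestamp - delta : timestamp + delta]

# Three days of made-up hourly readings
index = pd.date_range("2024-01-01", periods=72, freq=pd.Timedelta(hours=1))
demo = pd.DataFrame({"metric": range(72)}, index=index)

middle = context_window(demo, index[36])  # 12 rows before + the point + 12 rows after
edge = context_window(demo, index[5])     # clipped at the start of the data

print(len(middle), len(edge))  # 25 18
```

Anomalies near the start or end of the data get a shorter context, so the summary statistics sent to the model cover fewer points there.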
Rolling Anomaly Detection
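A global mean and standard deviation can miss anomalies in data with trend or seasonality. Comparing each point with a rolling window adapts the baseline to recent values:
def rolling_zscore(series, window=30, threshold=3):
    # Statistics are computed once at least half the window is available
    rolling_mean = series.rolling(window, min_periods=window//2).mean()
    rolling_std = series.rolling(window, min_periods=window//2).std()
    z_scores = (series - rolling_mean) / rolling_std
    return np.abs(z_scores) > threshold

df['rolling_anomaly'] = rolling_zscore(df['metric'])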
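To illustrate why a rolling baseline helps, the sketch below applies a global and a rolling Z-score to a made-up metric that rises steadily, with a spike of +30 injected at position 100. The data and the `global_zscore` helper are assumptions for illustration.

```python
import numpy as np
import pandas as pd

def global_zscore(series, threshold=3):
    return ((series - series.mean()) / series.std()).abs() > threshold

def rolling_zscore(series, window=30, threshold=3):
    rolling_mean = series.rolling(window, min_periods=window // 2).mean()
    rolling_std = series.rolling(window, min_periods=window // 2).std()
    return ((series - rolling_mean) / rolling_std).abs() > threshold

# Made-up trending metric: rises by 1 per step, with a +30 spike at position 100
values = np.arange(200, dtype=float)
values[100] += 30
trend = pd.Series(values)

global_flags = global_zscore(trend)
rolling_flags = rolling_zscore(trend)

print("global:", global_flags[global_flags].index.tolist())     # global: []
print("rolling:", rolling_flags[rolling_flags].index.tolist())  # rolling: [100]
```

Over the whole series the trend itself dominates the standard deviation, so a jump of 30 looks unremarkable. Within a 30-point window the same jump stands well clear of recent values.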
Visualizing Anomalies
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(df.index, df['metric'], label='Metric', alpha=0.7)
# Combine anomalies from the Isolation Forest and the rolling Z-score
anomalies = df[df['is_anomaly'] | df['rolling_anomaly']]
ax.scatter(anomalies.index, anomalies['metric'], c='red', s=50, label='Anomaly', zorder=5)
ax.axhline(df['metric'].mean() + 3*df['metric'].std(), color='orange', linestyle='--', label='Mean + 3 std')
ax.legend()
plt.title('Anomaly Detection Results')
plt.show()
EXERCISE
Implement rolling Z-score anomaly detection with a 48-hour window and threshold=2.5. Explain the top 3 anomalies using Ollama and save the results to a CSV file.