
AI-Powered Incident Detection

Traditional threshold-based alerting generates noise. AI-powered incident detection uses machine learning to identify real incidents before users notice.

How It Works

┌──────────────────────────────────────────────────────────┐
│ AI Incident Detection Pipeline │
│ │
│ ┌─────────────┐ │
│ │ Metrics │──┐ │
│ │ Logs │ │ ┌──────────────┐ │
│ │ Traces │──┼─▶│ Feature │ │
│ │ Events │ │ │ Engineering │ │
│ │ Deployments │──┘ └──────┬───────┘ │
│ └─────────────┘ │ │
│ ┌──────▼───────┐ │
│ │ ML Models │ │
│ │ • Anomaly Det │ │
│ │ • Forecasting │ │
│ │ • Correlation │ │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ Incident │ │
│ │ Classifier │ │
│ │ • Severity │ │
│ │ • RCA │ │
│ │ • Remediation │ │
│ └──────┬───────┘ │
│ │ │
│ ┌─────────▼──────────┐ │
│ │ Action Engine │ │
│ │ Alert │ Runbook │ Fix│ │
│ └────────────────────┘ │
└──────────────────────────────────────────────────────────┘

Anomaly Detection Methods

1. Statistical Methods (Good Starting Point)

import numpy as np
from scipy import stats

def detect_anomalies_zscore(
    data: list[float],
    threshold: float = 3.0,
) -> list[dict]:
    """Detect anomalous points in *data* using the z-score method.

    Args:
        data: Sequence of metric samples.
        threshold: Minimum absolute z-score for a point to be flagged.

    Returns:
        A list of dicts, one per anomaly, each with ``index``, ``value``,
        ``z_score`` and ``severity`` ("critical" when |z| > 5, otherwise
        "warning").
    """
    # Guard: np.mean/np.std on an empty sequence emit RuntimeWarnings
    # and yield NaN; there is nothing to flag anyway.
    if len(data) == 0:
        return []

    mean = np.mean(data)
    std = np.std(data)  # population std (ddof=0)

    # Hoisted out of the loop: a zero std means every sample is
    # identical, so no point can deviate.
    if std == 0:
        return []

    anomalies = []
    for i, value in enumerate(data):
        z = abs((value - mean) / std)
        if z > threshold:
            anomalies.append({
                "index": i,
                "value": value,
                "z_score": z,
                "severity": "critical" if z > 5 else "warning",
            })

    return anomalies

2. Isolation Forest (Unsupervised ML)

from sklearn.ensemble import IsolationForest
import numpy as np

def train_anomaly_detector(
    training_data: np.ndarray,
    contamination: float = 0.05,
) -> IsolationForest:
    """Fit an Isolation Forest on baseline (mostly normal) observations.

    Args:
        training_data: 2-D array of feature rows.
        contamination: Expected fraction of outliers in the training set.

    Returns:
        The fitted IsolationForest model.
    """
    detector = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        random_state=42,  # fixed seed so retraining is reproducible
    )
    detector.fit(training_data)
    return detector

def predict_anomalies(
    model: IsolationForest,
    new_data: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Score new observations against a fitted Isolation Forest.

    Args:
        model: A fitted IsolationForest.
        new_data: 2-D array of feature rows to score.

    Returns:
        ``(predictions, scores)`` — predictions are -1 (anomaly) or
        1 (normal); scores are decision-function values, where lower
        means more anomalous.
    """
    predictions = model.predict(new_data)
    scores = model.decision_function(new_data)
    # Fix: the function returns a 2-tuple, so the annotation must be
    # tuple[np.ndarray, np.ndarray], not a single np.ndarray.
    return predictions, scores

# Example: multi-metric anomaly detection.
# Feature vector layout: [cpu_usage, memory_usage, request_rate, error_rate, latency_p99]
training_data = np.array([
    [45, 60, 1000, 0.01, 0.12],
    [50, 62, 1100, 0.02, 0.11],
    [48, 58, 950, 0.01, 0.13],
    # ... 30 days of normal data
])

# Fit the detector on the baseline window of normal behavior.
model = train_anomaly_detector(training_data)

# Score the latest observation against the learned baseline.
current = np.array([[92, 85, 2500, 0.15, 1.2]])
pred, score = predict_anomalies(model, current)
# pred = [-1] → Anomaly detected!

3. Prophet for Time Series Forecasting

from prophet import Prophet
import pandas as pd

def forecast_metric(
    metric_data: pd.DataFrame,
    periods: int = 60  # 60 data points ahead
) -> pd.DataFrame:
    """Fit a Prophet model and project the metric forward.

    Args:
        metric_data: DataFrame with Prophet's required 'ds' (timestamp)
            and 'y' (value) columns.
        periods: Number of future 5-minute steps to forecast.

    Returns:
        DataFrame with 'ds', the point forecast 'yhat', and the 99%
        interval bounds 'yhat_lower' / 'yhat_upper'; observations that
        fall outside the interval can be treated as deviations.
    """
    predictor = Prophet(
        interval_width=0.99,
        changepoint_prior_scale=0.05,
    )
    predictor.fit(metric_data)  # Must have 'ds' and 'y' columns

    horizon = predictor.make_future_dataframe(periods=periods, freq='5min')
    projection = predictor.predict(horizon)

    wanted = ['ds', 'yhat', 'yhat_lower', 'yhat_upper']
    return projection[wanted]

Event Correlation

Correlate related alerts to reduce noise and identify root causes:

from datetime import datetime, timedelta
from collections import defaultdict

def correlate_events(
    events: list[dict],
    time_window: timedelta = timedelta(minutes=5),
    service_graph: dict | None = None,
) -> list[dict]:
    """Group temporally- and topologically-related events into incidents.

    Args:
        events: Event dicts; each must have at least "timestamp" and
            "severity" keys (plus whatever the correlation helpers read).
        time_window: Maximum gap between an anchor event and a
            candidate for them to be correlated.
        service_graph: Service dependency graph passed through to the
            correlation helpers; may be None.

    Returns:
        A list of incident dicts with "events", "severity" (max of the
        group), "root_cause" and "started_at". Singleton events produce
        no incident.
    """
    incidents = []
    used = set()

    # Fix: sort a copy rather than mutating the caller's list in place,
    # and annotate service_graph as Optional (its default is None).
    events = sorted(events, key=lambda e: e["timestamp"])

    for i, event in enumerate(events):
        if i in used:
            continue

        # Start a new candidate group anchored at this event.
        group = [event]
        used.add(i)

        for j, other in enumerate(events[i + 1:], i + 1):
            if j in used:
                continue

            time_diff = other["timestamp"] - event["timestamp"]
            # Events are time-ordered, so once one falls outside the
            # window every later one does too.
            if time_diff > time_window:
                break

            # Correlation rules: same service, dependency-linked
            # services, or shared infrastructure.
            if (
                same_service(event, other) or
                dependent_services(event, other, service_graph) or
                same_infrastructure(event, other)
            ):
                group.append(other)
                used.add(j)

        # Only multi-event groups become incidents; lone events remain
        # plain alerts.
        if len(group) > 1:
            incidents.append({
                "events": group,
                "severity": max(e["severity"] for e in group),
                "root_cause": identify_root_cause(group, service_graph),
                "started_at": group[0]["timestamp"],
            })

    return incidents

Automated Remediation

# remediation-playbook.yml
# Declarative mapping from alert triggers to ordered remediation actions.
# Each playbook pairs one trigger (metric condition or event pattern)
# with the actions to run, in order, when it fires.
playbooks:
  high_cpu:
    trigger:
      metric: cpu_usage_percent
      condition: "> 90 for 5m"
    actions:
      # Scale first, then tell a human what happened.
      - name: Scale up deployment
        type: kubernetes
        action: scale
        params:
          replicas: "+2"
      - name: Notify on-call
        type: slack
        channel: "#incidents"

  disk_pressure:
    trigger:
      metric: disk_usage_percent
      condition: "> 85"
    actions:
      # Best-effort cleanup of rotated logs older than 7 days.
      - name: Clean old logs
        type: command
        command: "find /var/log -name '*.gz' -mtime +7 -delete"
      - name: Alert if still high
        type: pagerduty
        severity: warning

  pod_crash_loop:
    trigger:
      event: CrashLoopBackOff
      count: "> 3 in 10m"
    actions:
      # Capture the previous container's logs before rolling back,
      # otherwise the evidence is lost with the pod.
      - name: Capture diagnostics
        type: command
        command: "kubectl logs {pod} --previous > /tmp/crash-{pod}.log"
      - name: Rollback deployment
        type: kubernetes
        action: rollback
      - name: Create incident ticket
        type: jira
        project: OPS

Implementation Roadmap

| Phase   | Timeline | Focus                                            |
|---------|----------|--------------------------------------------------|
| Phase 1 | Week 1-2 | Statistical anomaly detection on key metrics     |
| Phase 2 | Week 3-4 | Event correlation and alert grouping             |
| Phase 3 | Month 2  | ML-based detection (Isolation Forest)            |
| Phase 4 | Month 3  | Automated remediation for common issues          |
| Phase 5 | Month 4+ | Predictive alerting with time series forecasting |

Next Steps