Skip to content

Commit

Permalink
chore(anomaly-detection): add log transformation and z-score based sc…
Browse files Browse the repository at this point in the history
…orer
  • Loading branch information
ram-senth committed Feb 4, 2025
1 parent 53c7cbb commit f5aa68e
Show file tree
Hide file tree
Showing 9 changed files with 612 additions and 110 deletions.
8 changes: 6 additions & 2 deletions src/seer/anomaly_detection/detectors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from seer.anomaly_detection.detectors import (
anomaly_detectors,
mp_boxcox_scorer,
mp_cascading_scorer,
mp_scorers,
mp_utils,
normalizers,
Expand All @@ -15,8 +17,10 @@
SuSSWindowSizeSelector = window_size_selectors.SuSSWindowSizeSelector
FlagsAndScores = mp_scorers.FlagsAndScores
MPScorer = mp_scorers.MPScorer
MPCascadingScorer = mp_scorers.MPCascadingScorer

MPCascadingScorer = mp_cascading_scorer.MPCascadingScorer
LowVarianceScorer = mp_scorers.LowVarianceScorer
MPBoxCoxScorer = mp_boxcox_scorer.MPBoxCoxScorer
MPIQRScorer = mp_scorers.MPIQRScorer
Normalizer = normalizers.Normalizer
MinMaxNormalizer = normalizers.MinMaxNormalizer

Expand Down
242 changes: 242 additions & 0 deletions src/seer/anomaly_detection/detectors/mp_boxcox_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import datetime
from typing import Dict, List, Tuple

import numpy as np
import numpy.typing as npt
import sentry_sdk
from pydantic import Field

from seer.anomaly_detection.detectors.location_detectors import LocationDetector
from seer.anomaly_detection.detectors.mp_scorers import FlagsAndScores, MPScorer
from seer.anomaly_detection.models import (
AlgoConfig,
AnomalyDetectionConfig,
AnomalyFlags,
Directions,
PointLocation,
Sensitivities,
Threshold,
ThresholdType,
)
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError, ServerError


class MPBoxCoxScorer(MPScorer):
    """
    Scores time series points with a Box-Cox transformation followed by z-scores.

    The values are Box-Cox transformed (a plain log transform when ``box_cox_lambda``
    is 0), z-scores are computed over the transformed series, and a point is flagged
    when the magnitude of its z-score exceeds the threshold configured for the
    requested sensitivity. Flagged points are then filtered by the configured alert
    direction using the location detector.
    """

    box_cox_lambda: float = Field(
        0.0,
        description="The lambda parameter for the Box-Cox transformation. Default 0 corresponds to log transform.",
    )
    z_score_thresholds: Dict[Sensitivities, float] = Field(
        {
            "high": 2.0,  # 95.4% confidence interval
            "medium": 2.5,  # 98.8% confidence interval
            "low": 3.0,  # 99.7% confidence interval
        },
        description="Z-score thresholds for different sensitivity levels",
    )

    def _inverse_box_cox_transform(self, x: float, bc_lambda: float) -> float:
        """Apply inverse Box-Cox transformation to return data to original scale.

        Args:
            x: The Box-Cox transformed value
            bc_lambda: The lambda parameter for the Box-Cox transformation

        Returns:
            float: The inverse transformed value in the original scale
        """
        # NOTE(review): 1 is always subtracted here, whereas _box_cox_transform only
        # shifts its input (by 1 - min) when the minimum is <= 0. Confirm that the
        # unconditional offset is intended.
        if bc_lambda == 0:
            return np.exp(x) - 1
        return np.power(bc_lambda * x + 1, 1 / bc_lambda) - 1

    def _box_cox_transform(self, x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
        """Apply Box-Cox transformation to the data.

        Args:
            x: The values to transform. Must contain at least one element.

        Returns:
            The transformed values, same length as ``x``.
        """
        # Box-Cox is only defined for positive inputs, so shift the whole series
        # until its minimum is exactly 1 when it contains non-positive values.
        min_val = x.min()
        if min_val <= 0:
            x = x - min_val + 1

        if self.box_cox_lambda == 0:
            return np.log(x)
        return (np.power(x, self.box_cox_lambda) - 1) / self.box_cox_lambda

    def _get_z_scores(
        self, values: npt.NDArray[np.float64], sensitivity: Sensitivities
    ) -> Tuple[npt.NDArray[np.float64], float, float, float]:
        """Calculate z-scores and threshold.

        Args:
            values: The raw time series values.
            sensitivity: Key into ``z_score_thresholds``.

        Returns:
            A tuple ``(z_scores, threshold, std, threshold_transformed)`` where
            ``z_scores`` are computed over the Box-Cox transformed values,
            ``threshold`` is the configured z-score cutoff, ``std`` is the standard
            deviation of the transformed values and ``threshold_transformed`` is the
            inverse Box-Cox transform of ``threshold``.

        Raises:
            ClientError: If ``sensitivity`` has no configured threshold.
        """
        if sensitivity not in self.z_score_thresholds:
            raise ClientError(f"Invalid sensitivity: {sensitivity}")

        transformed = self._box_cox_transform(values)
        mean = np.mean(transformed)
        std = float(np.std(transformed))
        # A constant series has no spread; report all-zero scores instead of dividing by 0.
        z_scores = (transformed - mean) / std if std > 0 else np.zeros_like(transformed)
        threshold = self.z_score_thresholds[sensitivity]
        threshold_transformed = self._inverse_box_cox_transform(threshold, self.box_cox_lambda)

        return z_scores, threshold, std, threshold_transformed

    def _box_cox_threshold(self, threshold_transformed: float) -> Threshold:
        """Build the symmetric Box-Cox threshold band reported alongside each score."""
        return Threshold(
            type=ThresholdType.BOX_COX_THRESHOLD,
            upper=threshold_transformed,
            lower=-threshold_transformed,
        )

    @inject
    def batch_score(
        self,
        values: npt.NDArray[np.float64],
        timestamps: npt.NDArray[np.float64],
        mp_dist: npt.NDArray[np.float64],
        ad_config: AnomalyDetectionConfig,
        window_size: int,
        time_budget_ms: int | None = None,
        algo_config: AlgoConfig = injected,
        location_detector: LocationDetector = injected,
    ) -> FlagsAndScores:
        """Score every point of a time series in one pass.

        Args:
            values: The time series values.
            timestamps: The timestamps matching ``values``.
            mp_dist: Matrix profile distances (not used by this scorer).
            ad_config: Supplies the sensitivity and the alert direction.
            window_size: Matrix profile window size (not used by this scorer).
            time_budget_ms: Optional time limit in milliseconds; a falsy value
                disables the limit.
            algo_config: Supplies ``direction_detection_num_timesteps_in_batch_mode``.
            location_detector: Used to apply the direction filter to flagged points.

        Returns:
            FlagsAndScores with one flag, one z-score and one threshold list per point.

        Raises:
            ClientError: If the configured sensitivity is unknown.
            ServerError: If the time budget is exceeded.
        """
        z_scores, threshold, std, threshold_transformed = self._get_z_scores(
            values, ad_config.sensitivity
        )

        scores: List[float] = []
        flags: List[AnomalyFlags] = []
        thresholds: List[List[Threshold]] = []
        time_allocated = datetime.timedelta(milliseconds=time_budget_ms) if time_budget_ms else None
        time_start = datetime.datetime.now()

        # Direction filtering is only applied to the trailing points of the batch.
        idx_to_detect_location_from = (
            len(values) - algo_config.direction_detection_num_timesteps_in_batch_mode
        )
        # Only look at the clock every `batch_size` points to keep the check cheap.
        batch_size = 10 if len(values) > 10 else 1
        for i, z_score in enumerate(z_scores):
            if time_allocated is not None and i % batch_size == 0:
                time_elapsed = datetime.datetime.now() - time_start
                if time_elapsed > time_allocated:
                    sentry_sdk.set_extra("time_taken_for_batch_detection", time_elapsed)
                    sentry_sdk.set_extra("time_allocated_for_batch_detection", time_allocated)
                    sentry_sdk.capture_message(
                        "batch_detection_took_too_long",
                        level="error",
                    )
                    raise ServerError("Batch detection took too long")
            score = z_score
            flag: AnomalyFlags = "none"
            location_thresholds: List[Threshold] = []
            # Compare the magnitude so that drops (large negative z-scores) are
            # detected as well as spikes; the direction filter narrows it afterwards.
            if std != 0 and abs(score) > threshold:
                flag = "anomaly_higher_confidence"
                if i >= idx_to_detect_location_from:
                    flag, location_thresholds = self._adjust_flag_for_direction(
                        flag,
                        ad_config.direction,
                        values[i],
                        timestamps[i],
                        values[:i],
                        timestamps[:i],
                        location_detector,
                    )

            scores.append(score)
            flags.append(flag)
            thresholds.append(
                [self._box_cox_threshold(threshold_transformed), *location_thresholds]
            )

        return FlagsAndScores(flags=flags, scores=scores, thresholds=thresholds)

    @inject
    def stream_score(
        self,
        streamed_value: np.float64,
        streamed_timestamp: np.float64,
        streamed_mp_dist: np.float64,
        history_values: npt.NDArray[np.float64],
        history_timestamps: npt.NDArray[np.float64],
        history_mp_dist: npt.NDArray[np.float64],
        ad_config: AnomalyDetectionConfig,
        window_size: int,
        algo_config: AlgoConfig = injected,
        location_detector: LocationDetector = injected,
    ) -> FlagsAndScores:
        """Score a single streamed point against its history.

        Args:
            streamed_value: The new value to score.
            streamed_timestamp: The timestamp of the new value.
            streamed_mp_dist: Matrix profile distance of the new point (not used here).
            history_values: Previously seen values.
            history_timestamps: Timestamps matching ``history_values``.
            history_mp_dist: Matrix profile distances of the history (not used here).
            ad_config: Supplies the sensitivity and the alert direction.
            window_size: Matrix profile window size (not used by this scorer).
            algo_config: Injected algorithm configuration (not used by this scorer).
            location_detector: Used to apply the direction filter to a flagged point.

        Returns:
            FlagsAndScores holding exactly one flag, score and threshold list.

        Raises:
            ClientError: If the configured sensitivity is unknown.
        """
        # Include current value in z-score calculation
        values = np.append(history_values, streamed_value)
        z_scores, threshold, std, threshold_transformed = self._get_z_scores(
            values, ad_config.sensitivity
        )

        # Get z-score for streamed value
        score = z_scores[-1]
        location_thresholds: List[Threshold] = []
        flag: AnomalyFlags = "none"

        # Same two-sided test as batch_score so both modes agree on what is anomalous.
        if std != 0 and abs(score) > threshold:
            flag, location_thresholds = self._adjust_flag_for_direction(
                "anomaly_higher_confidence",
                ad_config.direction,
                streamed_value,
                streamed_timestamp,
                history_values,
                history_timestamps,
                location_detector,
            )

        # Build a new list rather than appending to the one owned by the location
        # detector's result, and report the same symmetric band as batch_score.
        thresholds: List[Threshold] = [
            *location_thresholds,
            self._box_cox_threshold(threshold_transformed),
        ]

        return FlagsAndScores(
            flags=[flag],
            scores=[score],
            thresholds=[thresholds],
        )

    def _adjust_flag_for_direction(
        self,
        flag: AnomalyFlags,
        direction: Directions,
        streamed_value: np.float64,
        streamed_timestamp: np.float64,
        history_values: npt.NDArray[np.float64],
        history_timestamps: npt.NDArray[np.float64],
        location_detector: LocationDetector,
    ) -> Tuple[AnomalyFlags, List[Threshold]]:
        """Suppress a flag when the point lies on the wrong side for ``direction``.

        Args:
            flag: The flag produced by the z-score test.
            direction: The configured alert direction ("up", "down" or "both").
            streamed_value: The value being evaluated.
            streamed_timestamp: The timestamp of that value.
            history_values: The values preceding it.
            history_timestamps: Timestamps matching ``history_values``.
            location_detector: Determines where the point sits relative to history.

        Returns:
            The possibly downgraded flag and the thresholds reported by the location
            detector (empty when no location detection was performed or possible).

        Raises:
            ValueError: If location detection is needed but there is no history.
        """
        # Nothing to filter when the point is not flagged or both directions alert.
        if flag == "none" or direction == "both":
            return flag, []

        if len(history_values) == 0:
            raise ValueError("No history values to detect location")

        relative_location = location_detector.detect(
            streamed_value, streamed_timestamp, history_values, history_timestamps
        )
        if relative_location is None:
            return flag, []

        if (direction == "up" and relative_location.location != PointLocation.UP) or (
            direction == "down" and relative_location.location != PointLocation.DOWN
        ):
            return "none", relative_location.thresholds
        return flag, relative_location.thresholds
83 changes: 83 additions & 0 deletions src/seer/anomaly_detection/detectors/mp_cascading_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from typing import Optional

import numpy as np
import numpy.typing as npt
from pydantic import Field

from seer.anomaly_detection.detectors.location_detectors import LocationDetector
from seer.anomaly_detection.detectors.mp_boxcox_scorer import MPBoxCoxScorer
from seer.anomaly_detection.detectors.mp_scorers import FlagsAndScores, LowVarianceScorer, MPScorer
from seer.anomaly_detection.models import AlgoConfig, AnomalyDetectionConfig
from seer.dependency_injection import inject, injected


class MPCascadingScorer(MPScorer):
    """
    This class implements a cascading scoring mechanism for Matrix Profile-based anomaly detection.
    It applies multiple scorers in sequence, returning the result of the first scorer that produces
    a result other than None. This approach allows for fallback strategies and potentially more
    robust anomaly detection.
    The default implementation uses the LowVarianceScorer followed by the MPBoxCoxScorer.
    """

    scorers: list[MPScorer] = Field(
        [LowVarianceScorer(), MPBoxCoxScorer()], description="The list of scorers to cascade"
    )

    @inject
    def batch_score(
        self,
        values: npt.NDArray[np.float64],
        timestamps: npt.NDArray[np.float64],
        mp_dist: npt.NDArray[np.float64],
        ad_config: AnomalyDetectionConfig,
        window_size: int,
        time_budget_ms: int | None = None,
        algo_config: AlgoConfig = injected,
        location_detector: LocationDetector = injected,
    ) -> Optional[FlagsAndScores]:
        """Batch-score a time series with the first scorer that yields a result.

        Every argument is forwarded unchanged to each scorer's ``batch_score`` in
        the order the scorers are configured.

        Returns:
            The first non-None FlagsAndScores produced by a scorer, or None when
            every scorer returns None (including when ``scorers`` is empty).
        """
        for scorer in self.scorers:
            flags_and_scores = scorer.batch_score(
                values,
                timestamps,
                mp_dist,
                ad_config,
                window_size,
                time_budget_ms,
                algo_config,
                location_detector,
            )
            # A scorer signals "not applicable" with None; fall through to the next one.
            if flags_and_scores is not None:
                return flags_and_scores
        return None

    @inject
    def stream_score(
        self,
        streamed_value: np.float64,
        streamed_timestamp: np.float64,
        streamed_mp_dist: np.float64,
        history_values: npt.NDArray[np.float64],
        history_timestamps: npt.NDArray[np.float64],
        history_mp_dist: npt.NDArray[np.float64],
        ad_config: AnomalyDetectionConfig,
        window_size: int,
        algo_config: AlgoConfig = injected,
        location_detector: LocationDetector = injected,
    ) -> Optional[FlagsAndScores]:
        """Stream-score a single point with the first scorer that yields a result.

        Every argument is forwarded unchanged to each scorer's ``stream_score`` in
        the order the scorers are configured.

        Returns:
            The first non-None FlagsAndScores produced by a scorer, or None when
            every scorer returns None (including when ``scorers`` is empty).
        """
        for scorer in self.scorers:
            flags_and_scores = scorer.stream_score(
                streamed_value,
                streamed_timestamp,
                streamed_mp_dist,
                history_values,
                history_timestamps,
                history_mp_dist,
                ad_config,
                window_size,
                algo_config,
                location_detector,
            )
            # A scorer signals "not applicable" with None; fall through to the next one.
            if flags_and_scores is not None:
                return flags_and_scores
        return None
Loading

0 comments on commit f5aa68e

Please sign in to comment.