Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AnomalyDetection] Add transforms and detectors. #34218

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions sdks/python/apache_beam/ml/anomaly/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
from typing import Iterable
from typing import Optional

from apache_beam.ml.anomaly.base import DEFAULT_MISSING_LABEL
from apache_beam.ml.anomaly.base import DEFAULT_NORMAL_LABEL
from apache_beam.ml.anomaly.base import DEFAULT_OUTLIER_LABEL
from apache_beam.ml.anomaly.base import AggregationFn
from apache_beam.ml.anomaly.base import AnomalyPrediction
from apache_beam.ml.anomaly.specifiable import specifiable
Expand Down Expand Up @@ -69,9 +72,13 @@ def __init__(
agg_func: Callable[[Iterable[int]], int],
agg_model_id: Optional[str] = None,
include_source_predictions: bool = False,
missing_label: int = -2,
normal_label: int = DEFAULT_NORMAL_LABEL,
outlier_label: int = DEFAULT_OUTLIER_LABEL,
missing_label: int = DEFAULT_MISSING_LABEL,
):
self._agg = agg_func
self._normal_label = normal_label
self._outlier_label = outlier_label
self._missing_label = missing_label
_AggModelIdMixin.__init__(self, agg_model_id)
_SourcePredictionMixin.__init__(self, include_source_predictions)
Expand Down Expand Up @@ -208,10 +215,8 @@ class MajorityVote(LabelAggregation):
**kwargs: Additional keyword arguments to pass to the base
`LabelAggregation` class.
"""
def __init__(self, normal_label=0, outlier_label=1, tie_breaker=0, **kwargs):
def __init__(self, tie_breaker=DEFAULT_NORMAL_LABEL, **kwargs):
self._tie_breaker = tie_breaker
self._normal_label = normal_label
self._outlier_label = outlier_label

def inner(predictions: Iterable[int]) -> int:
counters = collections.Counter(predictions)
Expand Down Expand Up @@ -248,10 +253,7 @@ class AllVote(LabelAggregation):
**kwargs: Additional keyword arguments to pass to the base
`LabelAggregation` class.
"""
def __init__(self, normal_label=0, outlier_label=1, **kwargs):
self._normal_label = normal_label
self._outlier_label = outlier_label

def __init__(self, **kwargs):
def inner(predictions: Iterable[int]) -> int:
return self._outlier_label if all(
map(lambda p: p == self._outlier_label,
Expand Down Expand Up @@ -282,10 +284,7 @@ class AnyVote(LabelAggregation):
**kwargs: Additional keyword arguments to pass to the base
`LabelAggregation` class.
"""
def __init__(self, normal_label=0, outlier_label=1, **kwargs):
self._normal_label = normal_label
self._outlier_label = outlier_label

def __init__(self, **kwargs):
def inner(predictions: Iterable[int]) -> int:
return self._outlier_label if any(
map(lambda p: p == self._outlier_label,
Expand Down
12 changes: 8 additions & 4 deletions sdks/python/apache_beam/ml/anomaly/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
"EnsembleAnomalyDetector"
]

# Default integer labels, used as the default values of the `normal_label`,
# `outlier_label` and `missing_label` constructor arguments.
DEFAULT_NORMAL_LABEL = 0
DEFAULT_OUTLIER_LABEL = 1
DEFAULT_MISSING_LABEL = -2


@dataclass(frozen=True)
class AnomalyPrediction():
Expand Down Expand Up @@ -79,9 +83,9 @@ class ThresholdFn(abc.ABC):
"""
def __init__(
self,
normal_label: int = 0,
outlier_label: int = 1,
missing_label: int = -2):
normal_label: int = DEFAULT_NORMAL_LABEL,
outlier_label: int = DEFAULT_OUTLIER_LABEL,
missing_label: int = DEFAULT_MISSING_LABEL):
self._normal_label = normal_label
self._outlier_label = outlier_label
self._missing_label = missing_label
Expand Down Expand Up @@ -165,7 +169,7 @@ def learn_one(self, x: beam.Row) -> None:
raise NotImplementedError

@abc.abstractmethod
def score_one(self, x: beam.Row) -> float:
def score_one(self, x: beam.Row) -> Optional[float]:
"""Scores a single data instance for anomalies.

Args:
Expand Down
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/detectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from apache_beam.ml.anomaly.detectors.zscore import ZScore
from apache_beam.ml.anomaly.detectors.robust_zscore import RobustZScore
from apache_beam.ml.anomaly.detectors.iqr import IQR
132 changes: 132 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/detectors/iqr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import math
from typing import Optional

import apache_beam as beam
from apache_beam.ml.anomaly.base import AnomalyDetector
from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.anomaly.thresholds import FixedThreshold
from apache_beam.ml.anomaly.univariate.base import EPSILON
from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long
from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker
from apache_beam.ml.anomaly.univariate.quantile import SecondaryBufferedQuantileTracker # pylint: disable=line-too-long

DEFAULT_WINDOW_SIZE = 1000


@specifiable
class IQR(AnomalyDetector):
  """Interquartile Range (IQR) anomaly detector.

  This class implements an anomaly detection algorithm based on the
  Interquartile Range (IQR) [#]_ . It calculates the IQR using quantile trackers
  for Q1 (25th percentile) and Q3 (75th percentile) and scores data points based
  on their deviation from these quartiles.

  The score is calculated as follows:

  * If a data point is above Q3, the score is (value - Q3) / IQR.
  * If a data point is below Q1, the score is (Q1 - value) / IQR.
  * If a data point is within the IQR (Q1 <= value <= Q3), the score is 0.

  Two special cases apply before the rules above:

  * If either quartile is not available yet (the tracker returns NaN), the
    score is NaN.
  * If the absolute value of the IQR is smaller than `EPSILON`, the score is 0
    for every data point.

  Args:
    q1_tracker: Optional QuantileTracker for Q1 (25th percentile). If None, a
      BufferedSlidingQuantileTracker with a default window size is used.
    q3_tracker: Optional QuantileTracker for Q3 (75th percentile). If None, a
      SecondaryBufferedQuantileTracker based on q1_tracker is used.
    threshold_criterion: Optional ThresholdFn to apply on the score. Defaults
      to `FixedThreshold(1.5)` since outliers are commonly defined as data
      points that fall below Q1 - 1.5 IQR or above Q3 + 1.5 IQR.
    **kwargs: Additional keyword arguments, forwarded to the base class
      constructor.

  .. [#] https://en.wikipedia.org/wiki/Interquartile_range
  """
  def __init__(
      self,
      q1_tracker: Optional[QuantileTracker] = None,
      q3_tracker: Optional[QuantileTracker] = None,
      **kwargs):
    # Only install the default threshold when the caller did not choose one.
    if "threshold_criterion" not in kwargs:
      kwargs["threshold_criterion"] = FixedThreshold(1.5)
    super().__init__(**kwargs)

    self._q1_tracker = q1_tracker or \
        BufferedSlidingQuantileTracker(DEFAULT_WINDOW_SIZE, 0.25)
    assert self._q1_tracker._q == 0.25, \
        "q1_tracker must be initialized with q = 0.25"

    # The default Q3 tracker is built on top of the Q1 tracker.
    self._q3_tracker = q3_tracker or \
        SecondaryBufferedQuantileTracker(self._q1_tracker, 0.75)
    assert self._q3_tracker._q == 0.75, \
        "q3_tracker must be initialized with q = 0.75"

  def learn_one(self, x: beam.Row) -> None:
    """Updates the quantile trackers with a new data point.

    Rows that do not contain exactly one field are logged and ignored.

    Args:
      x: A `beam.Row` containing a single numerical value.
    """
    if len(x.__dict__) != 1:
      logging.warning(
          "IQR.learn_one expected univariate input, but got %s", str(x))
      return

    # Iterating the row yields its single field value.
    v = next(iter(x))
    self._q1_tracker.push(v)
    self._q3_tracker.push(v)

  def score_one(self, x: beam.Row) -> Optional[float]:
    """Scores a data point based on its deviation from the IQR.

    Args:
      x: A `beam.Row` containing a single numerical value.

    Returns:
      float | None: The anomaly score. None if the row does not contain
      exactly one field or if its value is None or NaN; NaN if Q1 or Q3 is
      not available yet; 0.0 if the IQR is smaller than `EPSILON` or the value
      lies within [Q1, Q3].
    """
    if len(x.__dict__) != 1:
      logging.warning(
          "IQR.score_one expected univariate input, but got %s", str(x))
      return None

    v = next(iter(x))
    if v is None or math.isnan(v):
      return None

    q1 = self._q1_tracker.get()
    q3 = self._q3_tracker.get()

    # not enough data points to compute the first or the third quartile
    if math.isnan(q1) or math.isnan(q3):
      return float('NaN')

    iqr = q3 - q1
    # A (near-)zero IQR would make the ratios below blow up, so every point
    # is scored as normal in that case.
    if abs(iqr) < EPSILON:
      return 0.0

    if v > q3:
      return (v - q3) / iqr

    if v < q1:
      return (q1 - v) / iqr

    # q1 <= v <= q3, normal points. Return a float to match the annotated
    # return type and the other branches.
    return 0.0
52 changes: 52 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/detectors/iqr_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import math
import unittest

import apache_beam as beam
from apache_beam.ml.anomaly.detectors.iqr import IQR


class IQRTest(unittest.TestCase):
  """Exercises the IQR detector on a short univariate stream."""
  input = [beam.Row(x=value) for value in (1, 1, 5, 9, 20, 10, 1)]

  def test_with_default_trackers(self):
    detector = IQR()

    observed = []
    for record in IQRTest.input:
      # Each record is scored before the detector learns from it.
      observed.append(detector.score_one(record))
      detector.learn_one(record)

    first_score, *remaining_scores = observed
    # Nothing has been learned when the first record is scored.
    self.assertTrue(math.isnan(first_score))
    self.assertEqual(
        remaining_scores,
        [0.0, 0.0, 3.0, 2.8, 0.125, 0.12903225806451613])

if __name__ == '__main__':
  # Surface INFO-level log records while the tests run.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
Loading
Loading