diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations.py b/sdks/python/apache_beam/ml/anomaly/aggregations.py index 832f28316502..6d9f3797663b 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations.py @@ -23,6 +23,9 @@ from typing import Iterable from typing import Optional +from apache_beam.ml.anomaly.base import DEFAULT_MISSING_LABEL +from apache_beam.ml.anomaly.base import DEFAULT_NORMAL_LABEL +from apache_beam.ml.anomaly.base import DEFAULT_OUTLIER_LABEL from apache_beam.ml.anomaly.base import AggregationFn from apache_beam.ml.anomaly.base import AnomalyPrediction from apache_beam.ml.anomaly.specifiable import specifiable @@ -69,9 +72,13 @@ def __init__( agg_func: Callable[[Iterable[int]], int], agg_model_id: Optional[str] = None, include_source_predictions: bool = False, - missing_label: int = -2, + normal_label: int = DEFAULT_NORMAL_LABEL, + outlier_label: int = DEFAULT_OUTLIER_LABEL, + missing_label: int = DEFAULT_MISSING_LABEL, ): self._agg = agg_func + self._normal_label = normal_label + self._outlier_label = outlier_label self._missing_label = missing_label _AggModelIdMixin.__init__(self, agg_model_id) _SourcePredictionMixin.__init__(self, include_source_predictions) @@ -208,10 +215,8 @@ class MajorityVote(LabelAggregation): **kwargs: Additional keyword arguments to pass to the base `LabelAggregation` class. """ - def __init__(self, normal_label=0, outlier_label=1, tie_breaker=0, **kwargs): + def __init__(self, tie_breaker=DEFAULT_NORMAL_LABEL, **kwargs): self._tie_breaker = tie_breaker - self._normal_label = normal_label - self._outlier_label = outlier_label def inner(predictions: Iterable[int]) -> int: counters = collections.Counter(predictions) @@ -248,10 +253,7 @@ class AllVote(LabelAggregation): **kwargs: Additional keyword arguments to pass to the base `LabelAggregation` class. 
""" - def __init__(self, normal_label=0, outlier_label=1, **kwargs): - self._normal_label = normal_label - self._outlier_label = outlier_label - + def __init__(self, **kwargs): def inner(predictions: Iterable[int]) -> int: return self._outlier_label if all( map(lambda p: p == self._outlier_label, @@ -282,10 +284,7 @@ class AnyVote(LabelAggregation): **kwargs: Additional keyword arguments to pass to the base `LabelAggregation` class. """ - def __init__(self, normal_label=0, outlier_label=1, **kwargs): - self._normal_label = normal_label - self._outlier_label = outlier_label - + def __init__(self, **kwargs): def inner(predictions: Iterable[int]) -> int: return self._outlier_label if any( map(lambda p: p == self._outlier_label, diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index 4242decf97ef..567af45187a8 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -37,6 +37,10 @@ "EnsembleAnomalyDetector" ] +DEFAULT_NORMAL_LABEL = 0 +DEFAULT_OUTLIER_LABEL = 1 +DEFAULT_MISSING_LABEL = -2 + @dataclass(frozen=True) class AnomalyPrediction(): @@ -79,9 +83,9 @@ class ThresholdFn(abc.ABC): """ def __init__( self, - normal_label: int = 0, - outlier_label: int = 1, - missing_label: int = -2): + normal_label: int = DEFAULT_NORMAL_LABEL, + outlier_label: int = DEFAULT_OUTLIER_LABEL, + missing_label: int = DEFAULT_MISSING_LABEL): self._normal_label = normal_label self._outlier_label = outlier_label self._missing_label = missing_label @@ -165,7 +169,7 @@ def learn_one(self, x: beam.Row) -> None: raise NotImplementedError @abc.abstractmethod - def score_one(self, x: beam.Row) -> float: + def score_one(self, x: beam.Row) -> Optional[float]: """Scores a single data instance for anomalies. 
Args: diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/__init__.py b/sdks/python/apache_beam/ml/anomaly/detectors/__init__.py new file mode 100644 index 000000000000..f3268755cf99 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from apache_beam.ml.anomaly.detectors.zscore import ZScore +from apache_beam.ml.anomaly.detectors.robust_zscore import RobustZScore +from apache_beam.ml.anomaly.detectors.iqr import IQR diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/iqr.py b/sdks/python/apache_beam/ml/anomaly/detectors/iqr.py new file mode 100644 index 000000000000..c713bbd444ce --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/iqr.py @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +from typing import Optional + +import apache_beam as beam +from apache_beam.ml.anomaly.base import AnomalyDetector +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.thresholds import FixedThreshold +from apache_beam.ml.anomaly.univariate.base import EPSILON +from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import SecondaryBufferedQuantileTracker # pylint: disable=line-too-long + +DEFAULT_WINDOW_SIZE = 1000 + + +@specifiable +class IQR(AnomalyDetector): + """Interquartile Range (IQR) anomaly detector. + + This class implements an anomaly detection algorithm based on the + Interquartile Range (IQR) [#]_ . It calculates the IQR using quantile trackers + for Q1 (25th percentile) and Q3 (75th percentile) and scores data points based + on their deviation from these quartiles. + + The score is calculated as follows: + + * If a data point is above Q3, the score is (value - Q3) / IQR. + * If a data point is below Q1, the score is (Q1 - value) / IQR. + * If a data point is within the IQR (Q1 <= value <= Q3), the score is 0. + Initializes the IQR anomaly detector. + + Args: + q1_tracker: Optional QuantileTracker for Q1 (25th percentile). If None, a + BufferedSlidingQuantileTracker with a default window size is used. + q3_tracker: Optional QuantileTracker for Q3 (75th percentile). 
If None, a + SecondaryBufferedQuantileTracker based on q1_tracker is used. + threshold_criterion: Optional ThresholdFn to apply on the score. Defaults + to `FixedThreshold(1.5)` since outliers are commonly defined as data + points that fall below Q1 - 1.5 IQR or above Q3 + 1.5 IQR. + **kwargs: Additional keyword arguments. + + .. [#] https://en.wikipedia.org/wiki/Interquartile_range + """ + def __init__( + self, + q1_tracker: Optional[QuantileTracker] = None, + q3_tracker: Optional[QuantileTracker] = None, + **kwargs): + if "threshold_criterion" not in kwargs: + kwargs["threshold_criterion"] = FixedThreshold(1.5) + super().__init__(**kwargs) + + self._q1_tracker = q1_tracker or \ + BufferedSlidingQuantileTracker(DEFAULT_WINDOW_SIZE, 0.25) + assert self._q1_tracker._q == 0.25, \ + "q1_tracker must be initialized with q = 0.25" + + self._q3_tracker = q3_tracker or \ + SecondaryBufferedQuantileTracker(self._q1_tracker, 0.75) + assert self._q3_tracker._q == 0.75, \ + "q3_tracker must be initialized with q = 0.75" + + def learn_one(self, x: beam.Row) -> None: + """Updates the quantile trackers with a new data point. + + Args: + x: A `beam.Row` containing a single numerical value. + """ + if len(x.__dict__) != 1: + logging.warning( + "IQR.learn_one expected univariate input, but got %s", str(x)) + return + + v = next(iter(x)) + self._q1_tracker.push(v) + self._q3_tracker.push(v) + + def score_one(self, x: beam.Row) -> Optional[float]: + """Scores a data point based on its deviation from the IQR. + + Args: + x: A `beam.Row` containing a single numerical value. + + Returns: + float | None: The anomaly score. 
+ """ + if len(x.__dict__) != 1: + logging.warning( + "IQR.score_one expected univariate input, but got %s", str(x)) + return None + + v = next(iter(x)) + if v is None or math.isnan(v): + return None + + q1 = self._q1_tracker.get() + q3 = self._q3_tracker.get() + + # not enough data points to compute the first or third quartile + if math.isnan(q1) or math.isnan(q3): + return float('NaN') + + iqr = q3 - q1 + if abs(iqr) < EPSILON: + return 0.0 + + if v > q3: + return (v - q3) / iqr + + if v < q1: + return (q1 - v) / iqr + + # q1 <= v <= q3, normal points + return 0 diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/iqr_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/iqr_test.py new file mode 100644 index 000000000000..c2d175fdd02f --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/iqr_test.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +import logging +import math +import unittest + +import apache_beam as beam +from apache_beam.ml.anomaly.detectors.iqr import IQR + + +class IQRTest(unittest.TestCase): + input = [ + beam.Row(x=1), + beam.Row(x=1), + beam.Row(x=5), + beam.Row(x=9), + beam.Row(x=20), + beam.Row(x=10), + beam.Row(x=1) + ] + + def test_with_default_trackers(self): + iqr = IQR() + + scores = [] + for row in IQRTest.input: + scores.append(iqr.score_one(row)) + iqr.learn_one(row) + + self.assertTrue(math.isnan(scores[0])) + self.assertEqual( + scores[1:], [0.0, 0.0, 3.0, 2.8, 0.125, 0.12903225806451613]) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/robust_zscore.py b/sdks/python/apache_beam/ml/anomaly/detectors/robust_zscore.py new file mode 100644 index 000000000000..488b165efe47 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/robust_zscore.py @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import math +from typing import Optional + +import apache_beam as beam +from apache_beam.ml.anomaly.base import AnomalyDetector +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.thresholds import FixedThreshold +from apache_beam.ml.anomaly.univariate.base import EPSILON +from apache_beam.ml.anomaly.univariate.mad import MadTracker + + +# pylint: disable=line-too-long +@specifiable +class RobustZScore(AnomalyDetector): + """Robust Z-Score anomaly detector. + + This class implements an anomaly detection algorithm based on Robust Z-Score (also + known as Modified Z-Score), which is a robust alternative to the traditional + Z-score [#]_. It uses the median and Median Absolute Deviation (MAD) to + compute a score that is less sensitive to outliers. + + The score is calculated as: `|0.6745 * (value - median) / MAD|` + + Important: + In the streaming setting, we use the online version of median and MAD in the + calculation. Therefore, the score computed here does not exactly match its + batch counterpart. + + This implementation is adapted from the implementation within PySAD [#]_: + https://github.com/selimfirat/pysad/blob/master/pysad/models/median_absolute_deviation.py + + The batch version can be seen at PyOD [#]_: + https://github.com/yzhao062/pyod/blob/master/pyod/models/mad.py + + + Args: + mad_tracker: Optional `MadTracker` instance. If None, a default `MadTracker` + is created. + threshold_criterion: Optional `ThresholdFn` to apply on + the score. Defaults to `FixedThreshold(3)` due to the commonly used + 3-sigma rule. + **kwargs: Additional keyword arguments. + + .. [#] Hoaglin, David C.. (2013). Volume 16: How to Detect and Handle Outliers. + .. [#] Yilmaz, Selim & Kozat, Suleyman. (2020). PySAD: A Streaming Anomaly Detection Framework in Python. 10.48550/arXiv.2009.02572. + .. [#] Zhao, Y., Nasrullah, Z. and Li, Z.. (2019). PyOD: A Python Toolbox for Scalable Outlier Detection.
Journal of machine learning research (JMLR), 20(96), pp.1-7. + """ + # pylint: enable=line-too-long + SCALE_FACTOR = 0.6745 + + def __init__(self, mad_tracker: Optional[MadTracker] = None, **kwargs): + if "threshold_criterion" not in kwargs: + kwargs["threshold_criterion"] = FixedThreshold(3) + super().__init__(**kwargs) + self._mad_tracker = mad_tracker or MadTracker() + + def learn_one(self, x: beam.Row) -> None: + """Updates the `MadTracker` with a new data point. + + Args: + x: A `beam.Row` containing a single numerical value. + """ + if len(x.__dict__) != 1: + logging.warning( + "RobustZScore.learn_one expected univariate input, but got %s", + str(x)) + return + + v = next(iter(x)) + self._mad_tracker.push(v) + + def score_one(self, x: beam.Row) -> Optional[float]: + """Scores a data point using the Robust Z-Score. + + Args: + x: A `beam.Row` containing a single numerical value. + + Returns: + float | None: The Robust Z-Score. + """ + if len(x.__dict__) != 1: + logging.warning( + "RobustZScore.score_one expected univariate input, but got %s", + str(x)) + return None + + v = next(iter(x)) + if v is None or math.isnan(v): + return None + + median = self._mad_tracker.get_median() + mad = self._mad_tracker.get() + + # not enough data points to compute median or median absolute deviation + if math.isnan(mad) or math.isnan(median): + return float('NaN') + + if abs(mad) < EPSILON: + return 0.0 + + return abs(RobustZScore.SCALE_FACTOR * (v - median) / mad) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/robust_zscore_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/robust_zscore_test.py new file mode 100644 index 000000000000..be2172602909 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/robust_zscore_test.py @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import unittest + +import apache_beam as beam +from apache_beam.ml.anomaly.detectors.robust_zscore import RobustZScore + + +class RobustZScoreTest(unittest.TestCase): + input = [ + beam.Row(x=1), + beam.Row(x=1), + beam.Row(x=5), + beam.Row(x=7), + beam.Row(x=20), + beam.Row(x=6), + beam.Row(x=1) + ] + + def test_with_default_trackers(self): + zscore = RobustZScore() + + scores = [] + for row in RobustZScoreTest.input: + scores.append(zscore.score_one(row)) + zscore.learn_one(row) + + self.assertTrue(math.isnan(scores[0])) + self.assertEqual(scores[1:], [0.0, 0.0, 0.0, 5.73325, 0.168625, 1.349]) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/zscore.py b/sdks/python/apache_beam/ml/anomaly/detectors/zscore.py new file mode 100644 index 000000000000..a61c6108802b --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/zscore.py @@ -0,0 +1,129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +from typing import Optional + +import apache_beam as beam +from apache_beam.ml.anomaly.base import AnomalyDetector +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.thresholds import FixedThreshold +from apache_beam.ml.anomaly.univariate.base import EPSILON +from apache_beam.ml.anomaly.univariate.mean import IncSlidingMeanTracker +from apache_beam.ml.anomaly.univariate.mean import MeanTracker +from apache_beam.ml.anomaly.univariate.stdev import IncSlidingStdevTracker +from apache_beam.ml.anomaly.univariate.stdev import StdevTracker + +DEFAULT_WINDOW_SIZE = 1000 + + +# pylint: disable=line-too-long +@specifiable +class ZScore(AnomalyDetector): + """Z-Score anomaly detector. + + This class implements an anomaly detection algorithm based on Z-Score (also + known as Standard Score [#]_ ), which measures how many standard deviations a + data point is from the mean. + + The score is calculated as: `| (value - mean) / stdev |` + + Important: + In the streaming setting, we use the online version of mean and standard + deviation in the calculation. 
+ + This implementation is adapted from the implementations within PySAD [#]_ and + River [#]_: + + * https://github.com/selimfirat/pysad/blob/master/pysad/models/standard_absolute_deviation.py + * https://github.com/online-ml/river/blob/main/river/anomaly/sad.py + + Args: + sub_stat_tracker: Optional `MeanTracker` instance. If None, an + `IncSlidingMeanTracker` with a default window size 1000 is created. + stdev_tracker: Optional `StdevTracker` instance. If None, an + `IncSlidingStdevTracker` with a default window size 1000 is created. + threshold_criterion: Optional `ThresholdFn` to apply on + the score. Defaults to `FixedThreshold(3)` due to the commonly used + 3-sigma rule. + **kwargs: Additional keyword arguments. + + .. [#] https://en.wikipedia.org/wiki/Standard_score + .. [#] Yilmaz, Selim & Kozat, Suleyman. (2020). PySAD: A Streaming Anomaly Detection Framework in Python. 10.48550/arXiv.2009.02572. + .. [#] Jacob Montiel, Max Halford, Saulo Martiello Mastelini, Geoffrey Bolmier, Raphaël Sourty, et al.. (2021). River: machine learning for streaming data in Python. Journal of Machine Learning Research, 2021, 22, pp.1-8. + """ + + # pylint: enable=line-too-long + def __init__( + self, + sub_stat_tracker: Optional[MeanTracker] = None, + stdev_tracker: Optional[StdevTracker] = None, + **kwargs): + if "threshold_criterion" not in kwargs: + kwargs["threshold_criterion"] = FixedThreshold(3) + super().__init__(**kwargs) + + self._sub_stat_tracker = sub_stat_tracker or IncSlidingMeanTracker( + DEFAULT_WINDOW_SIZE) + self._stdev_tracker = stdev_tracker or IncSlidingStdevTracker( + DEFAULT_WINDOW_SIZE) + + def learn_one(self, x: beam.Row) -> None: + """Updates the mean and standard deviation trackers with a new data point. + + Args: + x: A `beam.Row` containing a single numerical value.
+ """ + if len(x.__dict__) != 1: + logging.warning( + "ZScore.learn_one expected univariate input, but got %s", str(x)) + return + + v = next(iter(x)) + self._stdev_tracker.push(v) + self._sub_stat_tracker.push(v) + + def score_one(self, x: beam.Row) -> Optional[float]: + """Scores a data point using the Z-Score. + + Args: + x: A `beam.Row` containing a single numerical value. + + Returns: + float | None: The Z-Score. + """ + if len(x.__dict__) != 1: + logging.warning( + "ZScore.score_one expected univariate input, but got %s", str(x)) + return None + + v = next(iter(x)) + if v is None or math.isnan(v): + return None + + sub_stat = self._sub_stat_tracker.get() + stdev = self._stdev_tracker.get() + + # not enough data points to compute sub_stat or standard deviation + if math.isnan(stdev) or math.isnan(sub_stat): + return float('NaN') + + if abs(stdev) < EPSILON: + return 0.0 + + return abs((v - sub_stat) / stdev) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/zscore_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/zscore_test.py new file mode 100644 index 000000000000..830d57b25c44 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/zscore_test.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import unittest + +import apache_beam as beam +from apache_beam.ml.anomaly.detectors.zscore import ZScore +from apache_beam.ml.anomaly.univariate.mean import IncSlidingMeanTracker +from apache_beam.ml.anomaly.univariate.stdev import IncSlidingStdevTracker + + +class ZScoreTest(unittest.TestCase): + input = [ + beam.Row(x=1), + beam.Row(x=1), + beam.Row(x=5), + beam.Row(x=9), + beam.Row(x=20), + beam.Row(x=10), + beam.Row(x=1) + ] + + def test_with_default_trackers(self): + zscore = ZScore() + + scores = [] + for row in ZScoreTest.input: + scores.append(zscore.score_one(row)) + zscore.learn_one(row) + + self.assertTrue(math.isnan(scores[0])) + self.assertTrue(math.isnan(scores[1])) + self.assertEqual( + scores[2:], + [ + 0.0, + 2.8867513459481287, + 4.177863742936748, + 0.35502819053868157, + 0.932910509720565 + ]) + + def test_with_custom_mean_tracker(self): + zscore = ZScore( + sub_stat_tracker=IncSlidingMeanTracker(3), + stdev_tracker=IncSlidingStdevTracker(3)) + + scores = [] + for row in ZScoreTest.input: + scores.append(zscore.score_one(row)) + zscore.learn_one(row) + + self.assertTrue(math.isnan(scores[0])) + self.assertTrue(math.isnan(scores[1])) + self.assertEqual( + scores[2:], [ + 0.0, + 2.8867513459481287, + 3.75, + 0.17165643016914964, + 1.9727878476642864 + ]) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py b/sdks/python/apache_beam/ml/anomaly/specifiable.py index e0122d41d9d5..e73ef5513b64 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py @@ -123,7 +123,15 @@ def _from_spec_helper(v, _run_init): @classmethod def from_spec(cls, spec: Spec, _run_init: bool = True) -> Self: - """Generate a `Specifiable` subclass object based on a spec.""" + 
"""Generate a `Specifiable` subclass object based on a spec. + + Args: + spec: the specification of a `Specifiable` subclass object + _run_init: whether to call `__init__` or not for the initial instantiation + + Returns: + Self: the `Specifiable` subclass object + """ if spec.type is None: raise ValueError(f"Spec type not found in {spec}") @@ -153,7 +161,11 @@ def _to_spec_helper(v): return v def to_spec(self) -> Spec: - """Generate a spec from a `Specifiable` subclass object.""" + """Generate a spec from a `Specifiable` subclass object. + + Returns: + Spec: The specification of the instance. + """ if getattr(type(self), 'spec_type', None) is None: raise ValueError( f"'{type(self).__name__}' not registered as Specifiable. " @@ -262,7 +274,14 @@ def new_init(self: Specifiable, *args, **kwargs): original_init(self, *args, **kwargs) self._initialized = True - def run_original_init(self): + def run_original_init(self) -> None: + """Execute the original `__init__` method with its saved arguments. + + For instances of the `Specifiable` class, initialization is deferred + (lazy initialization). This function forces the execution of the + original `__init__` method using the arguments captured during + the object's initial instantiation. 
+ """ self._in_init = True original_init(self, **self.init_kwargs) self._in_init = False diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds.py b/sdks/python/apache_beam/ml/anomaly/thresholds.py index d777aa5cde00..8226a27b7c4e 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds.py @@ -17,172 +17,13 @@ from __future__ import annotations -import dataclasses import math -from typing import Any -from typing import Iterable from typing import Optional -from typing import Tuple -from typing import Union -import apache_beam as beam -from apache_beam.coders import DillCoder -from apache_beam.ml.anomaly.base import AnomalyResult from apache_beam.ml.anomaly.base import ThresholdFn -from apache_beam.ml.anomaly.specifiable import Spec -from apache_beam.ml.anomaly.specifiable import Specifiable from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker -from apache_beam.transforms.userstate import ReadModifyWriteRuntimeState -from apache_beam.transforms.userstate import ReadModifyWriteStateSpec - - -class BaseThresholdDoFn(beam.DoFn): - """Applies a ThresholdFn to anomaly detection results. - - This abstract base class defines the structure for DoFns that use a - `ThresholdFn` to convert anomaly scores into anomaly labels (e.g., normal - or outlier). It handles the core logic of applying the threshold function - and updating the prediction labels within `AnomalyResult` objects. - - Args: - threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be - used. 
- """ - def __init__(self, threshold_fn_spec: Spec): - self._threshold_fn_spec = threshold_fn_spec - self._threshold_fn = None - - def _apply_threshold_to_predictions( - self, result: AnomalyResult) -> AnomalyResult: - """Updates the prediction labels in an AnomalyResult using the ThresholdFn. - - Args: - result (AnomalyResult): The input `AnomalyResult` containing anomaly - scores. - - Returns: - AnomalyResult: A new `AnomalyResult` with updated prediction labels - and threshold values. - """ - predictions = [ - dataclasses.replace( - p, - label=self._threshold_fn.apply(p.score), - threshold=self._threshold_fn.threshold) for p in result.predictions - ] - return dataclasses.replace(result, predictions=predictions) - - -class StatelessThresholdDoFn(BaseThresholdDoFn): - """Applies a stateless ThresholdFn to anomaly detection results. - - This DoFn is designed for stateless `ThresholdFn` implementations. It - initializes the `ThresholdFn` once during setup and applies it to each - incoming element without maintaining any state across elements. - - Args: - threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be - used. - - Raises: - AssertionError: If the provided `threshold_fn_spec` leads to the - creation of a stateful `ThresholdFn`. - """ - def __init__(self, threshold_fn_spec: Spec): - threshold_fn_spec.config["_run_init"] = True - self._threshold_fn: Any = Specifiable.from_spec(threshold_fn_spec) - assert not self._threshold_fn.is_stateful, \ - "This DoFn can only take stateless function as threshold_fn" - - def process(self, element: Tuple[Any, Tuple[Any, AnomalyResult]], - **kwargs) -> Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: - """Processes a batch of anomaly results using a stateless ThresholdFn. - - Args: - element (Tuple[Any, Tuple[Any, AnomalyResult]]): A tuple representing - an element in the Beam pipeline. 
It is expected to be in the format - `(key1, (key2, AnomalyResult))`, where key1 is the original input key, - and key2 is a disambiguating key for distinct data points. - **kwargs: Additional keyword arguments passed to the `process` method - in Beam DoFns. - - Yields: - Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: An iterable containing - a single output element with the same structure as the input, but with - the `AnomalyResult` having updated prediction labels based on the - stateless `ThresholdFn`. - """ - k1, (k2, result) = element - yield k1, (k2, self._apply_threshold_to_predictions(result)) - - -class StatefulThresholdDoFn(BaseThresholdDoFn): - """Applies a stateful ThresholdFn to anomaly detection results. - - This DoFn is designed for stateful `ThresholdFn` implementations. It leverages - Beam's state management to persist and update the state of the `ThresholdFn` - across multiple elements. This is necessary for `ThresholdFn`s that need to - accumulate information or adapt over time, such as quantile-based thresholds. - - Args: - threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be - used. - - Raises: - AssertionError: If the provided `threshold_fn_spec` leads to the - creation of a stateless `ThresholdFn`. - """ - THRESHOLD_STATE_INDEX = ReadModifyWriteStateSpec('saved_tracker', DillCoder()) - - def __init__(self, threshold_fn_spec: Spec): - threshold_fn_spec.config["_run_init"] = True - threshold_fn: Any = Specifiable.from_spec(threshold_fn_spec) - assert threshold_fn.is_stateful, \ - "This DoFn can only take stateful function as threshold_fn" - self._threshold_fn_spec = threshold_fn_spec - - def process( - self, - element: Tuple[Any, Tuple[Any, AnomalyResult]], - threshold_state: Union[ReadModifyWriteRuntimeState, - Any] = beam.DoFn.StateParam(THRESHOLD_STATE_INDEX), - **kwargs) -> Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: - """Processes a batch of anomaly results using a stateful ThresholdFn. 
- - For each input element, this DoFn retrieves the stateful `ThresholdFn` from - Beam state, initializes it if it's the first time, applies it to update - the prediction labels in the `AnomalyResult`, and then updates the state in - Beam for future elements. - - Args: - element (Tuple[Any, Tuple[Any, AnomalyResult]]): A tuple representing - an element in the Beam pipeline. It is expected to be in the format - `(key1, (key2, AnomalyResult))`, where key1 is the original input key, - and key2 is a disambiguating key for distinct data points. - threshold_state (Union[ReadModifyWriteRuntimeState, Any]): A Beam state - parameter that provides access to the persisted state of the - `ThresholdFn`. It is automatically managed by Beam. - **kwargs: Additional keyword arguments passed to the `process` method - in Beam DoFns. - - Yields: - Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: An iterable containing - a single output element with the same structure as the input, but - with the `AnomalyResult` having updated prediction labels based on - the stateful `ThresholdFn`. 
- """ - k1, (k2, result) = element - - self._threshold_fn = threshold_state.read() - if self._threshold_fn is None: - self._threshold_fn: Specifiable = Specifiable.from_spec( - self._threshold_fn_spec) - - yield k1, (k2, self._apply_threshold_to_predictions(result)) - - threshold_state.write(self._threshold_fn) @specifiable diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py index 413c0e52c6b0..bd2629ee00c1 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py @@ -18,18 +18,10 @@ import logging import unittest -import apache_beam as beam from apache_beam.ml.anomaly import thresholds -from apache_beam.ml.anomaly.base import AnomalyPrediction -from apache_beam.ml.anomaly.base import AnomalyResult from apache_beam.ml.anomaly.specifiable import Spec from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to - -R = beam.Row(x=10, y=20) class TestFixedThreshold(unittest.TestCase): @@ -40,93 +32,6 @@ def test_apply_only(self): self.assertEqual(threshold_fn.apply(None), None) self.assertEqual(threshold_fn.apply(float('NaN')), -2) - def test_dofn_on_single_prediction(self): - input = [ - (1, (2, AnomalyResult(R, [AnomalyPrediction(score=1)]))), - (1, (3, AnomalyResult(R, [AnomalyPrediction(score=2)]))), - (1, (4, AnomalyResult(R, [AnomalyPrediction(score=3)]))), - ] - expected = [ - ( - 1, - ( - 2, - AnomalyResult( - R, [AnomalyPrediction(score=1, label=0, threshold=2)]))), - ( - 1, - ( - 3, - AnomalyResult( - R, [AnomalyPrediction(score=2, label=1, threshold=2)]))), - ( - 1, - ( - 4, - AnomalyResult( - R, 
[AnomalyPrediction(score=3, label=1, threshold=2)]))), - ] - with TestPipeline() as p: - result = ( - p - | beam.Create(input) - | beam.ParDo( - thresholds.StatelessThresholdDoFn( - thresholds.FixedThreshold(2, normal_label=0, - outlier_label=1).to_spec()))) - assert_that(result, equal_to(expected)) - - def test_dofn_on_multiple_predictions(self): - input = [ - ( - 1, - ( - 2, - AnomalyResult( - R, - [AnomalyPrediction(score=1), AnomalyPrediction(score=4)]))), - ( - 1, - ( - 3, - AnomalyResult( - R, - [AnomalyPrediction(score=2), AnomalyPrediction(score=0.5) - ]))), - ] - expected = [ - ( - 1, - ( - 2, - AnomalyResult( - R, - [ - AnomalyPrediction(score=1, label=0, threshold=2), - AnomalyPrediction(score=4, label=1, threshold=2) - ]))), - ( - 1, - ( - 3, - AnomalyResult( - R, - [ - AnomalyPrediction(score=2, label=1, threshold=2), - AnomalyPrediction(score=0.5, label=0, threshold=2) - ]))), - ] - with TestPipeline() as p: - result = ( - p - | beam.Create(input) - | beam.ParDo( - thresholds.StatelessThresholdDoFn( - thresholds.FixedThreshold(2, normal_label=0, - outlier_label=1).to_spec()))) - - assert_that(result, equal_to(expected)) - class TestQuantileThreshold(unittest.TestCase): def test_apply_only(self): @@ -137,67 +42,6 @@ def test_apply_only(self): self.assertEqual(threshold_fn.apply(None), None) self.assertEqual(threshold_fn.apply(float('NaN')), -2) - def test_dofn_on_single_prediction(self): - # use the input data with two keys to test stateful threshold function - input = [ - (1, (2, AnomalyResult(R, [AnomalyPrediction(score=1)]))), - (1, (3, AnomalyResult(R, [AnomalyPrediction(score=2)]))), - (1, (4, AnomalyResult(R, [AnomalyPrediction(score=3)]))), - (2, (2, AnomalyResult(R, [AnomalyPrediction(score=10)]))), - (2, (3, AnomalyResult(R, [AnomalyPrediction(score=20)]))), - (2, (4, AnomalyResult(R, [AnomalyPrediction(score=30)]))), - ] - expected = [ - ( - 1, - ( - 2, - AnomalyResult( - R, [AnomalyPrediction(score=1, label=1, threshold=1)]))), - ( - 1, - ( 
- 3, - AnomalyResult( - R, [AnomalyPrediction(score=2, label=1, threshold=1.5)]))), - ( - 2, - ( - 2, - AnomalyResult( - R, [AnomalyPrediction(score=10, label=1, threshold=10)]))), - ( - 2, - ( - 3, - AnomalyResult( - R, [AnomalyPrediction(score=20, label=1, threshold=15)]))), - ( - 1, - ( - 4, - AnomalyResult( - R, [AnomalyPrediction(score=3, label=1, threshold=2)]))), - ( - 2, - ( - 4, - AnomalyResult( - R, [AnomalyPrediction(score=30, label=1, threshold=20)]))), - ] - with TestPipeline() as p: - result = ( - p - | beam.Create(input) - # use median just for test convenience - | beam.ParDo( - thresholds.StatefulThresholdDoFn( - thresholds.QuantileThreshold( - quantile=0.5, normal_label=0, - outlier_label=1).to_spec()))) - - assert_that(result, equal_to(expected)) - def test_quantile_tracker(self): t1 = thresholds.QuantileThreshold() self.assertTrue(isinstance(t1._tracker, BufferedSlidingQuantileTracker)) @@ -215,7 +59,6 @@ def test_quantile_tracker(self): quantile=0.9, quantile_tracker=SimpleSlidingQuantileTracker(50, 0.975)) self.assertTrue(isinstance(t3._tracker, SimpleSlidingQuantileTracker)) self.assertEqual(t3._tracker._q, 0.975) - print(t3.to_spec()) self.assertEqual( t3.to_spec(), Spec( diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py b/sdks/python/apache_beam/ml/anomaly/transforms.py new file mode 100644 index 000000000000..5fcb2208d780 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/transforms.py @@ -0,0 +1,528 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import dataclasses +import typing +import uuid +from typing import Any +from typing import Callable +from typing import Iterable +from typing import Tuple +from typing import TypeVar +from typing import Union + +import apache_beam as beam +from apache_beam.coders import DillCoder +from apache_beam.ml.anomaly import aggregations +from apache_beam.ml.anomaly.base import AggregationFn +from apache_beam.ml.anomaly.base import AnomalyDetector +from apache_beam.ml.anomaly.base import AnomalyPrediction +from apache_beam.ml.anomaly.base import AnomalyResult +from apache_beam.ml.anomaly.base import EnsembleAnomalyDetector +from apache_beam.ml.anomaly.base import ThresholdFn +from apache_beam.ml.anomaly.specifiable import Spec +from apache_beam.ml.anomaly.specifiable import Specifiable +from apache_beam.transforms.userstate import ReadModifyWriteRuntimeState +from apache_beam.transforms.userstate import ReadModifyWriteStateSpec + +KeyT = TypeVar('KeyT') +TempKeyT = TypeVar('TempKeyT', bound=int) +InputT = Tuple[KeyT, beam.Row] +KeyedInputT = Tuple[KeyT, Tuple[TempKeyT, beam.Row]] +KeyedOutputT = Tuple[KeyT, Tuple[TempKeyT, AnomalyResult]] +OutputT = Tuple[KeyT, AnomalyResult] + + +class _ScoreAndLearnDoFn(beam.DoFn): + """Scores and learns from incoming data using an anomaly detection model. + + This DoFn applies an anomaly detection model to score incoming data and + then updates the model with the same data. It maintains the model state + using Beam's state management. 
+ """ + MODEL_STATE_INDEX = ReadModifyWriteStateSpec('saved_model', DillCoder()) + + def __init__(self, detector_spec: Spec): + self._detector_spec = detector_spec + self._detector_spec.config["_run_init"] = True + + def score_and_learn(self, data): + """Scores and learns from a single data point. + + Args: + data: A `beam.Row` representing the input data point. + + Returns: + float: The anomaly score predicted by the model. + """ + assert self._underlying + if self._underlying._features is not None: + x = beam.Row(**{f: getattr(data, f) for f in self._underlying._features}) + else: + x = beam.Row(**data._asdict()) + + # score the incoming data using the existing model + y_pred = self._underlying.score_one(x) + + # then update the model with the same data + self._underlying.learn_one(x) + + return y_pred + + def process( + self, + element: KeyedInputT, + model_state=beam.DoFn.StateParam(MODEL_STATE_INDEX), + **kwargs) -> Iterable[KeyedOutputT]: + + model_state = typing.cast(ReadModifyWriteRuntimeState, model_state) + k1, (k2, data) = element + self._underlying: AnomalyDetector = model_state.read() + if self._underlying is None: + self._underlying = typing.cast( + AnomalyDetector, Specifiable.from_spec(self._detector_spec)) + + yield k1, (k2, + AnomalyResult( + example=data, + predictions=[AnomalyPrediction( + model_id=self._underlying._model_id, + score=self.score_and_learn(data))])) + + model_state.write(self._underlying) + + +class RunScoreAndLearn(beam.PTransform[beam.PCollection[KeyedInputT], + beam.PCollection[KeyedOutputT]]): + """Applies the _ScoreAndLearnDoFn to a PCollection of data. + + This PTransform scores and learns from data points using an anomaly + detection model. + + Args: + detector: The anomaly detection model to use. 
+ """ + def __init__(self, detector: AnomalyDetector): + self._detector = detector + + def expand( + self, + input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]: + return input | beam.ParDo(_ScoreAndLearnDoFn(self._detector.to_spec())) + + +class _BaseThresholdDoFn(beam.DoFn): + """Applies a ThresholdFn to anomaly detection results. + + This abstract base class defines the structure for DoFns that use a + `ThresholdFn` to convert anomaly scores into anomaly labels (e.g., normal + or outlier). It handles the core logic of applying the threshold function + and updating the prediction labels within `AnomalyResult` objects. + + Args: + threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be + used. + """ + def __init__(self, threshold_fn_spec: Spec): + self._threshold_fn_spec = threshold_fn_spec + self._threshold_fn: ThresholdFn + + def _apply_threshold_to_predictions( + self, result: AnomalyResult) -> AnomalyResult: + """Updates the prediction labels in an AnomalyResult using the ThresholdFn. + + Args: + result (AnomalyResult): The input `AnomalyResult` containing anomaly + scores. + + Returns: + AnomalyResult: A new `AnomalyResult` with updated prediction labels + and threshold values. + """ + predictions = [ + dataclasses.replace( + p, + label=self._threshold_fn.apply(p.score), + threshold=self._threshold_fn.threshold) for p in result.predictions + ] + return dataclasses.replace(result, predictions=predictions) + + +class _StatelessThresholdDoFn(_BaseThresholdDoFn): + """Applies a stateless ThresholdFn to anomaly detection results. + + This DoFn is designed for stateless `ThresholdFn` implementations. It + initializes the `ThresholdFn` once during setup and applies it to each + incoming element without maintaining any state across elements. + + Args: + threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be + used. 
+
+  Raises:
+    AssertionError: If the provided `threshold_fn_spec` leads to the
+      creation of a stateful `ThresholdFn`.
+  """
+  def __init__(self, threshold_fn_spec: Spec):
+    threshold_fn_spec.config["_run_init"] = True
+    self._threshold_fn: Any = Specifiable.from_spec(threshold_fn_spec)
+    assert not self._threshold_fn.is_stateful, \
+      "This DoFn can only take stateless function as threshold_fn"
+
+  def process(self, element: KeyedOutputT, **kwargs) -> Iterable[KeyedOutputT]:
+    """Applies the stateless ThresholdFn to one keyed anomaly result.
+
+    Args:
+      element (Tuple[Any, Tuple[Any, AnomalyResult]]): A tuple representing
+        an element in the Beam pipeline. It is expected to be in the format
+        `(key1, (key2, AnomalyResult))`, where key1 is the original input key,
+        and key2 is a disambiguating key for distinct data points.
+      **kwargs: Additional keyword arguments passed to the `process` method
+        in Beam DoFns.
+
+    Yields:
+      Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: An iterable containing
+        a single output element with the same structure as the input, but with
+        the `AnomalyResult` having updated prediction labels based on the
+        stateless `ThresholdFn`.
+    """
+    k1, (k2, result) = element
+    yield k1, (k2, self._apply_threshold_to_predictions(result))
+
+
+class _StatefulThresholdDoFn(_BaseThresholdDoFn):
+  """Applies a stateful ThresholdFn to anomaly detection results.
+
+  This DoFn is designed for stateful `ThresholdFn` implementations. It leverages
+  Beam's state management to persist and update the state of the `ThresholdFn`
+  across multiple elements. This is necessary for `ThresholdFn`s that need to
+  accumulate information or adapt over time, such as quantile-based thresholds.
+
+  Args:
+    threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be
+      used.
+
+  Raises:
+    AssertionError: If the provided `threshold_fn_spec` leads to the
+      creation of a stateless `ThresholdFn`.
+ """ + THRESHOLD_STATE_INDEX = ReadModifyWriteStateSpec('saved_tracker', DillCoder()) + + def __init__(self, threshold_fn_spec: Spec): + threshold_fn_spec.config["_run_init"] = True + threshold_fn: Any = Specifiable.from_spec(threshold_fn_spec) + assert threshold_fn.is_stateful, \ + "This DoFn can only take stateful function as threshold_fn" + self._threshold_fn_spec = threshold_fn_spec + + def process( + self, + element: KeyedOutputT, + threshold_state: Union[ReadModifyWriteRuntimeState, + Any] = beam.DoFn.StateParam(THRESHOLD_STATE_INDEX), + **kwargs) -> Iterable[KeyedOutputT]: + """Processes a batch of anomaly results using a stateful ThresholdFn. + + For each input element, this DoFn retrieves the stateful `ThresholdFn` from + Beam state, initializes it if it's the first time, applies it to update + the prediction labels in the `AnomalyResult`, and then updates the state in + Beam for future elements. + + Args: + element (Tuple[Any, Tuple[Any, AnomalyResult]]): A tuple representing + an element in the Beam pipeline. It is expected to be in the format + `(key1, (key2, AnomalyResult))`, where key1 is the original input key, + and key2 is a disambiguating key for distinct data points. + threshold_state (Union[ReadModifyWriteRuntimeState, Any]): A Beam state + parameter that provides access to the persisted state of the + `ThresholdFn`. It is automatically managed by Beam. + **kwargs: Additional keyword arguments passed to the `process` method + in Beam DoFns. + + Yields: + Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: An iterable containing + a single output element with the same structure as the input, but + with the `AnomalyResult` having updated prediction labels based on + the stateful `ThresholdFn`. 
+ """ + k1, (k2, result) = element + + self._threshold_fn = threshold_state.read() + if self._threshold_fn is None: + self._threshold_fn: Specifiable = Specifiable.from_spec( + self._threshold_fn_spec) + + yield k1, (k2, self._apply_threshold_to_predictions(result)) + + threshold_state.write(self._threshold_fn) + + +class RunThresholdCriterion(beam.PTransform[beam.PCollection[KeyedOutputT], + beam.PCollection[KeyedOutputT]]): + """Applies a threshold criterion to anomaly detection results. + + This PTransform applies a `ThresholdFn` to the anomaly scores in + `AnomalyResult` objects, updating the prediction labels. It handles both + stateful and stateless `ThresholdFn` implementations. + + Args: + threshold_criterion: The `ThresholdFn` to apply. + """ + def __init__(self, threshold_criterion): + self._threshold_fn = threshold_criterion + + def expand( + self, + input: beam.PCollection[KeyedOutputT]) -> beam.PCollection[KeyedOutputT]: + if self._threshold_fn: + if self._threshold_fn.is_stateful: + ret = ( + input + | beam.ParDo(_StatefulThresholdDoFn(self._threshold_fn.to_spec()))) + else: + ret = ( + input + | beam.ParDo(_StatelessThresholdDoFn(self._threshold_fn.to_spec()))) + else: + ret = input + + return ret + + +class RunAggregationStrategy(beam.PTransform[beam.PCollection[KeyedOutputT], + beam.PCollection[KeyedOutputT]]): + """Applies an aggregation strategy to grouped anomaly detection results. + + This PTransform aggregates anomaly predictions from multiple models or + data points using an `AggregationFn`. It handles both custom and simple + aggregation strategies. + + Args: + aggregation_strategy: The `AggregationFn` to use. + agg_model_id: The model ID for aggregation. 
+ """ + def __init__(self, aggregation_strategy, agg_model_id): + self._aggregation_fn = aggregation_strategy + self._agg_model_id = agg_model_id + + def expand( + self, + input: beam.PCollection[KeyedOutputT]) -> beam.PCollection[KeyedOutputT]: + post_gbk = ( + input | beam.MapTuple(lambda k, v: ((k, v[0]), v[1])) + | beam.GroupByKey()) + + if self._aggregation_fn is None: + # simply put predictions into an iterable (list) + ret: Any = ( + post_gbk | beam.MapTuple( + lambda k, + v: ( + k[0], + ( + k[1], + AnomalyResult( + example=v[0].example, + predictions=[ + prediction for result in v + for prediction in result.predictions + ]))))) + return ret + + # create a new aggregation_fn from spec and make sure it is initialized + aggregation_fn_spec = self._aggregation_fn.to_spec() + aggregation_fn_spec.config["_run_init"] = True + aggregation_fn: AggregationFn = typing.cast( + AggregationFn, Specifiable.from_spec(aggregation_fn_spec)) + + # if no _agg_model_id is set in the aggregation function, use + # model id from the ensemble instance + if (isinstance(aggregation_fn, aggregations._AggModelIdMixin)): + aggregation_fn._set_agg_model_id_if_unset(self._agg_model_id) + + ret = ( + post_gbk | beam.MapTuple( + lambda k, + v, + agg=aggregation_fn: ( + k[0], + ( + k[1], + AnomalyResult( + example=v[0].example, + predictions=[ + agg.apply([ + prediction for result in v + for prediction in result.predictions + ]) + ]))))) + return ret + + +class RunOneDetector(beam.PTransform[beam.PCollection[KeyedInputT], + beam.PCollection[KeyedOutputT]]): + """Runs a single anomaly detector on a PCollection of data. + + This PTransform applies a single `AnomalyDetector` to the input data, + including scoring, learning, and thresholding. + + Args: + detector: The `AnomalyDetector` to run. 
+ """ + def __init__(self, detector): + self._detector = detector + + def expand( + self, + input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]: + model_id = getattr( + self._detector, + "_model_id", + getattr(self._detector, "_key", "unknown_model")) + model_uuid = f"{model_id}:{uuid.uuid4().hex[:6]}" + + ret: Any = ( + input + | beam.Reshuffle() + | f"Score and Learn ({model_uuid})" >> RunScoreAndLearn(self._detector) + | f"Run Threshold Criterion ({model_uuid})" >> RunThresholdCriterion( + self._detector._threshold_criterion)) + + return ret + + +class RunEnsembleDetector(beam.PTransform[beam.PCollection[KeyedInputT], + beam.PCollection[KeyedOutputT]]): + """Runs an ensemble of anomaly detectors on a PCollection of data. + + This PTransform applies an `EnsembleAnomalyDetector` to the input data, + running each sub-detector and aggregating the results. + + Args: + ensemble_detector: The `EnsembleAnomalyDetector` to run. + """ + def __init__(self, ensemble_detector: EnsembleAnomalyDetector): + self._ensemble_detector = ensemble_detector + + def expand( + self, + input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]: + model_uuid = f"{self._ensemble_detector._model_id}:{uuid.uuid4().hex[:6]}" + + assert self._ensemble_detector._sub_detectors is not None + if not self._ensemble_detector._sub_detectors: + raise ValueError(f"No detectors found at {model_uuid}") + + results = [] + for idx, detector in enumerate(self._ensemble_detector._sub_detectors): + if isinstance(detector, EnsembleAnomalyDetector): + results.append( + input | f"Run Ensemble Detector at index {idx} ({model_uuid})" >> + RunEnsembleDetector(detector)) + else: + results.append( + input + | f"Run One Detector at index {idx} ({model_uuid})" >> + RunOneDetector(detector)) + + if self._ensemble_detector._aggregation_strategy is None: + aggregation_type = "Simple" + else: + aggregation_type = "Custom" + + aggregated = ( + results | beam.Flatten() + | f"Run 
{aggregation_type} Aggregation Strategy ({model_uuid})" >> + RunAggregationStrategy( + self._ensemble_detector._aggregation_strategy, + self._ensemble_detector._model_id)) + + ret: Any = ( + aggregated + | f"Run Threshold Criterion ({model_uuid})" >> RunThresholdCriterion( + self._ensemble_detector._threshold_criterion)) + + return ret + + +class AnomalyDetection(beam.PTransform[beam.PCollection[InputT], + beam.PCollection[OutputT]]): + """Performs anomaly detection on a PCollection of data. + + This PTransform applies an `AnomalyDetector` or `EnsembleAnomalyDetector` to + the input data and returns a PCollection of `AnomalyResult` objects. + + Examples:: + + # Run a single anomaly detector + p | AnomalyDetection(ZScore(features=["x1"])) + + # Run an ensemble anomaly detector + sub_detectors = [ZScore(features=["x1"]), IQR(features=["x2"])] + p | AnomalyDetection( + EnsembleAnomalyDetector(sub_detectors, aggregation_strategy=AnyVote())) + + Args: + detector: The `AnomalyDetector` or `EnsembleAnomalyDetector` to use. + """ + def __init__( + self, + detector: AnomalyDetector, + ) -> None: + self._root_detector = detector + + def expand( + self, + input: beam.PCollection[InputT], + ) -> beam.PCollection[OutputT]: + + # Add a temporary unique key per data point to facilitate grouping the + # outputs from multiple anomaly detectors for the same data point. + # + # Unique key generation options: + # (1) Timestamp-based methods: https://docs.python.org/3/library/time.html + # (2) UUID module: https://docs.python.org/3/library/uuid.html + # + # Timestamp precision on Windows can lead to key collisions (see PEP 564: + # https://peps.python.org/pep-0564/#windows). Only time.perf_counter_ns() + # provides sufficient precision for our needs. 
+ # + # Performance note: + # $ python -m timeit -n 100000 "import uuid; uuid.uuid1()" + # 100000 loops, best of 5: 806 nsec per loop + # $ python -m timeit -n 100000 "import uuid; uuid.uuid4()" + # 100000 loops, best of 5: 1.53 usec per loop + # $ python -m timeit -n 100000 "import time; time.perf_counter_ns()" + # 100000 loops, best of 5: 82.3 nsec per loop + # + # We select uuid.uuid1() for its inclusion of node information, making it + # more suitable for parallel execution environments. + add_temp_key_fn: Callable[[InputT], KeyedInputT] \ + = lambda e: (e[0], (str(uuid.uuid1()), e[1])) + keyed_input = (input | "Add temp key" >> beam.Map(add_temp_key_fn)) + + if isinstance(self._root_detector, EnsembleAnomalyDetector): + keyed_output = (keyed_input | RunEnsembleDetector(self._root_detector)) + else: + keyed_output = (keyed_input | RunOneDetector(self._root_detector)) + + # remove the temporary key and simplify the output. + remove_temp_key_fn: Callable[[KeyedOutputT], OutputT] \ + = lambda e: (e[0], e[1][1]) + ret: Any = keyed_output | "Remove temp key" >> beam.Map(remove_temp_key_fn) + + return ret diff --git a/sdks/python/apache_beam/ml/anomaly/transforms_test.py b/sdks/python/apache_beam/ml/anomaly/transforms_test.py new file mode 100644 index 000000000000..cf398728f372 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/transforms_test.py @@ -0,0 +1,413 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import math
+import unittest
+from typing import Iterable
+
+import apache_beam as beam
+from apache_beam.ml.anomaly.aggregations import AnyVote
+from apache_beam.ml.anomaly.base import AnomalyPrediction
+from apache_beam.ml.anomaly.base import AnomalyResult
+from apache_beam.ml.anomaly.base import EnsembleAnomalyDetector
+from apache_beam.ml.anomaly.detectors.zscore import ZScore
+from apache_beam.ml.anomaly.thresholds import FixedThreshold
+from apache_beam.ml.anomaly.thresholds import QuantileThreshold
+from apache_beam.ml.anomaly.transforms import AnomalyDetection
+from apache_beam.ml.anomaly.transforms import _StatefulThresholdDoFn
+from apache_beam.ml.anomaly.transforms import _StatelessThresholdDoFn
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+def _prediction_iterable_is_equal_to(
+    a: Iterable[AnomalyPrediction], b: Iterable[AnomalyPrediction]):
+  """Returns True iff `a` and `b` hold the same predictions, in any order.
+
+  Every prediction in `a` must pair with a distinct prediction in `b`
+  according to `_prediction_is_equal_to`. Two empty iterables are equal.
+
+  Args:
+    a: First collection of predictions; consumed once.
+    b: Second collection of predictions; consumed once.
+
+  Returns:
+    bool: False when the lengths differ or any prediction in `a` has no
+      unmatched counterpart in `b`; True otherwise.
+  """
+  a_list = list(a)
+  b_list = list(b)
+
+  if len(a_list) != len(b_list):
+    return False
+
+  # The previous `any(...)` over zipped pairs accepted the lists as soon as a
+  # single positional pair matched. Require every prediction to match, and
+  # ignore position because the ensemble path gathers predictions after a
+  # GroupByKey. `b_list` is a private copy, so consuming it is safe.
+  unmatched = b_list
+  for prediction in a_list:
+    for idx, candidate in enumerate(unmatched):
+      if _prediction_is_equal_to(prediction, candidate):
+        del unmatched[idx]
+        break
+    else:
+      return False
+
+  return True
+
+
+def _prediction_is_equal_to(a: AnomalyPrediction, b: AnomalyPrediction):
+  if a.model_id != b.model_id:
+    return False
+
+  if a.threshold != b.threshold:
+    return False
+
+  if a.score != b.score:
+    if not (a.score is not None and b.score is not None and
+            math.isnan(a.score) and math.isnan(b.score)):
+      return False
+
+  if a.label != b.label:
+    return False
+
+  if a.info != b.info:
+    return
False + + if a.source_predictions is None and b.source_predictions is None: + return True + + if a.source_predictions is not None and b.source_predictions is not None: + return _prediction_iterable_is_equal_to( + a.source_predictions, b.source_predictions) + + return False + + +def _keyed_result_is_equal_to( + a: tuple[int, AnomalyResult], b: tuple[int, AnomalyResult]): + if a[0] != b[0]: + return False + + if a[1].example != b[1].example: + return False + + return _prediction_iterable_is_equal_to(a[1].predictions, b[1].predictions) + + +class TestAnomalyDetection(unittest.TestCase): + def setUp(self): + self._input = [ + (1, beam.Row(x1=1, x2=4)), + (2, beam.Row(x1=100, x2=5)), # an row with a different key (key=2) + (1, beam.Row(x1=2, x2=4)), + (1, beam.Row(x1=3, x2=5)), + (1, beam.Row(x1=10, x2=4)), # outlier in key=1, with respect to x1 + (1, beam.Row(x1=2, x2=10)), # outlier in key=1, with respect to x2 + (1, beam.Row(x1=3, x2=4)), + ] + + def test_one_detector(self): + zscore_x1_expected = [ + AnomalyPrediction( + model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', + score=2.1213203435596424, + label=0, + threshold=3), + AnomalyPrediction( + model_id='zscore_x1', score=8.0, label=1, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', + score=0.4898979485566356, + label=0, + threshold=3), + AnomalyPrediction( + model_id='zscore_x1', + score=0.16452254913212455, + label=0, + threshold=3), + ] + detector = ZScore(features=["x1"], model_id="zscore_x1") + + with TestPipeline() as p: + result = ( + p | beam.Create(self._input) + # TODO: get rid of this conversion between BeamSchema to beam.Row. 
+ | beam.Map(lambda t: (t[0], beam.Row(**t[1]._asdict()))) + | AnomalyDetection(detector)) + assert_that( + result, + equal_to([( + input[0], AnomalyResult(example=input[1], predictions=[decision])) + for input, + decision in zip(self._input, zscore_x1_expected)], + _keyed_result_is_equal_to)) + + def test_multiple_detectors_without_aggregation(self): + zscore_x1_expected = [ + AnomalyPrediction( + model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', + score=2.1213203435596424, + label=0, + threshold=3), + AnomalyPrediction( + model_id='zscore_x1', score=8.0, label=1, threshold=3), + AnomalyPrediction( + model_id='zscore_x1', + score=0.4898979485566356, + label=0, + threshold=3), + AnomalyPrediction( + model_id='zscore_x1', + score=0.16452254913212455, + label=0, + threshold=3), + ] + zscore_x2_expected = [ + AnomalyPrediction( + model_id='zscore_x2', score=float('NaN'), label=-2, threshold=2), + AnomalyPrediction( + model_id='zscore_x2', score=float('NaN'), label=-2, threshold=2), + AnomalyPrediction( + model_id='zscore_x2', score=float('NaN'), label=-2, threshold=2), + AnomalyPrediction(model_id='zscore_x2', score=0, label=0, threshold=2), + AnomalyPrediction( + model_id='zscore_x2', + score=0.5773502691896252, + label=0, + threshold=2), + AnomalyPrediction( + model_id='zscore_x2', score=11.5, label=1, threshold=2), + AnomalyPrediction( + model_id='zscore_x2', + score=0.5368754921931594, + label=0, + threshold=2), + ] + + sub_detectors = [] + sub_detectors.append(ZScore(features=["x1"], model_id="zscore_x1")) + sub_detectors.append( + ZScore( + features=["x2"], + threshold_criterion=FixedThreshold(2), + model_id="zscore_x2")) + + with beam.Pipeline() as p: + result = ( + p | beam.Create(self._input) + # TODO: get rid of this 
conversion between BeamSchema to beam.Row. + | beam.Map(lambda t: (t[0], beam.Row(**t[1]._asdict()))) + | AnomalyDetection(EnsembleAnomalyDetector(sub_detectors))) + + assert_that( + result, + equal_to([( + input[0], + AnomalyResult( + example=input[1], predictions=[decision1, decision2])) + for input, + decision1, + decision2 in zip( + self._input, zscore_x1_expected, zscore_x2_expected)], + _keyed_result_is_equal_to)) + + def test_multiple_sub_detectors_with_aggregation(self): + aggregated = [ + AnomalyPrediction(model_id="custom", label=-2), + AnomalyPrediction(model_id="custom", label=-2), + AnomalyPrediction(model_id="custom", label=-2), + AnomalyPrediction(model_id="custom", label=0), + AnomalyPrediction(model_id="custom", label=1), + AnomalyPrediction(model_id="custom", label=1), + AnomalyPrediction(model_id="custom", label=0), + ] + + sub_detectors = [] + sub_detectors.append(ZScore(features=["x1"], model_id="zscore_x1")) + sub_detectors.append( + ZScore( + features=["x2"], + threshold_criterion=FixedThreshold(2), + model_id="zscore_x2")) + + with beam.Pipeline() as p: + result = ( + p | beam.Create(self._input) + # TODO: get rid of this conversion between BeamSchema to beam.Row. 
class TestStatelessThresholdDoFn(unittest.TestCase):
  """Exercises `_StatelessThresholdDoFn` built from a `FixedThreshold` spec."""
  def test_dofn_on_single_prediction(self):
    # One entry per element: (inner key, score, label expected afterwards).
    cases = [(2, 1, 0), (3, 2, 1), (4, 3, 1)]

    unlabeled = []
    labeled = []
    for inner_key, score, label in cases:
      before = [AnomalyPrediction(score=score)]
      after = [AnomalyPrediction(score=score, label=label, threshold=2)]
      unlabeled.append((1, (inner_key, AnomalyResult(R, before))))
      labeled.append((1, (inner_key, AnomalyResult(R, after))))

    threshold_spec = FixedThreshold(
        2, normal_label=0, outlier_label=1).to_spec()
    with TestPipeline() as p:
      result = (
          p
          | beam.Create(unlabeled)
          | beam.ParDo(_StatelessThresholdDoFn(threshold_spec)))
      assert_that(result, equal_to(labeled))

  def test_dofn_on_multiple_predictions(self):
    # One entry per element: (inner key, [(score, expected label), ...]).
    cases = [
        (2, [(1, 0), (4, 1)]),
        (3, [(2, 1), (0.5, 0)]),
    ]

    unlabeled = []
    labeled = []
    for inner_key, outcomes in cases:
      before = [AnomalyPrediction(score=score) for score, _ in outcomes]
      after = [
          AnomalyPrediction(score=score, label=label, threshold=2)
          for score, label in outcomes
      ]
      unlabeled.append((1, (inner_key, AnomalyResult(R, before))))
      labeled.append((1, (inner_key, AnomalyResult(R, after))))

    threshold_spec = FixedThreshold(
        2, normal_label=0, outlier_label=1).to_spec()
    with TestPipeline() as p:
      result = (
          p
          | beam.Create(unlabeled)
          | beam.ParDo(_StatelessThresholdDoFn(threshold_spec)))

      assert_that(result, equal_to(labeled))
@specifiable
class MadTracker(BaseTracker):
  """Online estimator of the Median Absolute Deviation (MAD) of a stream.

  MAD is a robust measure of statistical dispersion. This tracker keeps two
  median trackers: one for the incoming values and one for the absolute
  deviations of those values from the running median.

  Similar functionality is available in the River library:
  https://github.com/online-ml/river/blob/main/river/stats/mad.py

  Important:
    This online version of MAD does not exactly match its batch counterpart.
    In a streaming context the true median is not known up front, so it is
    estimated iteratively: for each incoming data point the estimated median
    is updated first, and the absolute difference between the data point and
    that updated median is recorded afterwards. To stay cheap, absolute
    differences recorded earlier are not recomputed when the median changes.

  Args:
    median_tracker: An optional `MedianTracker` instance for tracking the
      median of the input values. If None, a default `MedianTracker` is
      created.
    diff_median_tracker: An optional `MedianTracker` instance for tracking
      the median of the absolute deviations from the median. If None, a
      default `MedianTracker` is created.
  """
  def __init__(
      self,
      median_tracker: Optional[MedianTracker] = None,
      diff_median_tracker: Optional[MedianTracker] = None):
    self._median_tracker = (
        median_tracker if median_tracker else MedianTracker())
    self._diff_median_tracker = (
        diff_median_tracker if diff_median_tracker else MedianTracker())

  def push(self, x):
    """Feeds one value into the tracker and refreshes the MAD estimate.

    Args:
      x: The value to be added to the tracked stream.
    """
    # The median must absorb x before the deviation of x is measured.
    self._median_tracker.push(x)
    deviation = abs(x - self._median_tracker.get())
    self._diff_median_tracker.push(deviation)

  def get(self):
    """Reports the current MAD.

    Returns:
      float: The median of the absolute deviations recorded so far within the
        window of the deviation tracker. `NaN` if nothing has been pushed.
    """
    return self._diff_median_tracker.get()

  def get_median(self):
    """Reports the current median of the input values.

    Returns:
      float: The median of the values within the window of the value tracker.
        `NaN` if nothing has been pushed.
    """
    return self._median_tracker.get()
class MadTest(unittest.TestCase):
  """Unit test for the online `MadTracker`."""
  def test_default_tracker(self):
    tracker = MadTracker()
    # Nothing has been pushed yet, so no MAD can be reported.
    self.assertTrue(math.isnan(tracker.get()))

    # Each entry is (value pushed, MAD expected right after the push). The
    # trailing comment shows the running median and the sorted deviations.
    steps = [
        (4.0, 0),  # median=4, abs_dev=[0]
        (2.0, 0.5),  # median=3, abs_dev=[0, 1]
        (5.0, 1.0),  # median=4, abs_dev=[0, 1, 1]
        (3.0, 0.75),  # median=3.5, abs_dev=[0, 0.5, 1, 1]
        (0.0, 1.0),  # median=3, abs_dev=[0, 0.5, 1, 1, 3]
        (4.0, 0.75),  # median=3.5, abs_dev=[0, 0.5, 0.5, 1, 1, 3]
    ]
    for value, expected_mad in steps:
      tracker.push(value)
      self.assertEqual(tracker.get(), expected_mad)
@specifiable
class MedianTracker(BaseTracker):
  """Median of a stream of values, delegated to a quantile tracker.

  This is a thin wrapper around a `QuantileTracker` that must be configured
  for the 0.5 quantile (the median).

  Args:
    quantile_tracker: An optional `QuantileTracker` instance. If not provided,
      a `BufferedSlidingQuantileTracker` with a default window size 1000 and
      q=0.5 is created.

  Raises:
    AssertionError: If the provided quantile_tracker is not initialized with
      q=0.5.
  """
  def __init__(self, quantile_tracker: Optional[QuantileTracker] = None):
    if quantile_tracker:
      self._quantile_tracker = quantile_tracker
    else:
      self._quantile_tracker = BufferedSlidingQuantileTracker(
          DEFAULT_WINDOW_SIZE, 0.5)
    # Any other quantile would make this wrapper report something that is
    # not a median.
    assert self._quantile_tracker._q == 0.5, \
        "quantile_tracker must be initialized with q = 0.5"

  def push(self, x):
    """Forwards a new value to the wrapped quantile tracker.

    Args:
      x: The new value to be pushed.
    """
    self._quantile_tracker.push(x)

  def get(self):
    """Returns the median (q = 0.5) reported by the wrapped tracker.

    Returns:
      float: The median of the values in the window setting specified in the
        internal quantile tracker. Returns NaN if the window is empty.
    """
    return self._quantile_tracker.get()
class MedianTest(unittest.TestCase):
  """Unit tests for `MedianTracker` with default and custom quantile trackers."""
  def _push_and_check(self, tracker, steps):
    # Each step is (value pushed, median expected right after the push).
    for value, expected_median in steps:
      tracker.push(value)
      self.assertEqual(tracker.get(), expected_median)

  def test_default_tracker(self):
    tracker = MedianTracker()
    self.assertTrue(math.isnan(tracker.get()))

    self._push_and_check(
        tracker,
        [
            (1.0, 1.0),
            (2.0, 1.5),
            (1.0, 1.0),
            (20.0, 1.5),
            (10.0, 2.0),
        ])

  def test_custom_tracker(self):
    # Same pushes as above, but through a sliding window of only 3 values.
    tracker = MedianTracker(SimpleSlidingQuantileTracker(3, 0.5))
    self.assertTrue(math.isnan(tracker.get()))

    self._push_and_check(
        tracker,
        [
            (1.0, 1.0),
            (2.0, 1.5),
            (1.0, 1.0),
            (20.0, 2.0),
            (10.0, 10.0),
        ])

  def test_wrong_tracker(self):
    # A tracker configured for q=0.1 is not a median tracker.
    tracker = MedianTracker(SimpleSlidingQuantileTracker(50, 0.1))
    with self.assertRaises(AssertionError):
      tracker.run_original_init()
@specifiable
class SecondaryBufferedQuantileTracker(WindowedTracker, QuantileTracker):
  """A secondary quantile tracker that shares its data with a master tracker.

  This tracker acts as a read-only view of the master tracker's data: `push`
  is a no-op and `get` evaluates a quantile on the master's sorted items, so
  no independent buffer is maintained here.

  Args:
    master: The `BufferedQuantileTracker` instance to share data with.
    q: The quantile to report. This is a single number, not a list: the
      master's `_get_helper` multiplies it by the buffer length to locate
      the position to interpolate at.

  Raises:
    AssertionError: If `master` is not a `BufferedQuantileTracker`.
  """
  def __init__(self, master: QuantileTracker, q):
    assert isinstance(master, BufferedQuantileTracker), \
        "Cannot create secondary tracker from non-BufferedQuantileTracker"
    self._master = master
    # Reuse the master's window mode so both trackers describe the same window.
    super().__init__(self._master._window_mode)
    QuantileTracker.__init__(self, q)
    # Kept as a reference to the master's list; `get` reads through
    # `self._master` directly.
    self._sorted_items = self._master._sorted_items

  def push(self, x):
    """Does nothing, as this is a secondary tracker.

    New values must be pushed to the master tracker instead.

    Args:
      x: Ignored.
    """

  def get(self):
    """Returns the quantile `q` computed on the master tracker's buffer.

    Returns:
      float: The quantile value obtained by linear interpolation over the
        master's sorted items. Returns NaN if that buffer is empty.
    """
    return self._master._get_helper(self._master._sorted_items, self._q)