From f18bf73fb686da51fc16730160709fdf92dec558 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 8 Mar 2025 11:39:29 -0500 Subject: [PATCH] Use uuid1() for unique key to fix failed tests on windows. --- .../apache_beam/ml/anomaly/transforms.py | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py b/sdks/python/apache_beam/ml/anomaly/transforms.py index a7dbf35758b..5fcb2208d78 100644 --- a/sdks/python/apache_beam/ml/anomaly/transforms.py +++ b/sdks/python/apache_beam/ml/anomaly/transforms.py @@ -38,7 +38,6 @@ from apache_beam.ml.anomaly.specifiable import Specifiable from apache_beam.transforms.userstate import ReadModifyWriteRuntimeState from apache_beam.transforms.userstate import ReadModifyWriteStateSpec -from apache_beam.utils import timestamp KeyT = TypeVar('KeyT') TempKeyT = TypeVar('TempKeyT', bound=int) @@ -491,10 +490,29 @@ def expand( input: beam.PCollection[InputT], ) -> beam.PCollection[OutputT]: - # add a temporary key per data point to facilitate grouping the outputs from - # multiple anomaly detectors for the same data point. + # Add a temporary unique key per data point to facilitate grouping the + # outputs from multiple anomaly detectors for the same data point. + # + # Unique key generation options: + # (1) Timestamp-based methods: https://docs.python.org/3/library/time.html + # (2) UUID module: https://docs.python.org/3/library/uuid.html + # + # Timestamp precision on Windows can lead to key collisions (see PEP 564: + # https://peps.python.org/pep-0564/#windows). Only time.perf_counter_ns() + # provides sufficient precision among the timestamp-based methods. 
+ # + # Performance note: + # $ python -m timeit -n 100000 "import uuid; uuid.uuid1()" + # 100000 loops, best of 5: 806 nsec per loop + # $ python -m timeit -n 100000 "import uuid; uuid.uuid4()" + # 100000 loops, best of 5: 1.53 usec per loop + # $ python -m timeit -n 100000 "import time; time.perf_counter_ns()" + # 100000 loops, best of 5: 82.3 nsec per loop + # + # We select uuid.uuid1() for its inclusion of node information, making it + # more suitable for parallel execution environments. add_temp_key_fn: Callable[[InputT], KeyedInputT] \ - = lambda e: (e[0], (timestamp.Timestamp.now().micros, e[1])) + = lambda e: (e[0], (str(uuid.uuid1()), e[1])) keyed_input = (input | "Add temp key" >> beam.Map(add_temp_key_fn)) if isinstance(self._root_detector, EnsembleAnomalyDetector):