Skip to content

Commit

Permalink
Use uuid1() for unique key to fix failed tests on windows.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Mar 8, 2025
1 parent e6a7bb1 commit f18bf73
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions sdks/python/apache_beam/ml/anomaly/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from apache_beam.ml.anomaly.specifiable import Specifiable
from apache_beam.transforms.userstate import ReadModifyWriteRuntimeState
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.utils import timestamp

KeyT = TypeVar('KeyT')
TempKeyT = TypeVar('TempKeyT', bound=int)
Expand Down Expand Up @@ -491,10 +490,29 @@ def expand(
input: beam.PCollection[InputT],
) -> beam.PCollection[OutputT]:

# add a temporary key per data point to facilitate grouping the outputs from
# multiple anomaly detectors for the same data point.
# Add a temporary unique key per data point to facilitate grouping the
# outputs from multiple anomaly detectors for the same data point.
#
# Unique key generation options:
# (1) Timestamp-based methods: https://docs.python.org/3/library/time.html
# (2) UUID module: https://docs.python.org/3/library/uuid.html
#
# Timestamp precision on Windows can lead to key collisions (see PEP 564:
# https://peps.python.org/pep-0564/#windows). Among the timestamp-based
# options, only time.perf_counter_ns() provides sufficient precision for our
# needs.
#
# Performance note:
# $ python -m timeit -n 100000 "import uuid; uuid.uuid1()"
# 100000 loops, best of 5: 806 nsec per loop
# $ python -m timeit -n 100000 "import uuid; uuid.uuid4()"
# 100000 loops, best of 5: 1.53 usec per loop
# $ python -m timeit -n 100000 "import time; time.perf_counter_ns()"
# 100000 loops, best of 5: 82.3 nsec per loop
#
# We select uuid.uuid1() because it combines the current time with node
# (host) information, making it more suitable for parallel execution
# environments where several workers generate keys at the same time.
add_temp_key_fn: Callable[[InputT], KeyedInputT] \
= lambda e: (e[0], (timestamp.Timestamp.now().micros, e[1]))
= lambda e: (e[0], (str(uuid.uuid1()), e[1]))
keyed_input = (input | "Add temp key" >> beam.Map(add_temp_key_fn))

if isinstance(self._root_detector, EnsembleAnomalyDetector):
Expand Down

0 comments on commit f18bf73

Please sign in to comment.