Skip to content

Commit

Permalink
Use uuid1() for unique key.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Mar 8, 2025
1 parent 9ea4a96 commit 76d2728
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions sdks/python/apache_beam/ml/anomaly/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

import dataclasses
import time
import typing
import uuid
from typing import Any
Expand Down Expand Up @@ -494,19 +493,27 @@ def expand(

# Add a temporary unique key per data point to facilitate grouping the
# outputs from multiple anomaly detectors for the same data point.
# Previously, timestamp.Timestamp.now().micros was used, but on Windows,
# its limited precision (around 10 milliseconds) resulted in key collisions.
# https://peps.python.org/pep-0564/#windows
# Performance note: time.monotonic_ns() is about 10x-20x faster than
# uuid.uuid4().
# $ python -m timeit -n 100000 "import time; time.monotonic_ns()"
# 100000 loops, best of 5: 86.7 nsec per loop
# $ python -m timeit -n 100000 "import uuid; str(uuid.uuid4())"
# 10000 loops, best of 5: 2.04 usec per loop
#
# Unique key generation options:
# (1) Timestamp-based methods: https://docs.python.org/3/library/time.html
# (2) UUID module: https://docs.python.org/3/library/uuid.html
#
# Timestamp precision on Windows can lead to key collisions (see PEP 564:
# https://peps.python.org/pep-0564/#windows). Among the timestamp-based
# methods, only time.perf_counter_ns() provides sufficient precision for
# our needs.
#
# Performance note:
# $ python -m timeit -n 100000 "import uuid; uuid.uuid1()"
# 100000 loops, best of 5: 806 nsec per loop
# $ python -m timeit -n 100000 "import uuid; uuid.uuid4()"
# 100000 loops, best of 5: 1.53 usec per loop
# $ python -m timeit -n 100000 "import time; time.perf_counter_ns()"
# 100000 loops, best of 5: 85.2 nsec per loop
# 100000 loops, best of 5: 82.3 nsec per loop
#
# We select uuid.uuid1() because, in addition to a timestamp, it includes
# node information, so keys generated on different workers are unlikely to
# collide. This makes it more suitable for parallel execution environments.
add_temp_key_fn: Callable[[InputT], KeyedInputT] \
= lambda e: (e[0], (time.perf_counter_ns(), e[1]))
= lambda e: (e[0], (str(uuid.uuid1()), e[1]))
keyed_input = (input | "Add temp key" >> beam.Map(add_temp_key_fn))

if isinstance(self._root_detector, EnsembleAnomalyDetector):
Expand Down

0 comments on commit 76d2728

Please sign in to comment.