Skip to content

Commit

Permalink
Refactor & rename metrics_consistency_on conf to `timer_unit_consis…
Browse files Browse the repository at this point in the history
…tency`

Changes:
- Replaces the `metrics_consistency_on` config with `timer_unit_consistency` for better clarity!
- Improves the newsfragment entry & deprecation warning
- Changes the default to be `False` so folks aren't caught by surprise.

We should backport this to 2.11 and remove this setting from Airflow main
  • Loading branch information
kaxil committed Nov 13, 2024
1 parent 981ecef commit f7cb045
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 55 deletions.
19 changes: 13 additions & 6 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1064,17 +1064,24 @@ metrics:
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
metrics_consistency_on:
description: |
Enables metrics consistency across all metrics loggers (ex: timer and timing metrics).
# TODO: Remove 'timer_unit_consistency' in Airflow 3.0
timer_unit_consistency:
description: |
Controls the consistency of timer units across all metrics loggers
(e.g., Statsd, Datadog, OpenTelemetry)
for timing and duration-based metrics. When enabled, all timers will publish
metrics in milliseconds for consistency and alignment with Airflow's default
metrics behavior in version 3.0+.
.. warning::
It is enabled by default from Airflow 3.
version_added: 2.10.0
It will be the default behavior from Airflow 3.0. If disabled, timers may publish
in seconds for backwards compatibility, though it is recommended to enable this
setting to ensure metric uniformity and forward compatibility with Airflow 3.
version_added: 2.11.0
type: string
example: ~
default: "True"
default: "False"
statsd_on:
description: |
Enables sending metrics to StatsD.
Expand Down
8 changes: 4 additions & 4 deletions airflow/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@

log = logging.getLogger(__name__)

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
Expand Down Expand Up @@ -144,7 +144,7 @@ def timing(
tags_list = []
if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
if metrics_consistency_on:
if timer_unit_consistency:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
Expand Down
12 changes: 6 additions & 6 deletions airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
Expand Down Expand Up @@ -73,11 +73,11 @@
# Delimiter is placed between the universal metric prefix and the unique metric name.
DEFAULT_METRIC_NAME_DELIMITER = "."

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)

Expand Down Expand Up @@ -284,7 +284,7 @@ def timing(
"""OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
if metrics_consistency_on:
if timer_unit_consistency:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
Expand Down
8 changes: 4 additions & 4 deletions airflow/metrics/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@

DeltaType = Union[int, float, datetime.timedelta]

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
Expand Down Expand Up @@ -127,7 +127,7 @@ def start(self) -> Timer:
def stop(self, send: bool = True) -> None:
"""Stop the timer, and optionally send it to stats backend."""
if self._start_time is not None:
if metrics_consistency_on:
if timer_unit_consistency:
self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds.
else:
self.duration = time.perf_counter() - self._start_time
Expand Down
10 changes: 5 additions & 5 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@

PAST_DEPENDS_MET = "past_depends_met"

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
Expand Down Expand Up @@ -2827,7 +2827,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
if metrics_consistency_on:
if timer_unit_consistency:
timing = timezone.utcnow() - self.queued_dttm
else:
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
Expand All @@ -2843,7 +2843,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
if metrics_consistency_on:
if timer_unit_consistency:
timing = timezone.utcnow() - self.start_date
else:
timing = (timezone.utcnow() - self.start_date).total_seconds()
Expand Down
12 changes: 11 additions & 1 deletion newsfragments/39908.significant.rst
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled
Publishing timer and timing metrics in seconds is now deprecated.

In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section will be
enabled by default and the setting itself will be removed. This will standardize all timer and timing metrics to
milliseconds across all metric loggers.

**Users integrating with Datadog, OpenTelemetry, or other metric backends** should enable this setting. Users of
``statsd`` are not affected by this change.

If you need backward compatibility, you can leave this setting disabled temporarily, but enabling
``timer_unit_consistency`` is encouraged to future-proof your metrics setup.
40 changes: 20 additions & 20 deletions tests/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,20 +236,20 @@ def test_gauge_value_is_correct(self, name):
assert self.map[full_name(name)].value == 1

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[True, False],
)
def test_timing_new_metric(self, metrics_consistency_on, name):
def test_timing_new_metric(self, timer_unit_consistency, name):
import datetime

otel_logger.metrics_consistency_on = metrics_consistency_on
otel_logger.timer_unit_consistency = timer_unit_consistency

self.stats.timing(name, dt=datetime.timedelta(seconds=123))

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
expected_value = 123000.0 if metrics_consistency_on else 123
expected_value = 123000.0 if timer_unit_consistency else 123
assert self.map[full_name(name)].value == expected_value

def test_timing_new_metric_with_tags(self, name):
Expand Down Expand Up @@ -277,80 +277,80 @@ def test_timing_existing_metric(self, name):
# to get the end timestamp. timer() should return the difference as a float.

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics_consistency_on, name):
protocols.metrics_consistency_on = metrics_consistency_on
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, timer_unit_consistency, name):
protocols.timer_unit_consistency = timer_unit_consistency
with self.stats.timer(name) as timer:
pass

assert isinstance(timer.duration, float)
expected_duration = 3140.0 if metrics_consistency_on else 3.14
expected_duration = 3140.0 if timer_unit_consistency else 3.14
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_no_name_returns_float_but_does_not_store_value(
self, mock_time, metrics_consistency_on, name
self, mock_time, timer_unit_consistency, name
):
protocols.metrics_consistency_on = metrics_consistency_on
protocols.timer_unit_consistency = timer_unit_consistency
with self.stats.timer() as timer:
pass

assert isinstance(timer.duration, float)
expected_duration = 3140.0 if metrics_consistency_on else 3.14
expected_duration = 3140.0 if timer_unit_consistency else 3.14
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_not_called()

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[
True,
False,
],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_false(self, mock_time, metrics_consistency_on, name):
protocols.metrics_consistency_on = metrics_consistency_on
def test_timer_start_and_stop_manually_send_false(self, mock_time, timer_unit_consistency, name):
protocols.timer_unit_consistency = timer_unit_consistency

timer = self.stats.timer(name)
timer.start()
# Perform some task
timer.stop(send=False)

assert isinstance(timer.duration, float)
expected_value = 3140.0 if metrics_consistency_on else 3.14
expected_value = 3140.0 if timer_unit_consistency else 3.14
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_not_called()

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[
True,
False,
],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_true(self, mock_time, metrics_consistency_on, name):
protocols.metrics_consistency_on = metrics_consistency_on
def test_timer_start_and_stop_manually_send_true(self, mock_time, timer_unit_consistency, name):
protocols.timer_unit_consistency = timer_unit_consistency
timer = self.stats.timer(name)
timer.start()
# Perform some task
timer.stop(send=True)

assert isinstance(timer.duration, float)
expected_value = 3140.0 if metrics_consistency_on else 3.14
expected_value = 3140.0 if timer_unit_consistency else 3.14
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
Expand Down
16 changes: 8 additions & 8 deletions tests/core/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,18 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self)
)

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
def test_timer(self, time_mock, metrics_consistency_on):
protocols.metrics_consistency_on = metrics_consistency_on
def test_timer(self, time_mock, timer_unit_consistency):
protocols.timer_unit_consistency = timer_unit_consistency

with self.dogstatsd.timer("empty_timer") as timer:
pass
self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[])
expected_duration = 100.0
if metrics_consistency_on:
if timer_unit_consistency:
expected_duration = 1000.0 * 100.0
assert expected_duration == timer.duration
assert time_mock.call_count == 2
Expand All @@ -244,20 +244,20 @@ def test_empty_timer(self):
self.dogstatsd_client.timed.assert_not_called()

@pytest.mark.parametrize(
"metrics_consistency_on",
"timer_unit_consistency",
[True, False],
)
def test_timing(self, metrics_consistency_on):
def test_timing(self, timer_unit_consistency):
import datetime

datadog_logger.metrics_consistency_on = metrics_consistency_on
datadog_logger.timer_unit_consistency = timer_unit_consistency

self.dogstatsd.timing("empty_timer", 123)
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[])

self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
self.dogstatsd_client.timing.assert_called_with(
metric="empty_timer", value=123000.0 if metrics_consistency_on else 123.0, tags=[]
metric="empty_timer", value=123000.0 if timer_unit_consistency else 123.0, tags=[]
)

def test_gauge(self):
Expand Down
2 changes: 1 addition & 1 deletion tests_common/_internals/forbidden_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def pytest_itemcollected(self, item: pytest.Item):
item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False)
item.add_marker(
pytest.mark.filterwarnings(
"ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning"
"ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning"
)
)

Expand Down

0 comments on commit f7cb045

Please sign in to comment.