Skip to content

Commit

Permalink
Refactor DagRun tracing into _trace_dagrun helper method (apache#44008
Browse files Browse the repository at this point in the history
)

related: apache#43789

Changes:
This commit
- extracts the tracing logic from the `update_state` method in `DagRun` to a new helper method `_trace_dagrun`.
- preserves types in some cases like: int and bool. Otel Attributes can be str, bool, int, float, Sequence[str], Sequence[bool], Sequence[int], Sequence[float]
  • Loading branch information
kaxil authored Nov 14, 2024
1 parent e7a0ddd commit 4af2c27
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
1 change: 0 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,6 @@ def process_executor_events(

@classmethod
def _set_span_attrs__process_executor_events(cls, span, state, ti):
# Use span.set_attributes
span.set_attributes(
{
"category": "scheduler",
Expand Down
59 changes: 30 additions & 29 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,35 +1020,7 @@ def recalculate(self) -> _UnfinishedStates:
dagv.version if dagv else None,
)

with Trace.start_span_from_dagrun(dagrun=self) as span:
if self._state is DagRunState.FAILED:
span.set_attribute("error", True)
attributes = {
"category": "DAG runs",
"dag_id": str(self.dag_id),
"execution_date": str(self.execution_date),
"run_id": str(self.run_id),
"queued_at": str(self.queued_at),
"run_start_date": str(self.start_date),
"run_end_date": str(self.end_date),
"run_duration": str(
(self.end_date - self.start_date).total_seconds()
if self.start_date and self.end_date
else 0
),
"state": str(self._state),
"external_trigger": str(self.external_trigger),
"run_type": str(self.run_type),
"data_interval_start": str(self.data_interval_start),
"data_interval_end": str(self.data_interval_end),
"dag_version": str(dagv.version if dagv else None),
"conf": str(self.conf),
}
if span.is_recording():
span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at))
span.add_event(name="started", timestamp=datetime_to_nano(self.start_date))
span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date))
span.set_attributes(attributes)
self._trace_dagrun(dagv)

session.flush()

Expand All @@ -1060,6 +1032,35 @@ def recalculate(self) -> _UnfinishedStates:

return schedulable_tis, callback

def _trace_dagrun(self, dagv) -> None:
with Trace.start_span_from_dagrun(dagrun=self) as span:
if self._state == DagRunState.FAILED:
span.set_attribute("error", True)
attributes = {
"category": "DAG runs",
"dag_id": self.dag_id,
"execution_date": str(self.execution_date),
"run_id": self.run_id,
"queued_at": str(self.queued_at),
"run_start_date": str(self.start_date),
"run_end_date": str(self.end_date),
"run_duration": (self.end_date - self.start_date).total_seconds()
if self.start_date and self.end_date
else 0,
"state": str(self._state),
"external_trigger": self.external_trigger,
"run_type": str(self.run_type),
"data_interval_start": str(self.data_interval_start),
"data_interval_end": str(self.data_interval_end),
"dag_version": str(dagv.version if dagv else None),
"conf": str(self.conf),
}
if span.is_recording():
span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at))
span.add_event(name="started", timestamp=datetime_to_nano(self.start_date))
span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date))
span.set_attributes(attributes)

@provide_session
def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) -> TISchedulingDecision:
tis = self.get_task_instances(session=session, state=State.task_states)
Expand Down

0 comments on commit 4af2c27

Please sign in to comment.