Refactor span attribute setting with span.set_attributes (apache#43982)

Replaced multiple `span.set_attribute` calls with a single `span.set_attributes` call!

This is not a huge change but still improves readability somewhat.
kaxil authored Nov 13, 2024
1 parent 9564923 commit 046e57c
Showing 7 changed files with 163 additions and 94 deletions.
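Both calls are part of the OpenTelemetry-style span API behind Airflow's `Trace` helper: `set_attribute` records one key/value pair per call, while `set_attributes` takes a mapping and applies each entry in turn. As a minimal before/after sketch using the plain `opentelemetry-api` package (the span name and attribute values below are placeholders, not taken from this commit):

    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("dag_processing") as span:
        # Before: one call per attribute.
        span.set_attribute("dag_id", "example_dag")
        span.set_attribute("try_number", 1)

        # After: the same attributes in a single call, as this commit adopts.
        span.set_attributes(
            {
                "dag_id": "example_dag",
                "try_number": 1,
            }
        )

Either form produces the same attributes on the span; the refactor is purely about readability when many attributes are set together.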
42 changes: 31 additions & 11 deletions airflow/dag_processing/manager.py
@@ -225,8 +225,12 @@ def _run_processor_manager(
         # to iterate the child processes
         set_new_process_group()
         span = Trace.get_current_span()
-        span.set_attribute("dag_directory", str(dag_directory))
-        span.set_attribute("dag_ids", str(dag_ids))
+        span.set_attributes(
+            {
+                "dag_directory": str(dag_directory),
+                "dag_ids": str(dag_ids),
+            }
+        )
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
@@ -1119,15 +1123,27 @@ def _collect_results_from_processor(self, processor, session: Session = NEW_SESS
         span = Trace.get_tracer("DagFileProcessorManager").start_span(
             "dag_processing", start_time=datetime_to_nano(processor.start_time)
         )
-        span.set_attribute("file_path", processor.file_path)
-        span.set_attribute("run_count", self.get_run_count(processor.file_path) + 1)
+        span.set_attributes(
+            {
+                "file_path": processor.file_path,
+                "run_count": self.get_run_count(processor.file_path) + 1,
+            }
+        )
 
         if processor.result is None:
-            span.set_attribute("error", True)
-            span.set_attribute("processor.exit_code", processor.exit_code)
+            span.set_attributes(
+                {
+                    "error": True,
+                    "processor.exit_code": processor.exit_code,
+                }
+            )
         else:
-            span.set_attribute("num_dags", num_dags)
-            span.set_attribute("import_errors", count_import_errors)
+            span.set_attributes(
+                {
+                    "num_dags": num_dags,
+                    "import_errors": count_import_errors,
+                }
+            )
             if count_import_errors > 0:
                 span.set_attribute("error", True)
                 import_errors = session.scalars(
@@ -1413,9 +1429,13 @@ def emit_metrics(self):
         Stats.gauge(
             "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values())
         )
-        span.set_attribute("total_parse_time", parse_time)
-        span.set_attribute("dag_bag_size", sum(stat.num_dags for stat in self._file_stats.values()))
-        span.set_attribute("import_errors", sum(stat.import_errors for stat in self._file_stats.values()))
+        span.set_attributes(
+            {
+                "total_parse_time": parse_time,
+                "dag_bag_size": sum(stat.num_dags for stat in self._file_stats.values()),
+                "import_errors": sum(stat.import_errors for stat in self._file_stats.values()),
+            }
+        )
 
     @property
     def file_paths(self):
44 changes: 28 additions & 16 deletions airflow/executors/base_executor.py
@@ -391,13 +391,17 @@ def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
                 span_id=span_id,
                 links=links,
             ) as span:
-                span.set_attribute("dag_id", key.dag_id)
-                span.set_attribute("run_id", key.run_id)
-                span.set_attribute("task_id", key.task_id)
-                span.set_attribute("try_number", key.try_number)
-                span.set_attribute("command", str(command))
-                span.set_attribute("queue", str(queue))
-                span.set_attribute("executor_config", str(executor_config))
+                span.set_attributes(
+                    {
+                        "dag_id": key.dag_id,
+                        "run_id": key.run_id,
+                        "task_id": key.task_id,
+                        "try_number": key.try_number,
+                        "command": str(command),
+                        "queue": str(queue),
+                        "executor_config": str(executor_config),
+                    }
+                )
                 del self.queued_tasks[key]
                 self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
                 self.running.add(key)
@@ -436,11 +440,15 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
             component="BaseExecutor",
             parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
         ) as span:
-            span.set_attribute("dag_id", key.dag_id)
-            span.set_attribute("run_id", key.run_id)
-            span.set_attribute("task_id", key.task_id)
-            span.set_attribute("try_number", key.try_number)
-            span.set_attribute("error", True)
+            span.set_attributes(
+                {
+                    "dag_id": key.dag_id,
+                    "run_id": key.run_id,
+                    "task_id": key.task_id,
+                    "try_number": key.try_number,
+                    "error": True,
+                }
+            )
 
         self.change_state(key, TaskInstanceState.FAILED, info)
 
@@ -459,10 +467,14 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
             component="BaseExecutor",
             parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
         ) as span:
-            span.set_attribute("dag_id", key.dag_id)
-            span.set_attribute("run_id", key.run_id)
-            span.set_attribute("task_id", key.task_id)
-            span.set_attribute("try_number", key.try_number)
+            span.set_attributes(
+                {
+                    "dag_id": key.dag_id,
+                    "run_id": key.run_id,
+                    "task_id": key.task_id,
+                    "try_number": key.try_number,
+                }
+            )
 
         self.change_state(key, TaskInstanceState.SUCCESS, info)
 
14 changes: 9 additions & 5 deletions airflow/executors/local_executor.py
@@ -273,11 +273,15 @@ def execute_async(
 
             span = Trace.get_current_span()
             if span.is_recording():
-                span.set_attribute("dag_id", key.dag_id)
-                span.set_attribute("run_id", key.run_id)
-                span.set_attribute("task_id", key.task_id)
-                span.set_attribute("try_number", key.try_number)
-                span.set_attribute("commands_to_run", str(command))
+                span.set_attributes(
+                    {
+                        "dag_id": key.dag_id,
+                        "run_id": key.run_id,
+                        "task_id": key.task_id,
+                        "try_number": key.try_number,
+                        "commands_to_run": str(command),
+                    }
+                )
 
             local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
             self.executor.workers_used += 1
14 changes: 9 additions & 5 deletions airflow/executors/sequential_executor.py
@@ -71,11 +71,15 @@ def execute_async(
 
         span = Trace.get_current_span()
         if span.is_recording():
-            span.set_attribute("dag_id", key.dag_id)
-            span.set_attribute("run_id", key.run_id)
-            span.set_attribute("task_id", key.task_id)
-            span.set_attribute("try_number", key.try_number)
-            span.set_attribute("commands_to_run", str(self.commands_to_run))
+            span.set_attributes(
+                {
+                    "dag_id": key.dag_id,
+                    "run_id": key.run_id,
+                    "task_id": key.task_id,
+                    "try_number": key.try_number,
+                    "commands_to_run": str(self.commands_to_run),
+                }
+            )
 
     def sync(self) -> None:
         for key, command in self.commands_to_run:
94 changes: 57 additions & 37 deletions airflow/jobs/scheduler_job_runner.py
@@ -859,28 +859,32 @@ def process_executor_events(
 
     @classmethod
     def _set_span_attrs__process_executor_events(cls, span, state, ti):
-        span.set_attribute("category", "scheduler")
-        span.set_attribute("task_id", ti.task_id)
-        span.set_attribute("dag_id", ti.dag_id)
-        span.set_attribute("state", ti.state)
-        if ti.state == TaskInstanceState.FAILED:
-            span.set_attribute("error", True)
-        span.set_attribute("start_date", str(ti.start_date))
-        span.set_attribute("end_date", str(ti.end_date))
-        span.set_attribute("duration", ti.duration)
-        span.set_attribute("executor_config", str(ti.executor_config))
-        span.set_attribute("execution_date", str(ti.execution_date))
-        span.set_attribute("hostname", ti.hostname)
-        span.set_attribute("log_url", ti.log_url)
-        span.set_attribute("operator", str(ti.operator))
-        span.set_attribute("try_number", ti.try_number)
-        span.set_attribute("executor_state", state)
-        span.set_attribute("pool", ti.pool)
-        span.set_attribute("queue", ti.queue)
-        span.set_attribute("priority_weight", ti.priority_weight)
-        span.set_attribute("queued_dttm", str(ti.queued_dttm))
-        span.set_attribute("queued_by_job_id", ti.queued_by_job_id)
-        span.set_attribute("pid", ti.pid)
+        # Use span.set_attributes
+        span.set_attributes(
+            {
+                "category": "scheduler",
+                "task_id": ti.task_id,
+                "dag_id": ti.dag_id,
+                "state": ti.state,
+                "error": True if state == TaskInstanceState.FAILED else False,
+                "start_date": str(ti.start_date),
+                "end_date": str(ti.end_date),
+                "duration": ti.duration,
+                "executor_config": str(ti.executor_config),
+                "execution_date": str(ti.execution_date),
+                "hostname": ti.hostname,
+                "log_url": ti.log_url,
+                "operator": str(ti.operator),
+                "try_number": ti.try_number,
+                "executor_state": state,
+                "pool": ti.pool,
+                "queue": ti.queue,
+                "priority_weight": ti.priority_weight,
+                "queued_dttm": str(ti.queued_dttm),
+                "queued_by_job_id": ti.queued_by_job_id,
+                "pid": ti.pid,
+            }
+        )
         if span.is_recording():
             span.add_event(name="queued", timestamp=datetime_to_nano(ti.queued_dttm))
             span.add_event(name="started", timestamp=datetime_to_nano(ti.start_date))
@@ -1060,8 +1064,12 @@ def _run_scheduler_loop(self) -> None:
             with Trace.start_span(
                 span_name="scheduler_job_loop", component="SchedulerJobRunner"
             ) as span, Stats.timer("scheduler.scheduler_loop_duration") as timer:
-                span.set_attribute("category", "scheduler")
-                span.set_attribute("loop_count", loop_count)
+                span.set_attributes(
+                    {
+                        "category": "scheduler",
+                        "loop_count": loop_count,
+                    }
+                )
 
                 if self.using_sqlite and self.processor_agent:
                     self.processor_agent.run_single_parsing_loop()
@@ -1518,10 +1526,14 @@ def _start_queued_dagruns(self, session: Session) -> None:
         @add_span
         def _update_state(dag: DAG, dag_run: DagRun):
             span = Trace.get_current_span()
-            span.set_attribute("state", str(DagRunState.RUNNING))
-            span.set_attribute("run_id", dag_run.run_id)
-            span.set_attribute("type", dag_run.run_type)
-            span.set_attribute("dag_id", dag_run.dag_id)
+            span.set_attributes(
+                {
+                    "state": str(DagRunState.RUNNING),
+                    "run_id": dag_run.run_id,
+                    "type": dag_run.run_type,
+                    "dag_id": dag_run.dag_id,
+                }
+            )
 
             dag_run.state = DagRunState.RUNNING
             dag_run.start_date = timezone.utcnow()
@@ -1631,9 +1643,13 @@ def _schedule_dag_run(
         with Trace.start_span(
             span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links
         ) as span:
-            span.set_attribute("dag_id", dag_run.dag_id)
-            span.set_attribute("run_id", dag_run.run_id)
-            span.set_attribute("run_type", dag_run.run_type)
+            span.set_attributes(
+                {
+                    "dag_id": dag_run.dag_id,
+                    "run_id": dag_run.run_id,
+                    "run_type": dag_run.run_type,
+                }
+            )
             callback: DagCallbackRequest | None = None
 
             dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
@@ -1814,12 +1830,16 @@ def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
             Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name})
             Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name})
 
-            span.set_attribute("category", "scheduler")
-            span.set_attribute(f"pool.open_slots.{pool_name}", slot_stats["open"])
-            span.set_attribute(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
-            span.set_attribute(f"pool.running_slots.{pool_name}", slot_stats["running"])
-            span.set_attribute(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
-            span.set_attribute(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"])
+            span.set_attributes(
+                {
+                    "category": "scheduler",
+                    f"pool.open_slots.{pool_name}": slot_stats["open"],
+                    f"pool.queued_slots.{pool_name}": slot_stats["queued"],
+                    f"pool.running_slots.{pool_name}": slot_stats["running"],
+                    f"pool.deferred_slots.{pool_name}": slot_stats["deferred"],
+                    f"pool.scheduled_slots.{pool_name}": slot_stats["scheduled"],
+                }
+            )
 
     @provide_session
     def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
10 changes: 7 additions & 3 deletions airflow/jobs/triggerer_job_runner.py
@@ -436,9 +436,13 @@ def emit_metrics(self):
         Stats.gauge("triggerer.capacity_left", capacity_left, tags={"hostname": self.job.hostname})
 
         span = Trace.get_current_span()
-        span.set_attribute("trigger host", self.job.hostname)
-        span.set_attribute("triggers running", len(self.trigger_runner.triggers))
-        span.set_attribute("capacity left", capacity_left)
+        span.set_attributes(
+            {
+                "trigger host": self.job.hostname,
+                "triggers running": len(self.trigger_runner.triggers),
+                "capacity left": capacity_left,
+            }
+        )
 
 
 class TriggerDetails(TypedDict):
39 changes: 22 additions & 17 deletions airflow/models/taskinstance.py
@@ -1252,25 +1252,30 @@ def _handle_failure(
     TaskInstance.save_to_db(failure_context["ti"], session)
 
     with Trace.start_span_from_taskinstance(ti=task_instance) as span:
-        # ---- error info ----
-        span.set_attribute("error", "true")
-        span.set_attribute("error_msg", str(error))
-        span.set_attribute("context", context)
-        span.set_attribute("force_fail", force_fail)
-        # ---- common info ----
-        span.set_attribute("category", "DAG runs")
-        span.set_attribute("task_id", task_instance.task_id)
-        span.set_attribute("dag_id", task_instance.dag_id)
-        span.set_attribute("state", task_instance.state)
-        span.set_attribute("start_date", str(task_instance.start_date))
-        span.set_attribute("end_date", str(task_instance.end_date))
-        span.set_attribute("duration", task_instance.duration)
-        span.set_attribute("executor_config", str(task_instance.executor_config))
-        span.set_attribute("execution_date", str(task_instance.execution_date))
-        span.set_attribute("hostname", task_instance.hostname)
+        span.set_attributes(
+            {
+                # ---- error info ----
+                "error": "true",
+                "error_msg": str(error),
+                "context": context,
+                "force_fail": force_fail,
+                # ---- common info ----
+                "category": "DAG runs",
+                "task_id": task_instance.task_id,
+                "dag_id": task_instance.dag_id,
+                "state": task_instance.state,
+                "start_date": str(task_instance.start_date),
+                "end_date": str(task_instance.end_date),
+                "duration": task_instance.duration,
+                "executor_config": str(task_instance.executor_config),
+                "execution_date": str(task_instance.execution_date),
+                "hostname": task_instance.hostname,
+                "operator": str(task_instance.operator),
+            }
+        )
+
         if isinstance(task_instance, TaskInstance):
             span.set_attribute("log_url", task_instance.log_url)
-            span.set_attribute("operator", str(task_instance.operator))
 
 
 def _refresh_from_task(
Expand Down
