Skip to content

Commit 7f2b760

Browse files
authored
Merge branch 'main' into open-source-2
2 parents 19205ca + ee68ddf commit 7f2b760

File tree

40 files changed

+882
-571
lines changed

40 files changed

+882
-571
lines changed

airflow/api_fastapi/core_api/datamodels/dag_run.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@
1919

2020
from datetime import datetime
2121
from enum import Enum
22+
from typing import TYPE_CHECKING
2223

2324
from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator
2425

2526
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
2627
from airflow.models import DagRun
28+
from airflow.timetables.base import DataInterval
2729
from airflow.utils import timezone
2830
from airflow.utils.state import DagRunState
2931
from airflow.utils.types import DagRunTriggeredByType, DagRunType
3032

33+
if TYPE_CHECKING:
34+
from airflow.models import DAG
35+
3136

3237
class DAGRunPatchStates(str, Enum):
3338
"""Enum for DAG Run states when updating a DAG Run."""
@@ -99,6 +104,36 @@ def check_data_intervals(cls, values):
99104
)
100105
return values
101106

107+
def validate_context(self, dag: DAG) -> dict:
    """Validate the trigger request and build the kwargs for ``DAG.create_dagrun``.

    Coerces the user-supplied datetimes to aware datetimes, resolves the data
    interval (explicit start/end if both were given, otherwise inferred from the
    DAG's timetable), and generates a run id when none was provided.

    :param dag: the DAG being triggered; its timetable is used to infer a manual
        data interval when one is not supplied explicitly.
    :return: dict of keyword arguments (run_id, logical_date, data_interval,
        run_after, conf, note) for ``dag.create_dagrun``.
    """
    coerced_logical_date = timezone.coerce_datetime(self.logical_date)
    # Coerce run_after up front so every downstream consumer sees an aware datetime.
    run_after = timezone.coerce_datetime(self.run_after)
    data_interval = None
    if coerced_logical_date:
        if self.data_interval_start and self.data_interval_end:
            data_interval = DataInterval(
                start=timezone.coerce_datetime(self.data_interval_start),
                end=timezone.coerce_datetime(self.data_interval_end),
            )
        else:
            # Inside this branch coerced_logical_date is truthy, so it is the
            # only anchor needed to infer the manual data interval.
            data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
        # When a logical date is given, the run is anchored to the end of its interval.
        run_after = data_interval.end

    run_id = self.dag_run_id or DagRun.generate_run_id(
        run_type=DagRunType.SCHEDULED,
        logical_date=coerced_logical_date,
        # Use the computed value (data_interval.end when a logical date was
        # supplied), not the raw self.run_after.
        run_after=run_after,
    )
    return {
        "run_id": run_id,
        "logical_date": coerced_logical_date,
        "data_interval": data_interval,
        "run_after": run_after,
        "conf": self.conf,
        "note": self.note,
    }
136+
102137
@model_validator(mode="after")
103138
def validate_dag_run_id(self):
104139
if not self.dag_run_id:

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,34 @@ paths:
181181
summary: Get Dependencies
182182
description: Dependencies graph.
183183
operationId: get_dependencies
184+
parameters:
185+
- name: node_id
186+
in: query
187+
required: false
188+
schema:
189+
anyOf:
190+
- type: string
191+
- type: 'null'
192+
title: Node Id
184193
responses:
185194
'200':
186195
description: Successful Response
187196
content:
188197
application/json:
189198
schema:
190199
$ref: '#/components/schemas/BaseGraphResponse'
200+
'404':
201+
content:
202+
application/json:
203+
schema:
204+
$ref: '#/components/schemas/HTTPExceptionResponse'
205+
description: Not Found
206+
'422':
207+
description: Validation Error
208+
content:
209+
application/json:
210+
schema:
211+
$ref: '#/components/schemas/HTTPValidationError'
191212
/ui/dashboard/historical_metrics_data:
192213
get:
193214
tags:

airflow/api_fastapi/core_api/routes/public/dag_run.py

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
from typing import Annotated, Literal, cast
2121

22-
import pendulum
2322
from fastapi import Depends, HTTPException, Query, Request, status
2423
from fastapi.exceptions import RequestValidationError
2524
from pydantic import ValidationError
@@ -65,8 +64,6 @@
6564
from airflow.listeners.listener import get_listener_manager
6665
from airflow.models import DAG, DagModel, DagRun
6766
from airflow.models.dag_version import DagVersion
68-
from airflow.timetables.base import DataInterval
69-
from airflow.utils import timezone
7067
from airflow.utils.state import DagRunState
7168
from airflow.utils.types import DagRunTriggeredByType, DagRunType
7269

@@ -358,38 +355,16 @@ def trigger_dag_run(
358355
f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
359356
)
360357

361-
logical_date = timezone.coerce_datetime(body.logical_date)
362-
coerced_logical_date = timezone.coerce_datetime(logical_date)
363-
run_after = timezone.coerce_datetime(body.run_after)
364-
365358
try:
366359
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
367-
data_interval = None
368-
if body.logical_date:
369-
if body.data_interval_start and body.data_interval_end:
370-
data_interval = DataInterval(
371-
start=pendulum.instance(body.data_interval_start),
372-
end=pendulum.instance(body.data_interval_end),
373-
)
374-
else:
375-
data_interval = dag.timetable.infer_manual_data_interval(
376-
run_after=coerced_logical_date or run_after
377-
)
378-
run_after = data_interval.end
379-
380-
if body.dag_run_id:
381-
run_id = body.dag_run_id
382-
else:
383-
run_id = DagRun.generate_run_id(
384-
run_type=DagRunType.SCHEDULED, logical_date=coerced_logical_date, run_after=run_after
385-
)
360+
params = body.validate_context(dag)
386361

387362
dag_run = dag.create_dagrun(
388-
run_id=run_id,
389-
logical_date=coerced_logical_date,
390-
data_interval=data_interval,
391-
run_after=run_after,
392-
conf=body.conf,
363+
run_id=params["run_id"],
364+
logical_date=params["logical_date"],
365+
data_interval=params["data_interval"],
366+
run_after=params["run_after"],
367+
conf=params["conf"],
393368
run_type=DagRunType.MANUAL,
394369
triggered_by=DagRunTriggeredByType.REST_API,
395370
external_trigger=True,

airflow/api_fastapi/core_api/routes/ui/dependencies.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,28 @@
1717

1818
from __future__ import annotations
1919

20+
from fastapi import status
21+
from fastapi.exceptions import HTTPException
22+
2023
from airflow.api_fastapi.common.db.common import SessionDep
2124
from airflow.api_fastapi.common.router import AirflowRouter
2225
from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse
26+
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
27+
from airflow.api_fastapi.core_api.services.ui.dependencies import extract_single_connected_component
2328
from airflow.models.serialized_dag import SerializedDagModel
2429

2530
dependencies_router = AirflowRouter(tags=["Dependencies"])
2631

2732

2833
@dependencies_router.get(
2934
"/dependencies",
35+
responses=create_openapi_http_exception_doc(
36+
[
37+
status.HTTP_404_NOT_FOUND,
38+
]
39+
),
3040
)
31-
def get_dependencies(
32-
session: SessionDep,
33-
) -> BaseGraphResponse:
41+
def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGraphResponse:
3442
"""Dependencies graph."""
3543
nodes_dict: dict[str, dict] = {}
3644
edge_tuples: set[tuple[str, str]] = set()
@@ -69,4 +77,10 @@ def get_dependencies(
6977
"edges": edges,
7078
}
7179

80+
if node_id is not None:
81+
try:
82+
data = extract_single_connected_component(node_id, data["nodes"], data["edges"])
83+
except ValueError as e:
84+
raise HTTPException(404, str(e))
85+
7286
return BaseGraphResponse(**data)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from collections import defaultdict
21+
22+
23+
def _dfs_connected_components(
24+
temp: list[str], node_id: str, visited: dict[str, bool], adjacency_matrix: dict[str, list[str]]
25+
) -> list[str]:
26+
visited[node_id] = True
27+
28+
temp.append(node_id)
29+
30+
for adj_node_id in adjacency_matrix[node_id]:
31+
if not visited[adj_node_id]:
32+
temp = _dfs_connected_components(temp, adj_node_id, visited, adjacency_matrix)
33+
34+
return temp
35+
36+
37+
def extract_connected_components(adjacency_matrix: dict[str, list[str]]) -> list[list[str]]:
    """Partition the graph's nodes into connected components.

    Each component is returned as the list of node ids in DFS visit order;
    components appear in the order their first node occurs in
    ``adjacency_matrix``.

    :param adjacency_matrix: adjacency list mapping node id -> neighbour ids.
    :return: list of connected components, each a list of node ids.
    """
    visited = dict.fromkeys(adjacency_matrix, False)
    components: list[list[str]] = []

    for start_node in adjacency_matrix:
        if visited[start_node]:
            continue
        components.append(_dfs_connected_components([], start_node, visited, adjacency_matrix))

    return components
48+
49+
50+
def extract_single_connected_component(
    node_id: str, nodes: list[dict], edges: list[dict]
) -> dict[str, list[dict]]:
    """Find the connected component that contains the node with the id ``node_id``.

    Seeds the adjacency mapping with every node in ``nodes`` so that an
    isolated node (one with no edges) is still found as its own singleton
    component, then walks only the component containing ``node_id`` instead of
    computing every component of the graph.

    :param node_id: id of the node whose component is requested.
    :param nodes: graph nodes; each dict has at least an ``"id"`` key.
    :param edges: undirected edges; each dict has ``"source_id"`` and ``"target_id"``.
    :return: ``{"nodes": [...], "edges": [...]}`` restricted to the component.
    :raises ValueError: if ``node_id`` does not exist in the graph.
    """
    # Every node gets an entry up front so edge-less nodes are not lost.
    adjacency: dict[str, list[str]] = {node["id"]: [] for node in nodes}

    for edge in edges:
        # setdefault guards against edges referencing ids absent from ``nodes``.
        adjacency.setdefault(edge["source_id"], []).append(edge["target_id"])
        adjacency.setdefault(edge["target_id"], []).append(edge["source_id"])

    if node_id not in adjacency:
        raise ValueError(f"No connected component found: node {node_id!r} does not exist in the graph.")

    # Iterative DFS over just the component containing node_id.
    component: set[str] = {node_id}
    stack = [node_id]
    while stack:
        current = stack.pop()
        for neighbour in adjacency[current]:
            if neighbour not in component:
                component.add(neighbour)
                stack.append(neighbour)

    filtered_nodes = [node for node in nodes if node["id"] in component]
    filtered_edges = [
        edge
        for edge in edges
        if (edge["source_id"] in component and edge["target_id"] in component)
    ]

    return {"nodes": filtered_nodes, "edges": filtered_edges}

airflow/cli/cli_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1945,7 +1945,7 @@ class GroupCommand(NamedTuple):
19451945
),
19461946
ActionCommand(
19471947
name="dag-processor",
1948-
help="Start a standalone Dag Processor instance",
1948+
help="Start a dag processor instance",
19491949
func=lazy_load_command("airflow.cli.commands.local_commands.dag_processor_command.dag_processor"),
19501950
args=(
19511951
ARG_PID,

airflow/config_templates/config.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2669,6 +2669,16 @@ dag_processor:
26692669
type: integer
26702670
example: ~
26712671
default: "30"
2672+
bundle_refresh_check_interval:
2673+
description: |
2674+
How often the DAG processor should check if any DAG bundles are ready for a refresh, either by hitting
2675+
the bundles refresh_interval or because another DAG processor has seen a newer version of the bundle.
2676+
A low value means we check more frequently, and have a smaller window of time where DAG processors are
2677+
out of sync with each other, parsing different versions of the same bundle.
2678+
version_added: ~
2679+
type: integer
2680+
example: ~
2681+
default: "5"
26722682
fastapi:
26732683
description: Configuration for the Fastapi webserver.
26742684
options:

airflow/config_templates/unit_tests.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ scheduler_heartbeat_sec = 5
9090

9191
[dag_processor]
9292
parsing_processes = 2
93+
bundle_refresh_check_interval = 0
9394

9495
[triggerer]
9596
# Those values are set so that during unit tests things run faster than usual.

airflow/dag_processing/manager.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,12 @@ class DagFileProcessorManager(LoggingMixin):
201201
base_log_dir: str = attrs.field(factory=_config_get_factory("scheduler", "CHILD_PROCESS_LOG_DIRECTORY"))
202202
_latest_log_symlink_date: datetime = attrs.field(factory=datetime.today, init=False)
203203

204+
bundle_refresh_check_interval: int = attrs.field(
205+
factory=_config_int_factory("dag_processor", "bundle_refresh_check_interval")
206+
)
207+
_bundles_last_refreshed: float = attrs.field(default=0, init=False)
208+
"""Last time we checked if any bundles are ready to be refreshed"""
209+
204210
def register_exit_signals(self):
205211
"""Register signals that stop child processes."""
206212
signal.signal(signal.SIGINT, self._exit_gracefully)
@@ -447,6 +453,19 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
447453
"""Refresh DAG bundles, if required."""
448454
now = timezone.utcnow()
449455

456+
# we don't need to check if it's time to refresh every loop - that is way too often
457+
next_check = self._bundles_last_refreshed + self.bundle_refresh_check_interval
458+
now_seconds = time.monotonic()
459+
if now_seconds < next_check:
460+
self.log.debug(
461+
"Not time to check if DAG Bundles need refreshed yet - skipping. "
462+
"Next check in %.2f seconds",
463+
next_check - now_seconds,
464+
)
465+
return
466+
467+
self._bundles_last_refreshed = now_seconds
468+
450469
for bundle in self._dag_bundles:
451470
# TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
452471
# What if the cloning/refreshing took too long(longer than the dag processor timeout)

0 commit comments

Comments
 (0)