Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bundle name arg to list dags cli command #45779

Merged
merged 20 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
de267c9
Add bundle name arg to list dags cli command
ambika-garg Jan 18, 2025
030c22c
Add latest-bundle-version argument to list_dags command and display b…
ambika-garg Jan 22, 2025
8bf2db2
Fix dag lists command to read dags directly from db
ambika-garg Jan 30, 2025
d48587f
remove bundle version argument
ambika-garg Jan 30, 2025
65c7d7f
* Rename latest_bundle_version to bundle_version
ambika-garg Feb 4, 2025
aeb5027
Fix dag tests to support bundle-name
ambika-garg Feb 5, 2025
93f33dc
Apply suggestions from code review
ambika-garg Feb 7, 2025
d64d422
Update airflow/cli/commands/remote_commands/dag_command.py
ambika-garg Feb 7, 2025
59e4fea
Add latest-bundle-version argument to list_dags command and display b…
ambika-garg Jan 22, 2025
a9cab10
remove bundle version arguement
ambika-garg Jan 30, 2025
a4b6246
Extend list dags command to support multiple dag bundles
ambika-garg Feb 8, 2025
e34367e
Add the support for import errors in list_dags command
ambika-garg Feb 11, 2025
c2bcfab
Fix to fetch only the count of import errors
ambika-garg Feb 12, 2025
7069eeb
Fix test_cli_list_dags_prints_import_errors to correctly capture impo…
ambika-garg Feb 12, 2025
a0784d8
Fix test for list_import_errors command
ambika-garg Feb 19, 2025
437cc77
Delete previous import errors before adding new ones
ambika-garg Feb 22, 2025
37ab1a8
Add setup and teardown tasks
ambika-garg Feb 24, 2025
cf374f0
Update tests/cli/commands/remote_commands/test_dag_command.py
ambika-garg Feb 27, 2025
4527266
Update tests_common/pytest_plugin.py
ambika-garg Feb 27, 2025
a17c156
Fix indentation
ambika-garg Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class Meta:

dag_id = auto_field(dump_only=True)
dag_display_name = fields.String(attribute="dag_display_name", dump_only=True)
bundle_name = auto_field(dump_only=True)
bundle_version = auto_field(dump_only=True)
is_paused = auto_field()
is_active = auto_field(dump_only=True)
last_parsed_time = auto_field(dump_only=True)
Expand Down
5 changes: 3 additions & 2 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def string_lower_type(val):
"--bundle-name",
),
help=("The name of the DAG bundle to use; may be provided more than once"),
type=str,
default=None,
action="append",
)
Expand Down Expand Up @@ -880,7 +881,7 @@ def string_lower_type(val):
("--columns",),
type=string_list_type,
help="List of columns to render. (default: ['dag_id', 'fileloc', 'owner', 'is_paused'])",
default=("dag_id", "fileloc", "owners", "is_paused"),
default=("dag_id", "fileloc", "owners", "is_paused", "bundle_name", "bundle_version"),
)

ARG_ASSET_LIST_COLUMNS = Arg(
Expand Down Expand Up @@ -978,7 +979,7 @@ class GroupCommand(NamedTuple):
name="list",
help="List all the DAGs",
func=lazy_load_command("airflow.cli.commands.remote_commands.dag_command.dag_list_dags"),
args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS),
args=(ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS, ARG_BUNDLE_NAME),
),
ActionCommand(
name="list-import-errors",
Expand Down
38 changes: 32 additions & 6 deletions airflow/cli/commands/remote_commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Dag sub-commands."""

from __future__ import annotations
Expand All @@ -28,7 +29,7 @@
from typing import TYPE_CHECKING

import re2
from sqlalchemy import select
from sqlalchemy import func, select

from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
Expand All @@ -38,6 +39,7 @@
from airflow.exceptions import AirflowException
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils import cli as cli_utils, timezone
Expand Down Expand Up @@ -224,6 +226,8 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
return {
"dag_id": dag.dag_id,
"dag_display_name": dag.dag_display_name,
"bundle_name": dag.get_bundle_name(),
"bundle_version": dag.get_bundle_version(),
"is_paused": dag.get_is_paused(),
"is_active": dag.get_is_active(),
"last_parsed_time": None,
Expand Down Expand Up @@ -322,11 +326,12 @@ def print_execution_interval(interval: DataInterval | None):
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def dag_list_dags(args, session=NEW_SESSION) -> None:
def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
"""Display dags with or without stats at the command line."""
cols = args.columns if args.columns else []
invalid_cols = [c for c in cols if c not in dag_schema.fields]
valid_cols = [c for c in cols if c in dag_schema.fields]

if invalid_cols:
from rich import print as rich_print

Expand All @@ -335,8 +340,18 @@ def dag_list_dags(args, session=NEW_SESSION) -> None:
f"List of valid columns: {list(dag_schema.fields.keys())}",
file=sys.stderr,
)
dagbag = DagBag(process_subdir(args.subdir))
if dagbag.import_errors:

dagbag = DagBag(read_dags_from_db=True)
dagbag.collect_dags_from_db()

# Get import errors from the DB
query = select(func.count()).select_from(ParseImportError)
if args.bundle_name:
query = query.where(ParseImportError.bundle_name.in_(args.bundle_name))

dagbag_import_errors = session.scalar(query)

if dagbag_import_errors > 0:
from rich import print as rich_print

rich_print(
Expand All @@ -353,8 +368,19 @@ def get_dag_detail(dag: DAG) -> dict:
dag_detail = _get_dagbag_dag_details(dag)
return {col: dag_detail[col] for col in valid_cols}

def filter_dags_by_bundle(dags: list[DAG], bundle_names: list[str] | None) -> list[DAG]:
"""Filter DAGs based on the specified bundle name, if provided."""
if not bundle_names:
return dags

validate_dag_bundle_arg(bundle_names)
return [dag for dag in dags if dag.get_bundle_name() in bundle_names]

AirflowConsole().print_as(
data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")),
data=sorted(
filter_dags_by_bundle(list(dagbag.dags.values()), args.bundle_name),
key=operator.attrgetter("dag_id"),
),
output=args.output,
mapper=get_dag_detail,
)
Expand All @@ -364,7 +390,7 @@ def get_dag_detail(dag: DAG) -> dict:
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def dag_details(args, session=NEW_SESSION):
def dag_details(args, session: Session = NEW_SESSION):
"""Get DAG details given a DAG id."""
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
Expand Down
34 changes: 25 additions & 9 deletions tests/cli/commands/remote_commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@

from tests.models import TEST_DAGS_FOLDER
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, clear_db_runs, parse_and_sync_to_db
from tests_common.test_utils.db import (
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
parse_and_sync_to_db,
)

DEFAULT_DATE = timezone.make_aware(datetime(2015, 1, 1), timezone=timezone.utc)
if pendulum.__version__.startswith("3"):
Expand All @@ -77,7 +82,11 @@ def teardown_class(cls) -> None:
clear_db_dags()

def setup_method(self):
clear_db_runs() # clean-up all dag run before start each test
clear_db_runs()
clear_db_import_errors()

def teardown_method(self):
clear_db_import_errors()

def test_show_dag_dependencies_print(self):
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
Expand Down Expand Up @@ -274,12 +283,17 @@ def test_cli_list_dags_invalid_cols(self):
assert "Ignoring the following invalid columns: ['invalid_col']" in out

@conf_vars({("core", "load_examples"): "false"})
def test_cli_list_dags_prints_import_errors(self):
dag_path = os.path.join(TEST_DAGS_FOLDER, "test_invalid_cron.py")
args = self.parser.parse_args(["dags", "list", "--output", "yaml", "--subdir", dag_path])
with contextlib.redirect_stderr(StringIO()) as temp_stderr:
dag_command.dag_list_dags(args)
out = temp_stderr.getvalue()
def test_cli_list_dags_prints_import_errors(self, configure_testing_dag_bundle, get_test_dag):
path_to_parse = TEST_DAGS_FOLDER / "test_invalid_cron.py"
get_test_dag("test_invalid_cron")

args = self.parser.parse_args(["dags", "list", "--output", "yaml", "--bundle-name", "testing"])

with configure_testing_dag_bundle(path_to_parse):
with contextlib.redirect_stderr(StringIO()) as temp_stderr:
dag_command.dag_list_dags(args)
out = temp_stderr.getvalue()

assert "Failed to load all files." in out

@conf_vars({("core", "load_examples"): "true"})
Expand All @@ -305,7 +319,9 @@ def test_dagbag_dag_col(self):
@conf_vars({("core", "load_examples"): "false"})
def test_cli_list_import_errors(self):
dag_path = os.path.join(TEST_DAGS_FOLDER, "test_invalid_cron.py")
args = self.parser.parse_args(["dags", "list", "--output", "yaml", "--subdir", dag_path])
args = self.parser.parse_args(
["dags", "list-import-errors", "--output", "yaml", "--subdir", dag_path]
)
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
with pytest.raises(SystemExit) as err_ctx:
dag_command.dag_list_import_errors(args)
Expand Down
19 changes: 19 additions & 0 deletions tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,25 @@ def _get(dag_id: str):
dagbag = DagBag(dag_folder=dag_file, include_examples=False)

dag = dagbag.get_dag(dag_id)

if dagbag.import_errors:
session = settings.Session()
from airflow.models.errors import ParseImportError
from airflow.utils import timezone

# Add the new import errors
for _filename, stacktrace in dagbag.import_errors.items():
session.add(
ParseImportError(
filename=str(dag_file),
bundle_name="testing",
timestamp=timezone.utcnow(),
stacktrace=stacktrace,
)
)

return

if AIRFLOW_V_3_0_PLUS:
session = settings.Session()
from airflow.models.dagbundle import DagBundleModel
Expand Down