Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more filters and search to get_dags endpoint #42321

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ paths:
items:
type: string
title: Tags
- name: owners
in: query
required: false
schema:
type: array
items:
type: string
title: Owners
- name: dag_id_pattern
in: query
required: false
Expand All @@ -72,6 +80,14 @@ paths:
- type: string
- type: 'null'
title: Dag Id Pattern
- name: dag_display_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Display Name Pattern
- name: only_active
in: query
required: false
Expand Down
61 changes: 54 additions & 7 deletions airflow/api_fastapi/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import TYPE_CHECKING, Any, Generic, List, TypeVar, Union

from fastapi import Depends, HTTPException, Query
from sqlalchemy import or_
from sqlalchemy import case, or_
from typing_extensions import Annotated, Self

from airflow.models.dag import DagModel, DagTag
Expand Down Expand Up @@ -101,18 +101,41 @@ def __call__(self, only_active: bool = Query(default=True)) -> _OnlyActiveFilter
return self.set_value(only_active)


class _DagIdPatternSearch(BaseParam[Union[str, None]]):
"""Search on dag_id."""
class _SearchParam(BaseParam[Union[str, None]]):
"""Search on attribute."""

def __init__(self, attribute: ColumnElement) -> None:
super().__init__()
self.attribute: ColumnElement = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None:
return select
return select.where(DagModel.dag_id.ilike(f"%{self.value}"))
return select.where(self.attribute.ilike(f"%{self.value}"))


class _DagIdPatternSearch(_SearchParam):
"""Search on dag_id."""

def __init__(self) -> None:
super().__init__(DagModel.dag_id)

def __call__(self, dag_id_pattern: str | None = Query(default=None)) -> _DagIdPatternSearch:
return self.set_value(dag_id_pattern)


class _DagDisplayNamePatternSearch(_SearchParam):
"""Search on dag_display_name."""

def __init__(self) -> None:
super().__init__(DagModel.dag_display_name)

def __call__(
self, dag_display_name_pattern: str | None = Query(default=None)
) -> _DagDisplayNamePatternSearch:
return self.set_value(dag_display_name_pattern)


class SortParam(BaseParam[Union[str]]):
"""Order result by the attribute."""

Expand All @@ -131,10 +154,16 @@ def to_orm(self, select: Select) -> Select:
f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model",
)

column = getattr(DagModel, lstriped_orderby)

# MySQL does not support `nullslast`, and True/False ordering depends on the
# database implementation
nullscheck = case((column.isnot(None), 0), else_=1)
if self.value[0] == "-":
return select.order_by(getattr(DagModel, lstriped_orderby).desc())
return select.order_by(nullscheck, column.desc(), DagModel.dag_id)
else:
return select.order_by(getattr(DagModel, lstriped_orderby).asc())
return select.order_by(nullscheck, column.asc(), DagModel.dag_id)

def __call__(self, order_by: str = Query(default="dag_id")) -> SortParam:
return self.set_value(order_by)
Expand All @@ -144,7 +173,7 @@ class _TagsFilter(BaseParam[List[str]]):
"""Filter on tags."""

def to_orm(self, select: Select) -> Select:
if self.value is None:
if not self.value:
return select

conditions = [DagModel.tags.any(DagTag.name == tag) for tag in self.value]
Expand All @@ -154,9 +183,27 @@ def __call__(self, tags: list[str] = Query(default_factory=list)) -> _TagsFilter
return self.set_value(tags)


class _OwnersFilter(BaseParam[List[str]]):
"""Filter on owners."""

def to_orm(self, select: Select) -> Select:
if not self.value:
return select

conditions = [DagModel.owners.ilike(f"%{owner}%") for owner in self.value]
return select.where(or_(*conditions))

def __call__(self, owners: list[str] = Query(default_factory=list)) -> _OwnersFilter:
return self.set_value(owners)


QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter())]
QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter())]
QueryPausedFilter = Annotated[_PausedFilter, Depends(_PausedFilter())]
QueryOnlyActiveFilter = Annotated[_OnlyActiveFilter, Depends(_OnlyActiveFilter())]
QueryDagIdPatternSearch = Annotated[_DagIdPatternSearch, Depends(_DagIdPatternSearch())]
QueryDagDisplayNamePatternSearch = Annotated[
_DagDisplayNamePatternSearch, Depends(_DagDisplayNamePatternSearch())
]
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter())]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter())]
10 changes: 8 additions & 2 deletions airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@

from airflow.api_fastapi.db import apply_filters_to_select, get_session
from airflow.api_fastapi.parameters import (
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryLimit,
QueryOffset,
QueryOnlyActiveFilter,
QueryOwnersFilter,
QueryPausedFilter,
QueryTagsFilter,
SortParam,
Expand All @@ -45,16 +47,20 @@ async def get_dags(
limit: QueryLimit,
offset: QueryOffset,
tags: QueryTagsFilter,
owners: QueryOwnersFilter,
dag_id_pattern: QueryDagIdPatternSearch,
dag_display_name_pattern: QueryDagDisplayNamePatternSearch,
only_active: QueryOnlyActiveFilter,
paused: QueryPausedFilter,
order_by: Annotated[SortParam, Depends(SortParam(["dag_id"]))],
order_by: Annotated[SortParam, Depends(SortParam(["dag_id", "dag_display_name", "next_dagrun"]))],
session: Annotated[Session, Depends(get_session)],
) -> DAGCollectionResponse:
"""Get all DAGs."""
dags_query = select(DagModel)

dags_query = apply_filters_to_select(dags_query, [only_active, paused, dag_id_pattern, tags])
dags_query = apply_filters_to_select(
dags_query, [only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners]
)

# TODO: Re-enable when permissions are handled.
# readable_dags = get_auth_manager().get_permitted_dag_ids(user=g.user)
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sqlalchemy.sql.expression import ColumnOperators
from sqlalchemy.types import TypeEngine


log = logging.getLogger(__name__)


Expand Down
32 changes: 26 additions & 6 deletions tests/api_fastapi/views/public/test_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timezone

import pytest

Expand All @@ -28,11 +28,10 @@
pytestmark = pytest.mark.db_test

DAG1_ID = "test_dag1"
DAG1_DISPLAY_NAME = "a"
DAG1_DISPLAY_NAME = "display1"
DAG2_ID = "test_dag2"
DAG2_DISPLAY_NAME = "b"
DAG2_DISPLAY_NAME = "display2"
DAG3_ID = "test_dag3"
DAG3_DISPLAY_NAME = "c"
TASK_ID = "op1"


Expand All @@ -44,6 +43,8 @@ def _create_deactivated_paused_dag(session=None):
timetable_summary="2 2 * * *",
is_active=False,
is_paused=True,
owners="test_owner,another_test_owner",
next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
)
session.add(dag_model)

Expand All @@ -56,6 +57,7 @@ def setup() -> None:

with DAG(
DAG1_ID,
dag_display_name=DAG1_DISPLAY_NAME,
schedule=None,
start_date=datetime(2020, 6, 15),
doc_md="details",
Expand All @@ -64,7 +66,16 @@ def setup() -> None:
) as dag1:
EmptyOperator(task_id=TASK_ID)

with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2:
with DAG(
DAG2_ID,
dag_display_name=DAG2_DISPLAY_NAME,
schedule=None,
start_date=datetime(
2020,
6,
15,
),
) as dag2:
EmptyOperator(task_id=TASK_ID)

dag1.sync_to_db()
Expand All @@ -76,15 +87,24 @@ def setup() -> None:
@pytest.mark.parametrize(
"query_params, expected_total_entries, expected_ids",
[
# Filters
({}, 2, ["test_dag1", "test_dag2"]),
({"limit": 1}, 2, ["test_dag1"]),
({"offset": 1}, 2, ["test_dag2"]),
({"tags": ["example"]}, 1, ["test_dag1"]),
({"dag_id_pattern": "1"}, 1, ["test_dag1"]),
({"only_active": False}, 3, ["test_dag1", "test_dag2", "test_dag3"]),
({"paused": True, "only_active": False}, 1, ["test_dag3"]),
({"paused": False}, 2, ["test_dag1", "test_dag2"]),
({"owners": ["airflow"]}, 2, ["test_dag1", "test_dag2"]),
({"owners": ["test_owner"], "only_active": False}, 1, ["test_dag3"]),
# # Sort
({"order_by": "-dag_id"}, 2, ["test_dag2", "test_dag1"]),
({"order_by": "-dag_display_name"}, 2, ["test_dag2", "test_dag1"]),
({"order_by": "dag_display_name"}, 2, ["test_dag1", "test_dag2"]),
({"order_by": "next_dagrun", "only_active": False}, 3, ["test_dag3", "test_dag1", "test_dag2"]),
# Search
({"dag_id_pattern": "1"}, 1, ["test_dag1"]),
({"dag_display_name_pattern": "display2"}, 1, ["test_dag2"]),
],
)
def test_get_dags(test_client, query_params, expected_total_entries, expected_ids):
Expand Down