Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84: Migrating GET Dataset events for DAG runs api to fastAPI #43874

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cf5218c
AIP-84: Migrating GET Assets to fastAPI
amoghrajesh Nov 6, 2024
5a49280
matching response to legacy
amoghrajesh Nov 6, 2024
962572b
Adding unit tests - part 1
amoghrajesh Nov 8, 2024
428cb6c
Update airflow/api_fastapi/common/parameters.py
amoghrajesh Nov 8, 2024
a78d3cb
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
882d20c
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
25bb08e
Adding unit tests - part 2
amoghrajesh Nov 8, 2024
658479d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fa0cd23
fixing unit tests & updating parameter type
amoghrajesh Nov 8, 2024
dd791c2
review comments pierre
amoghrajesh Nov 8, 2024
06fa0a7
fixing last commit
amoghrajesh Nov 8, 2024
3bd803b
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fc29d7d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
7a97220
fixing unit tests
amoghrajesh Nov 9, 2024
df1ff8e
AIP-84: Migrating GET Dataset events for DAG runs to fastAPI
amoghrajesh Nov 11, 2024
6c80ced
adding test cases
amoghrajesh Nov 11, 2024
e344263
adding test cases
amoghrajesh Nov 11, 2024
0df9ebd
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 12, 2024
5fb8bc1
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 12, 2024
6426b0b
Merge branch 'AIP84-get-asset-to-fastapi' into AIP84-get-dataset-even…
amoghrajesh Nov 12, 2024
80c4da0
Merge branch 'main' into AIP84-get-dataset-events-dagrun
amoghrajesh Nov 13, 2024
e7b96aa
review comments pierre
amoghrajesh Nov 13, 2024
c4d7dfc
fixing unit tests
amoghrajesh Nov 13, 2024
e269dd3
Merge branch 'main' into AIP84-get-dataset-events-dagrun
amoghrajesh Nov 14, 2024
0da44d0
review comments pierre
amoghrajesh Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.utils import timezone
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.decorators import action_logging
Expand Down Expand Up @@ -77,6 +78,7 @@ def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
return asset_schema.dump(asset)


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def get_dag_run(
raise BadRequest("DAGRunSchema error", detail=str(e))


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@security.requires_access_asset("GET")
@provide_session
Expand Down
39 changes: 39 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
from airflow.models import Base, Connection
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
Expand Down Expand Up @@ -408,6 +409,37 @@ def depends(self, dag_id: str | None = None) -> _DagIdFilter:
return self.set_value(dag_id)


class _UriPatternSearch(_SearchParam):
"""Search on uri."""

def __init__(self, skip_none: bool = True) -> None:
super().__init__(AssetModel.uri, skip_none)

def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch:
return self.set_value(uri_pattern)


class _DagIdAssetReferenceFilter(BaseParam[list[str]]):
"""Search on dag_id."""

def __init__(self, skip_none: bool = True) -> None:
super().__init__(AssetModel.consuming_dags, skip_none)

def depends(self, dag_ids: list[str] = Query(None)) -> _DagIdAssetReferenceFilter:
# needed to handle cases where dag_ids=a1,b1
if dag_ids and len(dag_ids) == 1 and "," in dag_ids[0]:
dag_ids = dag_ids[0].split(",")
return self.set_value(dag_ids)

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(
(AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(self.value)))
| (AssetModel.producing_tasks.any(TaskOutletAssetReference.dag_id.in_(self.value)))
)


class Range(BaseModel, Generic[T]):
"""Range with a lower and upper bound."""

Expand Down Expand Up @@ -493,8 +525,15 @@ def depends_float(

# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)]

# TI
QueryTIStateFilter = Annotated[_TIStateFilter, Depends(_TIStateFilter().depends)]
QueryTIPoolFilter = Annotated[_TIPoolFilter, Depends(_TIPoolFilter().depends)]
QueryTIQueueFilter = Annotated[_TIQueueFilter, Depends(_TIQueueFilter().depends)]
QueryTIExecutorFilter = Annotated[_TIExecutorFilter, Depends(_TIExecutorFilter().depends)]

# Assets
QueryUriPatternSearch = Annotated[_UriPatternSearch, Depends(_UriPatternSearch().depends)]
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
103 changes: 103 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, Field


class DagScheduleAssetReference(BaseModel):
"""Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel."""

dag_id: str
created_at: datetime
updated_at: datetime


class TaskOutletAssetReference(BaseModel):
"""Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel."""

dag_id: str
task_id: str
created_at: datetime
updated_at: datetime


class AssetAliasSchema(BaseModel):
"""Serializable version of the AssetAliasSchema ORM SqlAlchemyModel."""

id: int
name: str


class AssetResponse(BaseModel):
"""Asset serializer for responses."""

id: int
uri: str
extra: dict | None = None
created_at: datetime
updated_at: datetime
consuming_dags: list[DagScheduleAssetReference]
producing_tasks: list[TaskOutletAssetReference]
aliases: list[AssetAliasSchema]


class AssetCollectionResponse(BaseModel):
"""Asset collection response."""

assets: list[AssetResponse]
total_entries: int


class DagRunAssetReference(BaseModel):
"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
Copy link
Member

@pierrejeambrun pierrejeambrun Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no model DagRunAssetReference, maybe you meant DagRun?

Maybe:

Suggested change
"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
"""Basic DagRun serializer for responses."""

Also I would remove everywhere the mention of ORM SqlAlchemyModel because the serializer can serialize much more than just an SQLAlchemy model. (any arbitrary object with appropriate attributes, a plain dictionnary, etc...).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to:
"""Dag Run Asset Reference serializer for responses.""" and removed reference of ORM.
Does this make sense? Or I am open to any other docstring too


run_id: str
dag_id: str
execution_date: datetime = Field(alias="logical_date")
start_date: datetime
end_date: datetime | None = None
state: str
data_interval_start: datetime
data_interval_end: datetime


class AssetEventResponse(BaseModel):
"""Asset event serializer for responses."""

id: int
asset_id: int
# piggyback on the fix for https://github.com/apache/airflow/issues/43845 for asset_uri
# meanwhile, unblock by adding uri below
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
uri: str
extra: dict | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
source_run_id: str | None = None
source_map_index: int
created_dagruns: list[DagRunAssetReference]
timestamp: datetime


class AssetEventCollectionResponse(BaseModel):
"""Asset collection response."""
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved

asset_events: list[AssetEventResponse]
total_entries: int
Loading
Loading