From cf5218ccf98d694101ba1d37e9fe88ad83155e9e Mon Sep 17 00:00:00 2001 From: Amogh Date: Wed, 6 Nov 2024 15:22:45 +0530 Subject: [PATCH 01/18] AIP-84: Migrating GET Assets to fastAPI --- .../api_connexion/endpoints/asset_endpoint.py | 2 + airflow/api_fastapi/common/parameters.py | 15 ++++ .../core_api/openapi/v1-generated.yaml | 27 +++++++ .../core_api/routes/public/__init__.py | 2 + .../core_api/routes/public/assets.py | 72 +++++++++++++++++++ .../core_api/serializers/assets.py | 64 +++++++++++++++++ airflow/ui/openapi-gen/queries/common.ts | 16 +++++ airflow/ui/openapi-gen/queries/prefetch.ts | 19 +++++ airflow/ui/openapi-gen/queries/queries.ts | 25 +++++++ airflow/ui/openapi-gen/queries/suspense.ts | 25 +++++++ .../ui/openapi-gen/requests/services.gen.ts | 24 +++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 25 +++++++ 12 files changed, 316 insertions(+) create mode 100644 airflow/api_fastapi/core_api/routes/public/assets.py create mode 100644 airflow/api_fastapi/core_api/serializers/assets.py diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 1ea1db2b3bbb..085817213d0a 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -47,6 +47,7 @@ from airflow.assets.manager import asset_manager from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel from airflow.utils import timezone +from airflow.utils.api_migration import mark_fastapi_migration_done from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session from airflow.www.decorators import action_logging @@ -77,6 +78,7 @@ def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse: return asset_schema.dump(asset) +@mark_fastapi_migration_done @security.requires_access_asset("GET") @format_parameters({"limit": check_limit}) @provide_session diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 64ae9406f08c..224f6e0b7e77 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -29,6 +29,7 @@ from typing_extensions import Annotated, Self from airflow.models import Base, Connection +from airflow.models.asset import AssetModel from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning, DagWarningType @@ -335,6 +336,17 @@ def depends(self, dag_id: str | None = None) -> _DagIdFilter: return self.set_value(dag_id) +class _UriPatternSearch(_SearchParam): + """Search on dag_id.""" + + def __init__(self, skip_none: bool = True) -> None: + super().__init__(AssetModel.uri, skip_none) + + def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch: + uri_pattern = super().transform_aliases(uri_pattern) + return self.set_value(uri_pattern) + + # Common Safe DateTime DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)] @@ -363,3 +375,6 @@ def depends(self, dag_id: str | None = None) -> _DagIdFilter: # DAGTags QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)] + +# Assets +QueryUriPatternSearch = Annotated[_UriPatternSearch, Depends(_UriPatternSearch().depends)] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 46a2be61d64a..4974d9540bc7 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2515,6 +2515,33 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/next_run_assets/{dag_id}: + get: + tags: + - Asset + summary: Next Run Assets + operationId: next_run_assets + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + type: object + title: Response Next Run Assets + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: AppBuilderMenuItemResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index b7c8affe4a9c..8359a8b67929 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -34,6 +34,7 @@ from airflow.api_fastapi.core_api.routes.public.task_instances import task_instances_router from airflow.api_fastapi.core_api.routes.public.variables import variables_router from airflow.api_fastapi.core_api.routes.public.version import version_router +from airflow.api_fastapi.core_api.routes.ui import assets_router public_router = AirflowRouter(prefix="/public") @@ -56,3 +57,4 @@ public_router.include_router(variables_router) public_router.include_router(version_router) public_router.include_router(dag_stats_router) +public_router.include_router(assets_router) diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py new file mode 100644 index 000000000000..a90741dfd447 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends +from sqlalchemy import select +from sqlalchemy.orm import Session + +from airflow.api_fastapi.common.db.common import get_session, paginated_select +from airflow.api_fastapi.common.parameters import ( + QueryDagIdsFilter, + QueryLimit, + QueryOffset, + QueryUriPatternSearch, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.serializers.assets import AssetCollectionResponse, AssetResponse +from airflow.assets import Asset +from airflow.models.asset import AssetModel + +assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") + + +@assets_router.get( + "/", + responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_assets( + limit: QueryLimit, + offset: QueryOffset, + uri_pattern: QueryUriPatternSearch, + dag_ids: QueryDagIdsFilter, + order_by: Annotated[ + SortParam, + Depends(SortParam(["id", "uri", "created_at", "updated_at"], Asset).dynamic_depends()), + ], + session: Annotated[Session, Depends(get_session)], +) -> AssetCollectionResponse: + """Get assets.""" + assets_select, total_entries = paginated_select( + select(AssetModel), + filters=[dag_ids, uri_pattern], + order_by=order_by, + offset=offset, + limit=limit, + session=session, + ) + + assets = session.scalars(assets_select).all() + return AssetCollectionResponse( + assets=[AssetResponse.model_validate(x, from_attributes=True) for x in assets], + total_entries=total_entries, + ) diff --git a/airflow/api_fastapi/core_api/serializers/assets.py b/airflow/api_fastapi/core_api/serializers/assets.py new file mode 100644 index 000000000000..7acb8cc6357d --- /dev/null +++ b/airflow/api_fastapi/core_api/serializers/assets.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + + +class DagScheduleAssetReference(BaseModel): + """Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel.""" + + asset_id: int + dag_id: str + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class TaskOutletAssetReference(BaseModel): + """Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel.""" + + asset_id: int + dag_id: str + task_id: str + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + + +class AssetResponse(BaseModel): + """Asset serializer for responses.""" + + id: int + uri: str + extra: str | None = Field(default=None) + created_at: str + updated_at: str + consuming_dags: DagScheduleAssetReference | None = None + producing_tasks: TaskOutletAssetReference | None = None + + +class AssetCollectionResponse(BaseModel): + """Asset collection response.""" + + assets: list[AssetResponse] + total_entries: int diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index cec1f0f314dc..27b9d5c59a62 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -40,6 +40,22 @@ export const UseAssetServiceNextRunAssetsKeyFn = ( }, queryKey?: Array, ) => [useAssetServiceNextRunAssetsKey, ...(queryKey ?? [{ dagId }])]; +export type AssetServiceNextRunAssets1DefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceNextRunAssets1QueryResult< + TData = AssetServiceNextRunAssets1DefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceNextRunAssets1Key = "AssetServiceNextRunAssets1"; +export const UseAssetServiceNextRunAssets1KeyFn = ( + { + dagId, + }: { + dagId: string; + }, + queryKey?: Array, +) => [useAssetServiceNextRunAssets1Key, ...(queryKey ?? [{ dagId }])]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 04443427aca7..c1b00fa352a2 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -44,6 +44,25 @@ export const prefetchUseAssetServiceNextRunAssets = ( queryKey: Common.UseAssetServiceNextRunAssetsKeyFn({ dagId }), queryFn: () => AssetService.nextRunAssets({ dagId }), }); +/** + * Next Run Assets + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceNextRunAssets1 = ( + queryClient: QueryClient, + { + dagId, + }: { + dagId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceNextRunAssets1KeyFn({ dagId }), + queryFn: () => AssetService.nextRunAssets1({ dagId }), + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 11dea6f3df58..c3057b19e599 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -65,6 +65,31 @@ export const useAssetServiceNextRunAssets = < queryFn: () => AssetService.nextRunAssets({ dagId }) as TData, ...options, }); +/** + * Next Run Assets + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useAssetServiceNextRunAssets1 = < + TData = Common.AssetServiceNextRunAssets1DefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceNextRunAssets1KeyFn({ dagId }, queryKey), + queryFn: () => AssetService.nextRunAssets1({ dagId }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index eed1a0afe805..033b4cdb513c 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -50,6 +50,31 @@ export const useAssetServiceNextRunAssetsSuspense = < queryFn: () => AssetService.nextRunAssets({ dagId }) as TData, ...options, }); +/** + * Next Run Assets + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useAssetServiceNextRunAssets1Suspense = < + TData = Common.AssetServiceNextRunAssets1DefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + }: { + dagId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceNextRunAssets1KeyFn({ dagId }, queryKey), + queryFn: () => AssetService.nextRunAssets1({ dagId }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 4bfc986b56ba..41676411a89c 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -5,6 +5,8 @@ import { request as __request } from "./core/request"; import type { NextRunAssetsData, NextRunAssetsResponse, + NextRunAssets1Data, + NextRunAssets1Response, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -117,6 +119,28 @@ export class AssetService { }, }); } + + /** + * Next Run Assets + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ + public static nextRunAssets1( + data: NextRunAssets1Data, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/next_run_assets/{dag_id}", + path: { + dag_id: data.dagId, + }, + errors: { + 422: "Validation Error", + }, + }); + } } export class DashboardService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 06799783653b..3cb914e09d38 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -735,6 +735,14 @@ export type NextRunAssetsResponse = { [key: string]: unknown; }; +export type NextRunAssets1Data = { + dagId: string; +}; + +export type NextRunAssets1Response = { + [key: string]: unknown; +}; + export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1093,6 +1101,23 @@ export type $OpenApiTs = { }; }; }; + "/public/next_run_assets/{dag_id}": { + get: { + req: NextRunAssets1Data; + res: { + /** + * Successful Response + */ + 200: { + [key: string]: unknown; + }; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/ui/dashboard/historical_metrics_data": { get: { req: HistoricalMetricsData; From 5a49280ca9e5463219f5db2a3a29ef00162a9749 Mon Sep 17 00:00:00 2001 From: Amogh Date: Wed, 6 Nov 2024 19:00:47 +0530 Subject: [PATCH 02/18] matching response to legacy --- .../core_api/openapi/v1-generated.yaml | 188 +++++++++++++++++- .../core_api/routes/public/__init__.py | 2 +- .../core_api/routes/public/assets.py | 3 +- .../core_api/serializers/assets.py | 24 ++- airflow/ui/openapi-gen/queries/common.ts | 31 ++- airflow/ui/openapi-gen/queries/prefetch.ts | 38 +++- airflow/ui/openapi-gen/queries/queries.ts | 43 +++- airflow/ui/openapi-gen/queries/suspense.ts | 43 +++- .../ui/openapi-gen/requests/schemas.gen.ts | 157 +++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 34 +++- airflow/ui/openapi-gen/requests/types.gen.ts | 83 +++++++- 11 files changed, 563 insertions(+), 83 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 4974d9540bc7..473bc55b122d 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2515,27 +2515,76 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/next_run_assets/{dag_id}: + /public/assets/: get: tags: - Asset - summary: Next Run Assets - operationId: next_run_assets + summary: Get Assets + description: Get assets. + operationId: get_assets parameters: - - name: dag_id - in: path - required: true + - name: limit + in: query + required: false + schema: + type: integer + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + - name: uri_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Uri Pattern + - name: dag_ids + in: query + required: false + schema: + type: array + items: + type: string + title: Dag Ids + - name: order_by + in: query + required: false schema: type: string - title: Dag Id + default: id + title: Order By responses: '200': description: Successful Response content: application/json: schema: - type: object - title: Response Next Run Assets + $ref: '#/components/schemas/AssetCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found '422': description: Validation Error content: @@ -2591,6 +2640,83 @@ components: type: object title: AppBuilderViewResponse description: Serializer for AppBuilder View responses. + AssetAliasSchema: + properties: + id: + type: integer + title: Id + name: + type: string + title: Name + type: object + required: + - id + - name + title: AssetAliasSchema + description: Serializable version of the AssetAliasSchema ORM SqlAlchemyModel. + AssetCollectionResponse: + properties: + assets: + items: + $ref: '#/components/schemas/AssetResponse' + type: array + title: Assets + total_entries: + type: integer + title: Total Entries + type: object + required: + - assets + - total_entries + title: AssetCollectionResponse + description: Asset collection response. + AssetResponse: + properties: + id: + type: integer + title: Id + uri: + type: string + title: Uri + extra: + anyOf: + - type: object + - type: 'null' + title: Extra + created_at: + type: string + format: date-time + title: Created At + updated_at: + type: string + format: date-time + title: Updated At + consuming_dags: + items: + $ref: '#/components/schemas/DagScheduleAssetReference' + type: array + title: Consuming Dags + producing_tasks: + items: + $ref: '#/components/schemas/TaskOutletAssetReference' + type: array + title: Producing Tasks + aliases: + items: + $ref: '#/components/schemas/AssetAliasSchema' + type: array + title: Aliases + type: object + required: + - id + - uri + - created_at + - updated_at + - consuming_dags + - producing_tasks + - aliases + title: AssetResponse + description: Asset serializer for responses. BackfillPostBody: properties: dag_id: @@ -3547,6 +3673,26 @@ components: - asset_triggered title: DagRunType description: Class with DagRun types. + DagScheduleAssetReference: + properties: + dag_id: + type: string + title: Dag Id + created_at: + type: string + format: date-time + title: Created At + updated_at: + type: string + format: date-time + title: Updated At + type: object + required: + - dag_id + - created_at + - updated_at + title: DagScheduleAssetReference + description: Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel. DagStatsCollectionResponse: properties: dags: @@ -4303,6 +4449,30 @@ components: - triggerer_job title: TaskInstanceResponse description: TaskInstance serializer for responses. + TaskOutletAssetReference: + properties: + dag_id: + type: string + title: Dag Id + task_id: + type: string + title: Task Id + created_at: + type: string + format: date-time + title: Created At + updated_at: + type: string + format: date-time + title: Updated At + type: object + required: + - dag_id + - task_id + - created_at + - updated_at + title: TaskOutletAssetReference + description: Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel. TriggerResponse: properties: id: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index 8359a8b67929..134bab4fc519 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.routes.public.assets import assets_router from airflow.api_fastapi.core_api.routes.public.backfills import backfills_router from airflow.api_fastapi.core_api.routes.public.connections import connections_router from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router @@ -34,7 +35,6 @@ from airflow.api_fastapi.core_api.routes.public.task_instances import task_instances_router from airflow.api_fastapi.core_api.routes.public.variables import variables_router from airflow.api_fastapi.core_api.routes.public.version import version_router -from airflow.api_fastapi.core_api.routes.ui import assets_router public_router = AirflowRouter(prefix="/public") diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index a90741dfd447..584fb39b6b2b 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -34,7 +34,6 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.serializers.assets import AssetCollectionResponse, AssetResponse -from airflow.assets import Asset from airflow.models.asset import AssetModel assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") @@ -51,7 +50,7 @@ async def get_assets( dag_ids: QueryDagIdsFilter, order_by: Annotated[ SortParam, - Depends(SortParam(["id", "uri", "created_at", "updated_at"], Asset).dynamic_depends()), + Depends(SortParam(["id", "uri", "created_at", "updated_at"], AssetModel).dynamic_depends()), ], session: Annotated[Session, Depends(get_session)], ) -> AssetCollectionResponse: diff --git a/airflow/api_fastapi/core_api/serializers/assets.py b/airflow/api_fastapi/core_api/serializers/assets.py index 7acb8cc6357d..498dc4edae6f 100644 --- a/airflow/api_fastapi/core_api/serializers/assets.py +++ b/airflow/api_fastapi/core_api/serializers/assets.py @@ -19,30 +19,31 @@ from datetime import datetime -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel class DagScheduleAssetReference(BaseModel): """Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel.""" - asset_id: int dag_id: str created_at: datetime updated_at: datetime - model_config = ConfigDict(from_attributes=True) - class TaskOutletAssetReference(BaseModel): """Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel.""" - asset_id: int dag_id: str task_id: str created_at: datetime updated_at: datetime - model_config = ConfigDict(from_attributes=True) + +class AssetAliasSchema(BaseModel): + """Serializable version of the AssetAliasSchema ORM SqlAlchemyModel.""" + + id: int + name: str class AssetResponse(BaseModel): @@ -50,11 +51,12 @@ class AssetResponse(BaseModel): id: int uri: str - extra: str | None = Field(default=None) - created_at: str - updated_at: str - consuming_dags: DagScheduleAssetReference | None = None - producing_tasks: TaskOutletAssetReference | None = None + extra: dict | None = None + created_at: datetime + updated_at: datetime + consuming_dags: list[DagScheduleAssetReference] + producing_tasks: list[TaskOutletAssetReference] + aliases: list[AssetAliasSchema] class AssetCollectionResponse(BaseModel): diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 27b9d5c59a62..9100c69a2938 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -40,22 +40,33 @@ export const UseAssetServiceNextRunAssetsKeyFn = ( }, queryKey?: Array, ) => [useAssetServiceNextRunAssetsKey, ...(queryKey ?? [{ dagId }])]; -export type AssetServiceNextRunAssets1DefaultResponse = Awaited< - ReturnType +export type AssetServiceGetAssetsDefaultResponse = Awaited< + ReturnType >; -export type AssetServiceNextRunAssets1QueryResult< - TData = AssetServiceNextRunAssets1DefaultResponse, +export type AssetServiceGetAssetsQueryResult< + TData = AssetServiceGetAssetsDefaultResponse, TError = unknown, > = UseQueryResult; -export const useAssetServiceNextRunAssets1Key = "AssetServiceNextRunAssets1"; -export const UseAssetServiceNextRunAssets1KeyFn = ( +export const useAssetServiceGetAssetsKey = "AssetServiceGetAssets"; +export const UseAssetServiceGetAssetsKeyFn = ( { - dagId, + dagIds, + limit, + offset, + orderBy, + uriPattern, }: { - dagId: string; - }, + dagIds?: string[]; + limit?: number; + offset?: number; + orderBy?: string; + uriPattern?: string; + } = {}, queryKey?: Array, -) => [useAssetServiceNextRunAssets1Key, ...(queryKey ?? [{ dagId }])]; +) => [ + useAssetServiceGetAssetsKey, + ...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]), +]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index c1b00fa352a2..7e5e9a12b77f 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -45,23 +45,43 @@ export const prefetchUseAssetServiceNextRunAssets = ( queryFn: () => AssetService.nextRunAssets({ dagId }), }); /** - * Next Run Assets + * Get Assets + * Get assets. * @param data The data for the request. - * @param data.dagId - * @returns unknown Successful Response + * @param data.limit + * @param data.offset + * @param data.uriPattern + * @param data.dagIds + * @param data.orderBy + * @returns AssetCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseAssetServiceNextRunAssets1 = ( +export const prefetchUseAssetServiceGetAssets = ( queryClient: QueryClient, { - dagId, + dagIds, + limit, + offset, + orderBy, + uriPattern, }: { - dagId: string; - }, + dagIds?: string[]; + limit?: number; + offset?: number; + orderBy?: string; + uriPattern?: string; + } = {}, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceNextRunAssets1KeyFn({ dagId }), - queryFn: () => AssetService.nextRunAssets1({ dagId }), + queryKey: Common.UseAssetServiceGetAssetsKeyFn({ + dagIds, + limit, + offset, + orderBy, + uriPattern, + }), + queryFn: () => + AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }), }); /** * Historical Metrics diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index c3057b19e599..12a61571dee4 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -66,28 +66,51 @@ export const useAssetServiceNextRunAssets = < ...options, }); /** - * Next Run Assets + * Get Assets + * Get assets. * @param data The data for the request. - * @param data.dagId - * @returns unknown Successful Response + * @param data.limit + * @param data.offset + * @param data.uriPattern + * @param data.dagIds + * @param data.orderBy + * @returns AssetCollectionResponse Successful Response * @throws ApiError */ -export const useAssetServiceNextRunAssets1 = < - TData = Common.AssetServiceNextRunAssets1DefaultResponse, +export const useAssetServiceGetAssets = < + TData = Common.AssetServiceGetAssetsDefaultResponse, TError = unknown, TQueryKey extends Array = unknown[], >( { - dagId, + dagIds, + limit, + offset, + orderBy, + uriPattern, }: { - dagId: string; - }, + dagIds?: string[]; + limit?: number; + offset?: number; + orderBy?: string; + uriPattern?: string; + } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ - queryKey: Common.UseAssetServiceNextRunAssets1KeyFn({ dagId }, queryKey), - queryFn: () => AssetService.nextRunAssets1({ dagId }) as TData, + queryKey: Common.UseAssetServiceGetAssetsKeyFn( + { dagIds, limit, offset, orderBy, uriPattern }, + queryKey, + ), + queryFn: () => + AssetService.getAssets({ + dagIds, + limit, + offset, + orderBy, + uriPattern, + }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 033b4cdb513c..6cce83916431 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -51,28 +51,51 @@ export const useAssetServiceNextRunAssetsSuspense = < ...options, }); /** - * Next Run Assets + * Get Assets + * Get assets. * @param data The data for the request. - * @param data.dagId - * @returns unknown Successful Response + * @param data.limit + * @param data.offset + * @param data.uriPattern + * @param data.dagIds + * @param data.orderBy + * @returns AssetCollectionResponse Successful Response * @throws ApiError */ -export const useAssetServiceNextRunAssets1Suspense = < - TData = Common.AssetServiceNextRunAssets1DefaultResponse, +export const useAssetServiceGetAssetsSuspense = < + TData = Common.AssetServiceGetAssetsDefaultResponse, TError = unknown, TQueryKey extends Array = unknown[], >( { - dagId, + dagIds, + limit, + offset, + orderBy, + uriPattern, }: { - dagId: string; - }, + dagIds?: string[]; + limit?: number; + offset?: number; + orderBy?: string; + uriPattern?: string; + } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ - queryKey: Common.UseAssetServiceNextRunAssets1KeyFn({ dagId }, queryKey), - queryFn: () => AssetService.nextRunAssets1({ dagId }) as TData, + queryKey: Common.UseAssetServiceGetAssetsKeyFn( + { dagIds, limit, offset, orderBy, uriPattern }, + queryKey, + ), + queryFn: () => + AssetService.getAssets({ + dagIds, + limit, + offset, + orderBy, + uriPattern, + }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index d64abb3853b4..f30ea53d6608 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -89,6 +89,111 @@ export const $AppBuilderViewResponse = { description: "Serializer for AppBuilder View responses.", } as const; +export const $AssetAliasSchema = { + properties: { + id: { + type: "integer", + title: "Id", + }, + name: { + type: "string", + title: "Name", + }, + }, + type: "object", + required: ["id", "name"], + title: "AssetAliasSchema", + description: + "Serializable version of the AssetAliasSchema ORM SqlAlchemyModel.", +} as const; + +export const $AssetCollectionResponse = { + properties: { + assets: { + items: { + $ref: "#/components/schemas/AssetResponse", + }, + type: "array", + title: "Assets", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["assets", "total_entries"], + title: "AssetCollectionResponse", + description: "Asset collection response.", +} as const; + +export const $AssetResponse = { + properties: { + id: { + type: "integer", + title: "Id", + }, + uri: { + type: "string", + title: "Uri", + }, + extra: { + anyOf: [ + { + type: "object", + }, + { + type: "null", + }, + ], + title: "Extra", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + updated_at: { + type: "string", + format: "date-time", + title: "Updated At", + }, + consuming_dags: { + items: { + $ref: "#/components/schemas/DagScheduleAssetReference", + }, + type: "array", + title: "Consuming Dags", + }, + producing_tasks: { + items: { + $ref: "#/components/schemas/TaskOutletAssetReference", + }, + type: "array", + title: "Producing Tasks", + }, + aliases: { + items: { + $ref: "#/components/schemas/AssetAliasSchema", + }, + type: "array", + title: "Aliases", + }, + }, + type: "object", + required: [ + "id", + "uri", + "created_at", + "updated_at", + "consuming_dags", + "producing_tasks", + "aliases", + ], + title: "AssetResponse", + description: "Asset serializer for responses.", +} as const; + export const $BackfillPostBody = { properties: { dag_id: { @@ -1580,6 +1685,30 @@ export const $DagRunType = { description: "Class with DagRun types.", } as const; +export const $DagScheduleAssetReference = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + updated_at: { + type: "string", + format: "date-time", + title: "Updated At", + }, + }, + type: "object", + required: ["dag_id", "created_at", "updated_at"], + title: "DagScheduleAssetReference", + description: + "Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel.", +} as const; + export const $DagStatsCollectionResponse = { properties: { dags: { @@ -2716,6 +2845,34 @@ export const $TaskInstanceResponse = { description: "TaskInstance serializer for responses.", } as const; +export const $TaskOutletAssetReference = { + properties: { + dag_id: { + type: "string", + title: "Dag Id", + }, + task_id: { + type: "string", + title: "Task Id", + }, + created_at: { + type: "string", + format: "date-time", + title: "Created At", + }, + updated_at: { + type: "string", + format: "date-time", + title: "Updated At", + }, + }, + type: "object", + required: ["dag_id", "task_id", "created_at", "updated_at"], + title: "TaskOutletAssetReference", + description: + "Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel.", +} as const; + export const $TriggerResponse = { properties: { id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 41676411a89c..f5f43d82060a 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -5,8 +5,8 @@ import { request as __request } from "./core/request"; import type { NextRunAssetsData, NextRunAssetsResponse, - NextRunAssets1Data, - NextRunAssets1Response, + GetAssetsData, + GetAssetsResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -121,22 +121,34 @@ export class AssetService { } /** - * Next Run Assets + * Get Assets + * Get assets. * @param data The data for the request. - * @param data.dagId - * @returns unknown Successful Response + * @param data.limit + * @param data.offset + * @param data.uriPattern + * @param data.dagIds + * @param data.orderBy + * @returns AssetCollectionResponse Successful Response * @throws ApiError */ - public static nextRunAssets1( - data: NextRunAssets1Data, - ): CancelablePromise { + public static getAssets( + data: GetAssetsData = {}, + ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/next_run_assets/{dag_id}", - path: { - dag_id: data.dagId, + url: "/public/assets/", + query: { + limit: data.limit, + offset: data.offset, + uri_pattern: data.uriPattern, + dag_ids: data.dagIds, + order_by: data.orderBy, }, errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", 422: "Validation Error", }, }); diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 3cb914e09d38..71e313043f14 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -21,6 +21,38 @@ export type AppBuilderViewResponse = { [key: string]: unknown; }; +/** + * Serializable version of the AssetAliasSchema ORM SqlAlchemyModel. + */ +export type AssetAliasSchema = { + id: number; + name: string; +}; + +/** + * Asset collection response. + */ +export type AssetCollectionResponse = { + assets: Array; + total_entries: number; +}; + +/** + * Asset serializer for responses. + */ +export type AssetResponse = { + id: number; + uri: string; + extra?: { + [key: string]: unknown; + } | null; + created_at: string; + updated_at: string; + consuming_dags: Array; + producing_tasks: Array; + aliases: Array; +}; + /** * Object used for create backfill request. */ @@ -347,6 +379,15 @@ export type DagRunType = | "manual" | "asset_triggered"; +/** + * Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel. + */ +export type DagScheduleAssetReference = { + dag_id: string; + created_at: string; + updated_at: string; +}; + /** * DAG Stats Collection serializer for responses. */ @@ -629,6 +670,16 @@ export type TaskInstanceResponse = { triggerer_job: JobResponse | null; }; +/** + * Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel. + */ +export type TaskOutletAssetReference = { + dag_id: string; + task_id: string; + created_at: string; + updated_at: string; +}; + /** * Trigger serializer for responses. */ @@ -735,13 +786,15 @@ export type NextRunAssetsResponse = { [key: string]: unknown; }; -export type NextRunAssets1Data = { - dagId: string; +export type GetAssetsData = { + dagIds?: Array; + limit?: number; + offset?: number; + orderBy?: string; + uriPattern?: string | null; }; -export type NextRunAssets1Response = { - [key: string]: unknown; -}; +export type GetAssetsResponse = AssetCollectionResponse; export type HistoricalMetricsData = { endDate: string; @@ -1101,16 +1154,26 @@ export type $OpenApiTs = { }; }; }; - "/public/next_run_assets/{dag_id}": { + "/public/assets/": { get: { - req: NextRunAssets1Data; + req: GetAssetsData; res: { /** * Successful Response */ - 200: { - [key: string]: unknown; - }; + 200: AssetCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; /** * Validation Error */ From 962572b5a08c17b8160f9a865895b4a2dbcd33c0 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 10:59:04 +0530 Subject: [PATCH 03/18] Adding unit tests - part 1 --- airflow/api_fastapi/common/parameters.py | 3 +- .../core_api/routes/public/assets.py | 2 +- .../core_api/routes/public/test_assets.py | 190 ++++++++++++++++++ 3 files changed, 192 insertions(+), 3 deletions(-) create mode 100644 tests/api_fastapi/core_api/routes/public/test_assets.py diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 224f6e0b7e77..c98b8540e1b7 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -135,7 +135,7 @@ def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None: def to_orm(self, select: Select) -> Select: if self.value is None and self.skip_none: return select - return select.where(self.attribute.ilike(f"%{self.value}")) + return select.where(self.attribute.ilike(f"%{self.value}%")) def transform_aliases(self, value: str | None) -> str | None: if value == "~": @@ -343,7 +343,6 @@ def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.uri, skip_none) def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch: - uri_pattern = super().transform_aliases(uri_pattern) return self.set_value(uri_pattern) diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 584fb39b6b2b..961672a33384 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -57,7 +57,7 @@ async def get_assets( """Get assets.""" assets_select, total_entries = paginated_select( select(AssetModel), - filters=[dag_ids, uri_pattern], + filters=[uri_pattern, dag_ids], order_by=order_by, offset=offset, limit=limit, diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py new file mode 100644 index 000000000000..04e2755412d3 --- /dev/null +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -0,0 +1,190 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.models.asset import AssetModel +from airflow.utils import timezone +from airflow.utils.session import provide_session + +from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_assets + +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] + + +def _create_assets(session, num: int = 2) -> None: + default_time = "2020-06-11T18:00:00+00:00" + assets = [ + AssetModel( + id=i, + uri=f"s3://bucket/key/{i}", + extra={"foo": "bar"}, + created_at=timezone.parse(default_time), + updated_at=timezone.parse(default_time), + ) + for i in range(1, 1 + num) + ] + session.add_all(assets) + session.commit() + + +def _create_provided_asset(asset: AssetModel, session) -> None: + session.add(asset) + session.commit() + + +class TestAssets: + default_time = "2020-06-11T18:00:00+00:00" + + @pytest.fixture(autouse=True) + def setup(self) -> None: + clear_db_assets() + + def teardown_method(self) -> None: + clear_db_assets() + + @provide_session + def create_assets(self, session, num: int = 2): + _create_assets(session, num=num) + + @provide_session + def create_provided_asset(self, session, asset: AssetModel): + _create_provided_asset(session, asset) + + +class TestGetAssets(TestAssets): + def test_should_respond_200(self, test_client, session): + self.create_assets() + assets = session.query(AssetModel).all() + assert len(assets) == 2 + + response = test_client.get("/public/assets") + assert response.status_code == 200 + response_data = response.json() + tz_datetime_format = self.default_time.replace("+00:00", "Z") + assert response_data == { + "assets": [ + { + "id": 1, + "uri": "s3://bucket/key/1", + "extra": {"foo": "bar"}, + "created_at": tz_datetime_format, + "updated_at": tz_datetime_format, + "consuming_dags": [], + "producing_tasks": [], + "aliases": [], + }, + { + "id": 2, + "uri": "s3://bucket/key/2", + "extra": {"foo": "bar"}, + "created_at": tz_datetime_format, + "updated_at": tz_datetime_format, + "consuming_dags": [], + "producing_tasks": [], + "aliases": [], + }, + ], + "total_entries": 2, + } + + def test_order_by_raises_400_for_invalid_attr(self, test_client, session): + response = test_client.get("/public/assets?order_by=fake") + + assert response.status_code == 400 + msg = "Ordering with 'fake' is disallowed or the attribute does not exist on the model" + assert response.json()["detail"] == msg + + @pytest.mark.parametrize( + "url, expected_assets", + [ + ("/public/assets?uri_pattern=s3", {"s3://folder/key"}), + ("/public/assets?uri_pattern=bucket", {"gcp://bucket/key", "wasb://some_asset_bucket_/key"}), + ( + "/public/assets?uri_pattern=asset", + {"somescheme://asset/key", "wasb://some_asset_bucket_/key"}, + ), + ( + "/public/assets?uri_pattern=", + { + "gcp://bucket/key", + "s3://folder/key", + "somescheme://asset/key", + "wasb://some_asset_bucket_/key", + }, + ), + ], + ) + @provide_session + def test_filter_assets_by_uri_pattern_works(self, test_client, url, expected_assets, session): + asset1 = AssetModel("s3://folder/key") + asset2 = AssetModel("gcp://bucket/key") + asset3 = AssetModel("somescheme://asset/key") + asset4 = AssetModel("wasb://some_asset_bucket_/key") + + assets = [asset1, asset2, asset3, asset4] + for a in assets: + self.create_provided_asset(a) + + response = test_client.get(url) + assert response.status_code == 200 + asset_urls = {asset["uri"] for asset in response.json()["assets"]} + assert expected_assets == asset_urls + + +class TestGetAssetsEndpointPagination(TestAssets): + @pytest.mark.parametrize( + "url, expected_asset_uris", + [ + # Limit test data + ("/public/assets?limit=1", ["s3://bucket/key/1"]), + ("/public/assets?limit=100", [f"s3://bucket/key/{i}" for i in range(1, 101)]), + # Offset test data + ("/public/assets?offset=1", [f"s3://bucket/key/{i}" for i in range(2, 102)]), + ("/public/assets?offset=3", [f"s3://bucket/key/{i}" for i in range(4, 104)]), + # Limit and offset test data + ("/public/assets?offset=3&limit=3", [f"s3://bucket/key/{i}" for i in [4, 5, 6]]), + ], + ) + def test_limit_and_offset(self, test_client, url, expected_asset_uris): + self.create_assets(110) + + response = test_client.get(url) + + assert response.status_code == 200 + asset_uris = [asset["uri"] for asset in response.json()["assets"]] + assert asset_uris == expected_asset_uris + + def test_should_respect_page_size_limit_default(self, test_client): + self.create_assets(110) + + response = test_client.get("/public/assets") + + assert response.status_code == 200 + assert len(response.json()["assets"]) == 100 + + @conf_vars({("api", "maximum_page_limit"): "150"}) + def test_should_return_conf_max_if_req_max_above_conf(self, test_client): + self.create_assets(200) + + # change to 180 once format_parameters is integrated + response = test_client.get("/public/assets?limit=150") + + assert response.status_code == 200 + assert len(response.json()["assets"]) == 150 From 428cb6c6bd9084370e1e244092c1de326d291d8c Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 8 Nov 2024 11:03:12 +0530 Subject: [PATCH 04/18] Update airflow/api_fastapi/common/parameters.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/api_fastapi/common/parameters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index c98b8540e1b7..8c32f114172f 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -337,7 +337,7 @@ def depends(self, dag_id: str | None = None) -> _DagIdFilter: class _UriPatternSearch(_SearchParam): - """Search on dag_id.""" + """Search on uri.""" def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.uri, skip_none) From a78d3cb9209d160f53b304e989c412b0a324d28a Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 11:55:18 +0530 Subject: [PATCH 05/18] fixing the dag_ids filter --- airflow/api_fastapi/common/parameters.py | 27 ++++++++++++++++++- .../core_api/openapi/v1-generated.yaml | 6 ++--- .../core_api/routes/public/assets.py | 4 +-- airflow/ui/openapi-gen/queries/common.ts | 4 +-- airflow/ui/openapi-gen/queries/prefetch.ts | 6 ++--- airflow/ui/openapi-gen/queries/queries.ts | 6 ++--- airflow/ui/openapi-gen/queries/suspense.ts | 6 ++--- .../ui/openapi-gen/requests/services.gen.ts | 4 +-- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 9 files changed, 44 insertions(+), 21 deletions(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 8c32f114172f..8ac1803f9173 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -29,7 +29,7 @@ from typing_extensions import Annotated, Self from airflow.models import Base, Connection -from airflow.models.asset import AssetModel +from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning, DagWarningType @@ -346,6 +346,28 @@ def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch: return self.set_value(uri_pattern) +class _DagIdAssetReferencePatternSearch(_SearchParam): + """Search on dag_id.""" + + def __init__(self, skip_none: bool = True) -> None: + super().__init__(AssetModel.consuming_dags, skip_none) + self.task_attribute = AssetModel.consuming_dags + + def depends(self, dag_ids: str) -> _DagIdAssetReferencePatternSearch: + return self.set_value(dag_ids) + + def to_orm(self, select: Select) -> Select: + if self.value is None and self.skip_none: + return select + if self.value is not None: + dags_list = self.value.split(",") + print("dags list", dags_list) + return select.filter( + (self.attribute.any(DagScheduleAssetReference.dag_id.in_(dags_list))) + | (self.task_attribute.any(TaskOutletAssetReference.dag_id.in_(dags_list))) + ) + + # Common Safe DateTime DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)] @@ -377,3 +399,6 @@ def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch: # Assets QueryUriPatternSearch = Annotated[_UriPatternSearch, Depends(_UriPatternSearch().depends)] +QueryAssetDagIdPatternSearch = Annotated[ + _DagIdAssetReferencePatternSearch, Depends(_DagIdAssetReferencePatternSearch().depends) +] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 473bc55b122d..ab1369a4373c 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2547,11 +2547,9 @@ paths: title: Uri Pattern - name: dag_ids in: query - required: false + required: true schema: - type: array - items: - type: string + type: string title: Dag Ids - name: order_by in: query diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 961672a33384..8597dc40d66a 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -25,7 +25,7 @@ from airflow.api_fastapi.common.db.common import get_session, paginated_select from airflow.api_fastapi.common.parameters import ( - QueryDagIdsFilter, + QueryAssetDagIdPatternSearch, QueryLimit, QueryOffset, QueryUriPatternSearch, @@ -47,7 +47,7 @@ async def get_assets( limit: QueryLimit, offset: QueryOffset, uri_pattern: QueryUriPatternSearch, - dag_ids: QueryDagIdsFilter, + dag_ids: QueryAssetDagIdPatternSearch, order_by: Annotated[ SortParam, Depends(SortParam(["id", "uri", "created_at", "updated_at"], AssetModel).dynamic_depends()), diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 9100c69a2938..3cf15c9aa42c 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -56,12 +56,12 @@ export const UseAssetServiceGetAssetsKeyFn = ( orderBy, uriPattern, }: { - dagIds?: string[]; + dagIds: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - } = {}, + }, queryKey?: Array, ) => [ useAssetServiceGetAssetsKey, diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 7e5e9a12b77f..acef4e764b06 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -48,10 +48,10 @@ export const prefetchUseAssetServiceNextRunAssets = ( * Get Assets * Get assets. * @param data The data for the request. + * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern - * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError @@ -65,12 +65,12 @@ export const prefetchUseAssetServiceGetAssets = ( orderBy, uriPattern, }: { - dagIds?: string[]; + dagIds: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - } = {}, + }, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetAssetsKeyFn({ diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 12a61571dee4..6924213faab4 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -69,10 +69,10 @@ export const useAssetServiceNextRunAssets = < * Get Assets * Get assets. * @param data The data for the request. + * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern - * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError @@ -89,12 +89,12 @@ export const useAssetServiceGetAssets = < orderBy, uriPattern, }: { - dagIds?: string[]; + dagIds: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - } = {}, + }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 6cce83916431..5672b2dcb294 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -54,10 +54,10 @@ export const useAssetServiceNextRunAssetsSuspense = < * Get Assets * Get assets. * @param data The data for the request. + * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern - * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError @@ -74,12 +74,12 @@ export const useAssetServiceGetAssetsSuspense = < orderBy, uriPattern, }: { - dagIds?: string[]; + dagIds: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - } = {}, + }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index f5f43d82060a..bd0c3466fc78 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -124,16 +124,16 @@ export class AssetService { * Get Assets * Get assets. * @param data The data for the request. + * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern - * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError */ public static getAssets( - data: GetAssetsData = {}, + data: GetAssetsData, ): CancelablePromise { return __request(OpenAPI, { method: "GET", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 71e313043f14..a3e35c808288 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -787,7 +787,7 @@ export type NextRunAssetsResponse = { }; export type GetAssetsData = { - dagIds?: Array; + dagIds: string; limit?: number; offset?: number; orderBy?: string; From 882d20cc7e7d488e736ca5159026cbd3e5860d73 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 12:16:21 +0530 Subject: [PATCH 06/18] fixing the dag_ids filter --- airflow/api_fastapi/common/parameters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 8ac1803f9173..03581f689f88 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -351,7 +351,7 @@ class _DagIdAssetReferencePatternSearch(_SearchParam): def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.consuming_dags, skip_none) - self.task_attribute = AssetModel.consuming_dags + self.task_attribute = AssetModel.producing_tasks def depends(self, dag_ids: str) -> _DagIdAssetReferencePatternSearch: return self.set_value(dag_ids) From 25bb08e637aa4fe0f76912a11abe07709addd36c Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 12:19:40 +0530 Subject: [PATCH 07/18] Adding unit tests - part 2 --- .../core_api/routes/public/test_assets.py | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 04e2755412d3..d3eee703d01d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -18,7 +18,8 @@ import pytest -from airflow.models.asset import AssetModel +from airflow.models import DagModel +from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference from airflow.utils import timezone from airflow.utils.session import provide_session @@ -147,6 +148,56 @@ def test_filter_assets_by_uri_pattern_works(self, test_client, url, expected_ass asset_urls = {asset["uri"] for asset in response.json()["assets"]} assert expected_assets == asset_urls + @pytest.mark.parametrize("dag_ids, expected_num", [("dag1,dag2", 2), ("dag3", 1), ("dag2,dag3", 2)]) + @provide_session + def test_filter_assets_by_dag_ids_works(self, test_client, dag_ids, expected_num, session): + session.query(DagModel).delete() + session.commit() + dag1 = DagModel(dag_id="dag1") + dag2 = DagModel(dag_id="dag2") + dag3 = DagModel(dag_id="dag3") + asset1 = AssetModel("s3://folder/key") + asset2 = AssetModel("gcp://bucket/key") + asset3 = AssetModel("somescheme://asset/key") + dag_ref1 = DagScheduleAssetReference(dag_id="dag1", asset=asset1) + dag_ref2 = DagScheduleAssetReference(dag_id="dag2", asset=asset2) + task_ref1 = TaskOutletAssetReference(dag_id="dag3", task_id="task1", asset=asset3) + session.add_all([asset1, asset2, asset3, dag1, dag2, dag3, dag_ref1, dag_ref2, task_ref1]) + session.commit() + response = test_client.get( + f"/public/assets?dag_ids={dag_ids}", + ) + assert response.status_code == 200 + response_data = response.json() + assert len(response_data["assets"]) == expected_num + + @pytest.mark.parametrize( + "dag_ids, uri_pattern,expected_num", + [("dag1,dag2", "folder", 1), ("dag3", "nothing", 0), ("dag2,dag3", "key", 2)], + ) + def test_filter_assets_by_dag_ids_and_uri_pattern_works( + self, test_client, dag_ids, uri_pattern, expected_num, session + ): + session.query(DagModel).delete() + session.commit() + dag1 = DagModel(dag_id="dag1") + dag2 = DagModel(dag_id="dag2") + dag3 = DagModel(dag_id="dag3") + asset1 = AssetModel("s3://folder/key") + asset2 = AssetModel("gcp://bucket/key") + asset3 = AssetModel("somescheme://asset/key") + dag_ref1 = DagScheduleAssetReference(dag_id="dag1", asset=asset1) + dag_ref2 = DagScheduleAssetReference(dag_id="dag2", asset=asset2) + task_ref1 = TaskOutletAssetReference(dag_id="dag3", task_id="task1", asset=asset3) + session.add_all([asset1, asset2, asset3, dag1, dag2, dag3, dag_ref1, dag_ref2, task_ref1]) + session.commit() + response = test_client.get( + f"/public/assets?dag_ids={dag_ids}&uri_pattern={uri_pattern}", + ) + assert response.status_code == 200 + response_data = response.json() + assert len(response_data["assets"]) == expected_num + class TestGetAssetsEndpointPagination(TestAssets): @pytest.mark.parametrize( From fa0cd23cc9cadee09f1a53abdcd6a0a7164a316b Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 13:18:20 +0530 Subject: [PATCH 08/18] fixing unit tests & updating parameter type --- airflow/api_fastapi/common/parameters.py | 2 +- .../api_fastapi/core_api/openapi/v1-generated.yaml | 6 ++++-- airflow/ui/openapi-gen/queries/common.ts | 4 ++-- airflow/ui/openapi-gen/queries/prefetch.ts | 6 +++--- airflow/ui/openapi-gen/queries/queries.ts | 6 +++--- airflow/ui/openapi-gen/queries/suspense.ts | 6 +++--- airflow/ui/openapi-gen/requests/services.gen.ts | 4 ++-- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_assets.py | 14 +++++++------- 9 files changed, 26 insertions(+), 24 deletions(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 74f0b7da0c35..b334fe7a4a98 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -424,7 +424,7 @@ def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.consuming_dags, skip_none) self.task_attribute = AssetModel.producing_tasks - def depends(self, dag_ids: str) -> _DagIdAssetReferencePatternSearch: + def depends(self, dag_ids: str | None = None) -> _DagIdAssetReferencePatternSearch: return self.set_value(dag_ids) def to_orm(self, select: Select) -> Select: diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 307525a835c2..d12d7f6deb8e 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3136,9 +3136,11 @@ paths: title: Uri Pattern - name: dag_ids in: query - required: true + required: false schema: - type: string + anyOf: + - type: string + - type: 'null' title: Dag Ids - name: order_by in: query diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 048fe3e156e1..161e7627d41a 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -56,12 +56,12 @@ export const UseAssetServiceGetAssetsKeyFn = ( orderBy, uriPattern, }: { - dagIds: string; + dagIds?: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - }, + } = {}, queryKey?: Array, ) => [ useAssetServiceGetAssetsKey, diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 2f6250ff20ad..9052d8b71989 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -48,10 +48,10 @@ export const prefetchUseAssetServiceNextRunAssets = ( * Get Assets * Get assets. * @param data The data for the request. - * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern + * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError @@ -65,12 +65,12 @@ export const prefetchUseAssetServiceGetAssets = ( orderBy, uriPattern, }: { - dagIds: string; + dagIds?: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - }, + } = {}, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetAssetsKeyFn({ diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index cf706ddf17e5..73df2f3f5fc2 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -69,10 +69,10 @@ export const useAssetServiceNextRunAssets = < * Get Assets * Get assets. * @param data The data for the request. - * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern + * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError @@ -89,12 +89,12 @@ export const useAssetServiceGetAssets = < orderBy, uriPattern, }: { - dagIds: string; + dagIds?: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - }, + } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index e71412a1e5df..46545c6138e2 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -54,10 +54,10 @@ export const useAssetServiceNextRunAssetsSuspense = < * Get Assets * Get assets. * @param data The data for the request. - * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern + * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError @@ -74,12 +74,12 @@ export const useAssetServiceGetAssetsSuspense = < orderBy, uriPattern, }: { - dagIds: string; + dagIds?: string; limit?: number; offset?: number; orderBy?: string; uriPattern?: string; - }, + } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 34da5fbff304..921af0e47128 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -134,16 +134,16 @@ export class AssetService { * Get Assets * Get assets. * @param data The data for the request. - * @param data.dagIds * @param data.limit * @param data.offset * @param data.uriPattern + * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError */ public static getAssets( - data: GetAssetsData, + data: GetAssetsData = {}, ): CancelablePromise { return __request(OpenAPI, { method: "GET", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 809edebd13de..730a0a53ea23 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -810,7 +810,7 @@ export type NextRunAssetsResponse = { }; export type GetAssetsData = { - dagIds: string; + dagIds?: string | null; limit?: number; offset?: number; orderBy?: string; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index d3eee703d01d..722573339c71 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -45,7 +45,7 @@ def _create_assets(session, num: int = 2) -> None: session.commit() -def _create_provided_asset(asset: AssetModel, session) -> None: +def _create_provided_asset(session, asset: AssetModel) -> None: session.add(asset) session.commit() @@ -62,11 +62,11 @@ def teardown_method(self) -> None: @provide_session def create_assets(self, session, num: int = 2): - _create_assets(session, num=num) + _create_assets(session=session, num=num) @provide_session def create_provided_asset(self, session, asset: AssetModel): - _create_provided_asset(session, asset) + _create_provided_asset(session=session, asset=asset) class TestGetAssets(TestAssets): @@ -141,7 +141,7 @@ def test_filter_assets_by_uri_pattern_works(self, test_client, url, expected_ass assets = [asset1, asset2, asset3, asset4] for a in assets: - self.create_provided_asset(a) + self.create_provided_asset(asset=a) response = test_client.get(url) assert response.status_code == 200 @@ -214,7 +214,7 @@ class TestGetAssetsEndpointPagination(TestAssets): ], ) def test_limit_and_offset(self, test_client, url, expected_asset_uris): - self.create_assets(110) + self.create_assets(num=110) response = test_client.get(url) @@ -223,7 +223,7 @@ def test_limit_and_offset(self, test_client, url, expected_asset_uris): assert asset_uris == expected_asset_uris def test_should_respect_page_size_limit_default(self, test_client): - self.create_assets(110) + self.create_assets(num=110) response = test_client.get("/public/assets") @@ -232,7 +232,7 @@ def test_should_respect_page_size_limit_default(self, test_client): @conf_vars({("api", "maximum_page_limit"): "150"}) def test_should_return_conf_max_if_req_max_above_conf(self, test_client): - self.create_assets(200) + self.create_assets(num=200) # change to 180 once format_parameters is integrated response = test_client.get("/public/assets?limit=150") From dd791c214af58c452ead6e8836c255ada3a5d175 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 20:42:12 +0530 Subject: [PATCH 09/18] review comments pierre --- airflow/api_fastapi/common/parameters.py | 12 ++++++------ .../api_fastapi/core_api/openapi/v1-generated.yaml | 6 ++---- .../api_fastapi/core_api/routes/public/assets.py | 4 ++-- airflow/ui/openapi-gen/queries/common.ts | 4 ++-- airflow/ui/openapi-gen/queries/prefetch.ts | 6 +++--- airflow/ui/openapi-gen/queries/queries.ts | 6 +++--- airflow/ui/openapi-gen/queries/suspense.ts | 6 +++--- airflow/ui/openapi-gen/requests/services.gen.ts | 4 ++-- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_assets.py | 14 +++++++------- 10 files changed, 31 insertions(+), 33 deletions(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index b334fe7a4a98..7cc7350e9190 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -413,18 +413,18 @@ class _UriPatternSearch(_SearchParam): def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.uri, skip_none) - def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch: + def depends(self, uri_pattern: str) -> _UriPatternSearch: return self.set_value(uri_pattern) -class _DagIdAssetReferencePatternSearch(_SearchParam): +class _DagIdAssetReferenceFilter(BaseParam[str]): """Search on dag_id.""" def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.consuming_dags, skip_none) self.task_attribute = AssetModel.producing_tasks - def depends(self, dag_ids: str | None = None) -> _DagIdAssetReferencePatternSearch: + def depends(self, dag_ids: str | None = None) -> _DagIdAssetReferenceFilter: return self.set_value(dag_ids) def to_orm(self, select: Select) -> Select: @@ -433,8 +433,8 @@ def to_orm(self, select: Select) -> Select: if self.value is not None: dags_list = self.value.split(",") return select.filter( - (self.attribute.any(DagScheduleAssetReference.dag_id.in_(dags_list))) - | (self.task_attribute.any(TaskOutletAssetReference.dag_id.in_(dags_list))) + (AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(dags_list))) + | (AssetModel.producing_tasks.any(TaskOutletAssetReference.dag_id.in_(dags_list))) ) @@ -533,5 +533,5 @@ def depends_float( # Assets QueryUriPatternSearch = Annotated[_UriPatternSearch, Depends(_UriPatternSearch().depends)] QueryAssetDagIdPatternSearch = Annotated[ - _DagIdAssetReferencePatternSearch, Depends(_DagIdAssetReferencePatternSearch().depends) + _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends) ] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index d12d7f6deb8e..dd3bedabf0a0 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3128,11 +3128,9 @@ paths: title: Offset - name: uri_pattern in: query - required: false + required: true schema: - anyOf: - - type: string - - type: 'null' + type: string title: Uri Pattern - name: dag_ids in: query diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 8597dc40d66a..26d09e7ff38a 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -43,7 +43,7 @@ "/", responses=create_openapi_http_exception_doc([401, 403, 404]), ) -async def get_assets( +def get_assets( limit: QueryLimit, offset: QueryOffset, uri_pattern: QueryUriPatternSearch, @@ -66,6 +66,6 @@ async def get_assets( assets = session.scalars(assets_select).all() return AssetCollectionResponse( - assets=[AssetResponse.model_validate(x, from_attributes=True) for x in assets], + assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets], total_entries=total_entries, ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 161e7627d41a..01d92f7c0576 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -60,8 +60,8 @@ export const UseAssetServiceGetAssetsKeyFn = ( limit?: number; offset?: number; orderBy?: string; - uriPattern?: string; - } = {}, + uriPattern: string; + }, queryKey?: Array, ) => [ useAssetServiceGetAssetsKey, diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 9052d8b71989..274e6062120d 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -48,9 +48,9 @@ export const prefetchUseAssetServiceNextRunAssets = ( * Get Assets * Get assets. * @param data The data for the request. + * @param data.uriPattern * @param data.limit * @param data.offset - * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response @@ -69,8 +69,8 @@ export const prefetchUseAssetServiceGetAssets = ( limit?: number; offset?: number; orderBy?: string; - uriPattern?: string; - } = {}, + uriPattern: string; + }, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetAssetsKeyFn({ diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 73df2f3f5fc2..e472edba8f4f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -69,9 +69,9 @@ export const useAssetServiceNextRunAssets = < * Get Assets * Get assets. * @param data The data for the request. + * @param data.uriPattern * @param data.limit * @param data.offset - * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response @@ -93,8 +93,8 @@ export const useAssetServiceGetAssets = < limit?: number; offset?: number; orderBy?: string; - uriPattern?: string; - } = {}, + uriPattern: string; + }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 46545c6138e2..54a3632e95b1 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -54,9 +54,9 @@ export const useAssetServiceNextRunAssetsSuspense = < * Get Assets * Get assets. * @param data The data for the request. + * @param data.uriPattern * @param data.limit * @param data.offset - * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response @@ -78,8 +78,8 @@ export const useAssetServiceGetAssetsSuspense = < limit?: number; offset?: number; orderBy?: string; - uriPattern?: string; - } = {}, + uriPattern: string; + }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 921af0e47128..f06a4417f049 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -134,16 +134,16 @@ export class AssetService { * Get Assets * Get assets. * @param data The data for the request. + * @param data.uriPattern * @param data.limit * @param data.offset - * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError */ public static getAssets( - data: GetAssetsData = {}, + data: GetAssetsData, ): CancelablePromise { return __request(OpenAPI, { method: "GET", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 730a0a53ea23..c84223f95e35 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -814,7 +814,7 @@ export type GetAssetsData = { limit?: number; offset?: number; orderBy?: string; - uriPattern?: string | null; + uriPattern: string; }; export type GetAssetsResponse = AssetCollectionResponse; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 722573339c71..c6d085d06039 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -113,16 +113,16 @@ def test_order_by_raises_400_for_invalid_attr(self, test_client, session): assert response.json()["detail"] == msg @pytest.mark.parametrize( - "url, expected_assets", + "params, expected_assets", [ - ("/public/assets?uri_pattern=s3", {"s3://folder/key"}), - ("/public/assets?uri_pattern=bucket", {"gcp://bucket/key", "wasb://some_asset_bucket_/key"}), + ({"uri_pattern": "s3"}, {"s3://folder/key"}), + ({"uri_pattern": "bucket"}, {"gcp://bucket/key", "wasb://some_asset_bucket_/key"}), ( - "/public/assets?uri_pattern=asset", + {"uri_pattern": "asset"}, {"somescheme://asset/key", "wasb://some_asset_bucket_/key"}, ), ( - "/public/assets?uri_pattern=", + {"uri_pattern": ""}, { "gcp://bucket/key", "s3://folder/key", @@ -133,7 +133,7 @@ def test_order_by_raises_400_for_invalid_attr(self, test_client, session): ], ) @provide_session - def test_filter_assets_by_uri_pattern_works(self, test_client, url, expected_assets, session): + def test_filter_assets_by_uri_pattern_works(self, test_client, params, expected_assets, session): asset1 = AssetModel("s3://folder/key") asset2 = AssetModel("gcp://bucket/key") asset3 = AssetModel("somescheme://asset/key") @@ -143,7 +143,7 @@ def test_filter_assets_by_uri_pattern_works(self, test_client, url, expected_ass for a in assets: self.create_provided_asset(asset=a) - response = test_client.get(url) + response = test_client.get("/public/assets", params=params) assert response.status_code == 200 asset_urls = {asset["uri"] for asset in response.json()["assets"]} assert expected_assets == asset_urls From 06fa0a701a9ac7ceae045b1b9d99984febc9fd17 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 8 Nov 2024 21:01:34 +0530 Subject: [PATCH 10/18] fixing last commit --- airflow/api_fastapi/common/parameters.py | 15 ++++++--------- .../core_api/openapi/v1-generated.yaml | 12 +++++++----- airflow/ui/openapi-gen/queries/common.ts | 6 +++--- airflow/ui/openapi-gen/queries/prefetch.ts | 8 ++++---- airflow/ui/openapi-gen/queries/queries.ts | 8 ++++---- airflow/ui/openapi-gen/queries/suspense.ts | 8 ++++---- airflow/ui/openapi-gen/requests/services.gen.ts | 4 ++-- airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++-- 8 files changed, 32 insertions(+), 33 deletions(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 7cc7350e9190..6a5cf4f3a162 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -413,28 +413,25 @@ class _UriPatternSearch(_SearchParam): def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.uri, skip_none) - def depends(self, uri_pattern: str) -> _UriPatternSearch: + def depends(self, uri_pattern: str | None = None) -> _UriPatternSearch: return self.set_value(uri_pattern) -class _DagIdAssetReferenceFilter(BaseParam[str]): +class _DagIdAssetReferenceFilter(BaseParam[list[str]]): """Search on dag_id.""" def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.consuming_dags, skip_none) - self.task_attribute = AssetModel.producing_tasks - def depends(self, dag_ids: str | None = None) -> _DagIdAssetReferenceFilter: + def depends(self, dag_ids: list[str] = Query(None)) -> _DagIdAssetReferenceFilter: return self.set_value(dag_ids) def to_orm(self, select: Select) -> Select: if self.value is None and self.skip_none: return select - if self.value is not None: - dags_list = self.value.split(",") - return select.filter( - (AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(dags_list))) - | (AssetModel.producing_tasks.any(TaskOutletAssetReference.dag_id.in_(dags_list))) + return select.where( + (AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(self.value))) + | (AssetModel.producing_tasks.any(TaskOutletAssetReference.dag_id.in_(self.value))) ) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index dd3bedabf0a0..fa3c3085f9cc 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3128,17 +3128,19 @@ paths: title: Offset - name: uri_pattern in: query - required: true + required: false schema: - type: string + anyOf: + - type: string + - type: 'null' title: Uri Pattern - name: dag_ids in: query required: false schema: - anyOf: - - type: string - - type: 'null' + type: array + items: + type: string title: Dag Ids - name: order_by in: query diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 01d92f7c0576..9f5657391bb3 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -56,12 +56,12 @@ export const UseAssetServiceGetAssetsKeyFn = ( orderBy, uriPattern, }: { - dagIds?: string; + dagIds?: string[]; limit?: number; offset?: number; orderBy?: string; - uriPattern: string; - }, + uriPattern?: string; + } = {}, queryKey?: Array, ) => [ useAssetServiceGetAssetsKey, diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 274e6062120d..9354ff89bed5 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -48,9 +48,9 @@ export const prefetchUseAssetServiceNextRunAssets = ( * Get Assets * Get assets. * @param data The data for the request. - * @param data.uriPattern * @param data.limit * @param data.offset + * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response @@ -65,12 +65,12 @@ export const prefetchUseAssetServiceGetAssets = ( orderBy, uriPattern, }: { - dagIds?: string; + dagIds?: string[]; limit?: number; offset?: number; orderBy?: string; - uriPattern: string; - }, + uriPattern?: string; + } = {}, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetAssetsKeyFn({ diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e472edba8f4f..a70f6c5dbc6f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -69,9 +69,9 @@ export const useAssetServiceNextRunAssets = < * Get Assets * Get assets. * @param data The data for the request. - * @param data.uriPattern * @param data.limit * @param data.offset + * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response @@ -89,12 +89,12 @@ export const useAssetServiceGetAssets = < orderBy, uriPattern, }: { - dagIds?: string; + dagIds?: string[]; limit?: number; offset?: number; orderBy?: string; - uriPattern: string; - }, + uriPattern?: string; + } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 54a3632e95b1..90d14c8eb6ca 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -54,9 +54,9 @@ export const useAssetServiceNextRunAssetsSuspense = < * Get Assets * Get assets. * @param data The data for the request. - * @param data.uriPattern * @param data.limit * @param data.offset + * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response @@ -74,12 +74,12 @@ export const useAssetServiceGetAssetsSuspense = < orderBy, uriPattern, }: { - dagIds?: string; + dagIds?: string[]; limit?: number; offset?: number; orderBy?: string; - uriPattern: string; - }, + uriPattern?: string; + } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index f06a4417f049..921af0e47128 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -134,16 +134,16 @@ export class AssetService { * Get Assets * Get assets. * @param data The data for the request. - * @param data.uriPattern * @param data.limit * @param data.offset + * @param data.uriPattern * @param data.dagIds * @param data.orderBy * @returns AssetCollectionResponse Successful Response * @throws ApiError */ public static getAssets( - data: GetAssetsData, + data: GetAssetsData = {}, ): CancelablePromise { return __request(OpenAPI, { method: "GET", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index c84223f95e35..f750cc258ae8 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -810,11 +810,11 @@ export type NextRunAssetsResponse = { }; export type GetAssetsData = { - dagIds?: string | null; + dagIds?: Array; limit?: number; offset?: number; orderBy?: string; - uriPattern: string; + uriPattern?: string | null; }; export type GetAssetsResponse = AssetCollectionResponse; From 7a97220995f922bb94c82bf38059ca6924b27215 Mon Sep 17 00:00:00 2001 From: Amogh Date: Sat, 9 Nov 2024 08:44:03 +0530 Subject: [PATCH 11/18] fixing unit tests --- airflow/api_fastapi/common/parameters.py | 3 +++ .../core_api/routes/public/test_assets.py | 12 +----------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 6a5cf4f3a162..b60f5751a828 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -424,6 +424,9 @@ def __init__(self, skip_none: bool = True) -> None: super().__init__(AssetModel.consuming_dags, skip_none) def depends(self, dag_ids: list[str] = Query(None)) -> _DagIdAssetReferenceFilter: + # needed to handle cases where dag_ids=a1,b1 + if dag_ids and len(dag_ids) == 1 and "," in dag_ids[0]: + dag_ids = dag_ids[0].split(",") return self.set_value(dag_ids) def to_orm(self, select: Select) -> Select: diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index c6d085d06039..e243c3c2930d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -23,7 +23,6 @@ from airflow.utils import timezone from airflow.utils.session import provide_session -from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_assets pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -175,6 +174,7 @@ def test_filter_assets_by_dag_ids_works(self, test_client, dag_ids, expected_num "dag_ids, uri_pattern,expected_num", [("dag1,dag2", "folder", 1), ("dag3", "nothing", 0), ("dag2,dag3", "key", 2)], ) + @provide_session def test_filter_assets_by_dag_ids_and_uri_pattern_works( self, test_client, dag_ids, uri_pattern, expected_num, session ): @@ -229,13 +229,3 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 - - @conf_vars({("api", "maximum_page_limit"): "150"}) - def test_should_return_conf_max_if_req_max_above_conf(self, test_client): - self.create_assets(num=200) - - # change to 180 once format_parameters is integrated - response = test_client.get("/public/assets?limit=150") - - assert response.status_code == 200 - assert len(response.json()["assets"]) == 150 From df1ff8eb530b6bf4c822933a03ade9811c94c959 Mon Sep 17 00:00:00 2001 From: Amogh Date: Mon, 11 Nov 2024 15:10:04 +0530 Subject: [PATCH 12/18] AIP-84: Migrating GET Dataset events for DAG runs to fastAPI --- .../endpoints/dag_run_endpoint.py | 1 + .../api_fastapi/core_api/datamodels/assets.py | 39 ++++- .../core_api/openapi/v1-generated.yaml | 164 ++++++++++++++++++ .../core_api/routes/public/dag_run.py | 39 +++++ airflow/ui/openapi-gen/queries/common.ts | 22 +++ airflow/ui/openapi-gen/queries/prefetch.ts | 26 +++ airflow/ui/openapi-gen/queries/queries.ts | 33 ++++ airflow/ui/openapi-gen/queries/suspense.ts | 33 ++++ .../ui/openapi-gen/requests/schemas.gen.ts | 164 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 30 ++++ airflow/ui/openapi-gen/requests/types.gen.ts | 74 ++++++++ 11 files changed, 624 insertions(+), 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 6a38eb27ff45..a97f7d0b20ef 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -115,6 +115,7 @@ def get_dag_run( raise BadRequest("DAGRunSchema error", detail=str(e)) +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.RUN) @security.requires_access_asset("GET") @provide_session diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 498dc4edae6f..595e9f8e6623 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -19,7 +19,7 @@ from datetime import datetime -from pydantic import BaseModel +from pydantic import BaseModel, Field class DagScheduleAssetReference(BaseModel): @@ -64,3 +64,40 @@ class AssetCollectionResponse(BaseModel): assets: list[AssetResponse] total_entries: int + + +class DagRunAssetReference(BaseModel): + """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel.""" + + run_id: str + dag_id: str + execution_date: datetime = Field(alias="logical_date") + start_date: datetime + end_date: datetime + state: str + data_interval_start: datetime + data_interval_end: datetime + + +class AssetEventResponse(BaseModel): + """Asset event serializer for responses.""" + + id: int + asset_id: int + # piggyback on the fix for https://github.com/apache/airflow/issues/43845 for asset_uri + # meanwhile, unblock by adding uri below + uri: str + extra: dict | None = None + source_task_id: str | None = None + source_dag_id: str | None = None + source_run_id: str | None = None + source_map_index: int + created_dagruns: list[DagRunAssetReference] + timestamp: datetime + + +class AssetEventCollectionResponse(BaseModel): + """Asset collection response.""" + + asset_events: list[AssetEventResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index f1dd5d6ba433..e1ea3fe86c40 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1387,6 +1387,58 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents: + get: + tags: + - DagRun + summary: Get Upstream Asset Events + description: If dag run is asset-triggered, return the asset events that triggered + it. + operationId: get_upstream_asset_events + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dagSources/{file_token}: get: tags: @@ -3259,6 +3311,75 @@ components: - total_entries title: AssetCollectionResponse description: Asset collection response. + AssetEventCollectionResponse: + properties: + asset_events: + items: + $ref: '#/components/schemas/AssetEventResponse' + type: array + title: Asset Events + total_entries: + type: integer + title: Total Entries + type: object + required: + - asset_events + - total_entries + title: AssetEventCollectionResponse + description: Asset collection response. + AssetEventResponse: + properties: + id: + type: integer + title: Id + asset_id: + type: integer + title: Asset Id + uri: + type: string + title: Uri + extra: + anyOf: + - type: object + - type: 'null' + title: Extra + source_task_id: + anyOf: + - type: string + - type: 'null' + title: Source Task Id + source_dag_id: + anyOf: + - type: string + - type: 'null' + title: Source Dag Id + source_run_id: + anyOf: + - type: string + - type: 'null' + title: Source Run Id + source_map_index: + type: integer + title: Source Map Index + created_dagruns: + items: + $ref: '#/components/schemas/DagRunAssetReference' + type: array + title: Created Dagruns + timestamp: + type: string + format: date-time + title: Timestamp + type: object + required: + - id + - asset_id + - uri + - source_map_index + - created_dagruns + - timestamp + title: AssetEventResponse + description: Asset event serializer for responses. AssetResponse: properties: id: @@ -4230,6 +4351,49 @@ components: - latest_dag_processor_heartbeat title: DagProcessorInfoSchema description: Schema for DagProcessor info. + DagRunAssetReference: + properties: + run_id: + type: string + title: Run Id + dag_id: + type: string + title: Dag Id + logical_date: + type: string + format: date-time + title: Logical Date + start_date: + type: string + format: date-time + title: Start Date + end_date: + type: string + format: date-time + title: End Date + state: + type: string + title: State + data_interval_start: + type: string + format: date-time + title: Data Interval Start + data_interval_end: + type: string + format: date-time + title: Data Interval End + type: object + required: + - run_id + - dag_id + - logical_date + - start_date + - end_date + - state + - data_interval_start + - data_interval_end + title: DagRunAssetReference + description: Serializable version of the DagRunAssetReference ORM SqlAlchemyModel. DagRunState: type: string enum: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index d43fde9e340f..53d1cb8654ea 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -29,6 +29,7 @@ ) from airflow.api_fastapi.common.db.common import get_session from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse, AssetEventResponse from airflow.api_fastapi.core_api.datamodels.dag_run import ( DAGRunPatchBody, DAGRunPatchStates, @@ -147,3 +148,41 @@ def patch_dag_run( dag_run = session.get(DagRun, dag_run.id) return DAGRunResponse.model_validate(dag_run, from_attributes=True) + + +@dag_run_router.get( + "/{dag_run_id}/upstreamAssetEvents", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_401_UNAUTHORIZED, + status.HTTP_403_FORBIDDEN, + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_upstream_asset_events( + dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] +) -> AssetEventCollectionResponse: + """If dag run is asset-triggered, return the asset events that triggered it.""" + dag_run: DagRun | None = session.scalar( + select(DagRun).where( + DagRun.dag_id == dag_id, + DagRun.run_id == dag_run_id, + ) + ) + if dag_run is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + ) + events = dag_run.consumed_asset_events + + print("events" * 10) + print(events) + + return AssetEventCollectionResponse( + asset_events=[ + AssetEventResponse.model_validate(asset_event, from_attributes=True) for asset_event in events + ], + total_entries=len(events), + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 5077e55c3902..2b92e08381d5 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -350,6 +350,28 @@ export const UseDagRunServiceGetDagRunKeyFn = ( }, queryKey?: Array, ) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; +export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = Awaited< + ReturnType +>; +export type DagRunServiceGetUpstreamAssetEventsQueryResult< + TData = DagRunServiceGetUpstreamAssetEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagRunServiceGetUpstreamAssetEventsKey = + "DagRunServiceGetUpstreamAssetEvents"; +export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: Array, +) => [ + useDagRunServiceGetUpstreamAssetEventsKey, + ...(queryKey ?? [{ dagId, dagRunId }]), +]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 9354ff89bed5..a2580a604a33 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -449,6 +449,32 @@ export const prefetchUseDagRunServiceGetDagRun = ( queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), }); +/** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseDagRunServiceGetUpstreamAssetEvents = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ + dagId, + dagRunId, + }), + queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }), + }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 0e43d66d1397..f8327845d83f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -562,6 +562,39 @@ export const useDagRunServiceGetDagRun = < queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options, }); +/** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetUpstreamAssetEvents = < + TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => + DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData, + ...options, + }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 90d14c8eb6ca..9dbb93d9218f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -547,6 +547,39 @@ export const useDagRunServiceGetDagRunSuspense = < queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options, }); +/** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceGetUpstreamAssetEventsSuspense = < + TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + }: { + dagId: string; + dagRunId: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn( + { dagId, dagRunId }, + queryKey, + ), + queryFn: () => + DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData, + ...options, + }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 36a2b26ebd28..108ac0495d50 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -127,6 +127,114 @@ export const $AssetCollectionResponse = { description: "Asset collection response.", } as const; +export const $AssetEventCollectionResponse = { + properties: { + asset_events: { + items: { + $ref: "#/components/schemas/AssetEventResponse", + }, + type: "array", + title: "Asset Events", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["asset_events", "total_entries"], + title: "AssetEventCollectionResponse", + description: "Asset collection response.", +} as const; + +export const $AssetEventResponse = { + properties: { + id: { + type: "integer", + title: "Id", + }, + asset_id: { + type: "integer", + title: "Asset Id", + }, + uri: { + type: "string", + title: "Uri", + }, + extra: { + anyOf: [ + { + type: "object", + }, + { + type: "null", + }, + ], + title: "Extra", + }, + source_task_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Source Task Id", + }, + source_dag_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Source Dag Id", + }, + source_run_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Source Run Id", + }, + source_map_index: { + type: "integer", + title: "Source Map Index", + }, + created_dagruns: { + items: { + $ref: "#/components/schemas/DagRunAssetReference", + }, + type: "array", + title: "Created Dagruns", + }, + timestamp: { + type: "string", + format: "date-time", + title: "Timestamp", + }, + }, + type: "object", + required: [ + "id", + "asset_id", + "uri", + "source_map_index", + "created_dagruns", + "timestamp", + ], + title: "AssetEventResponse", + description: "Asset event serializer for responses.", +} as const; + export const $AssetResponse = { properties: { id: { @@ -1669,6 +1777,62 @@ export const $DagProcessorInfoSchema = { description: "Schema for DagProcessor info.", } as const; +export const $DagRunAssetReference = { + properties: { + run_id: { + type: "string", + title: "Run Id", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + start_date: { + type: "string", + format: "date-time", + title: "Start Date", + }, + end_date: { + type: "string", + format: "date-time", + title: "End Date", + }, + state: { + type: "string", + title: "State", + }, + data_interval_start: { + type: "string", + format: "date-time", + title: "Data Interval Start", + }, + data_interval_end: { + type: "string", + format: "date-time", + title: "Data Interval End", + }, + }, + type: "object", + required: [ + "run_id", + "dag_id", + "logical_date", + "start_date", + "end_date", + "state", + "data_interval_start", + "data_interval_end", + ], + title: "DagRunAssetReference", + description: + "Serializable version of the DagRunAssetReference ORM SqlAlchemyModel.", +} as const; + export const $DagRunState = { type: "string", enum: ["queued", "running", "success", "failed"], diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index b2ab26989fbd..d5d739f4bd21 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -53,6 +53,8 @@ import type { DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, + GetUpstreamAssetEventsData, + GetUpstreamAssetEventsResponse, GetDagSourceData, GetDagSourceResponse, GetEventLogData, @@ -864,6 +866,34 @@ export class DagRunService { }, }); } + + /** + * Get Upstream Asset Events + * If dag run is asset-triggered, return the asset events that triggered it. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @returns AssetEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getUpstreamAssetEvents( + data: GetUpstreamAssetEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class DagSourceService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index c609269739b2..780ee73d5e30 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -37,6 +37,32 @@ export type AssetCollectionResponse = { total_entries: number; }; +/** + * Asset collection response. + */ +export type AssetEventCollectionResponse = { + asset_events: Array; + total_entries: number; +}; + +/** + * Asset event serializer for responses. + */ +export type AssetEventResponse = { + id: number; + asset_id: number; + uri: string; + extra?: { + [key: string]: unknown; + } | null; + source_task_id?: string | null; + source_dag_id?: string | null; + source_run_id?: string | null; + source_map_index: number; + created_dagruns: Array; + timestamp: string; +}; + /** * Asset serializer for responses. */ @@ -349,6 +375,20 @@ export type DagProcessorInfoSchema = { latest_dag_processor_heartbeat: string | null; }; +/** + * Serializable version of the DagRunAssetReference ORM SqlAlchemyModel. + */ +export type DagRunAssetReference = { + run_id: string; + dag_id: string; + logical_date: string; + start_date: string; + end_date: string; + state: string; + data_interval_start: string; + data_interval_end: string; +}; + /** * All possible states that a DagRun can be in. * @@ -1002,6 +1042,13 @@ export type PatchDagRunData = { export type PatchDagRunResponse = DAGRunResponse; +export type GetUpstreamAssetEventsData = { + dagId: string; + dagRunId: string; +}; + +export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse; + export type GetDagSourceData = { accept?: string; fileToken: string; @@ -1892,6 +1939,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents": { + get: { + req: GetUpstreamAssetEventsData; + res: { + /** + * Successful Response + */ + 200: AssetEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dagSources/{file_token}": { get: { req: GetDagSourceData; From 6c80cedd5166796557f2606c382bc874d337cc02 Mon Sep 17 00:00:00 2001 From: Amogh Date: Mon, 11 Nov 2024 15:35:32 +0530 Subject: [PATCH 13/18] adding test cases --- airflow/api_fastapi/core_api/datamodels/assets.py | 2 +- .../api_fastapi/core_api/openapi/v1-generated.yaml | 7 ++++--- airflow/ui/openapi-gen/requests/schemas.gen.ts | 12 +++++++++--- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 595e9f8e6623..49d909d79456 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -73,7 +73,7 @@ class DagRunAssetReference(BaseModel): dag_id: str execution_date: datetime = Field(alias="logical_date") start_date: datetime - end_date: datetime + end_date: datetime | None = None state: str data_interval_start: datetime data_interval_end: datetime diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index e1ea3fe86c40..46d10648f65c 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4368,8 +4368,10 @@ components: format: date-time title: Start Date end_date: - type: string - format: date-time + anyOf: + - type: string + format: date-time + - type: 'null' title: End Date state: type: string @@ -4388,7 +4390,6 @@ components: - dag_id - logical_date - start_date - - end_date - state - data_interval_start - data_interval_end diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 108ac0495d50..f144019804b0 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1798,8 +1798,15 @@ export const $DagRunAssetReference = { title: "Start Date", }, end_date: { - type: "string", - format: "date-time", + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], title: "End Date", }, state: { @@ -1823,7 +1830,6 @@ export const $DagRunAssetReference = { "dag_id", "logical_date", "start_date", - "end_date", "state", "data_interval_start", "data_interval_end", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 780ee73d5e30..da4d2e8c28be 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -383,7 +383,7 @@ export type DagRunAssetReference = { dag_id: string; logical_date: string; start_date: string; - end_date: string; + end_date?: string | null; state: string; data_interval_start: string; data_interval_end: string; From e344263fe64e9ac67332c2a3a426f93afaea48ac Mon Sep 17 00:00:00 2001 From: Amogh Date: Mon, 11 Nov 2024 15:40:57 +0530 Subject: [PATCH 14/18] adding test cases --- .../core_api/routes/public/test_dag_run.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 64c3512e88b7..9d8048f1c1a7 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -21,6 +21,8 @@ import pytest +from airflow import Asset +from airflow.models.asset import AssetEvent, AssetModel from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session from airflow.utils.state import DagRunState @@ -254,3 +256,77 @@ def test_delete_dag_run_not_found(self, test_client): assert response.status_code == 404 body = response.json() assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" + + +class TestGetDagRunAssetTriggerEvents: + def test_should_respond_200(self, test_client, dag_maker, session): + asset1 = Asset(uri="ds1") + + with dag_maker(dag_id="source_dag", start_date=START_DATE, session=session): + EmptyOperator(task_id="task", outlets=[asset1]) + dr = dag_maker.create_dagrun() + ti = dr.task_instances[0] + + asset1_id = session.query(AssetModel.id).filter_by(uri=asset1.uri).scalar() + event = AssetEvent( + asset_id=asset1_id, + source_task_id=ti.task_id, + source_dag_id=ti.dag_id, + source_run_id=ti.run_id, + source_map_index=ti.map_index, + ) + session.add(event) + + with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE, session=session): + pass + dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID", run_type=DagRunType.ASSET_TRIGGERED) + dr.consumed_asset_events.append(event) + + session.commit() + assert event.timestamp + + response = test_client.get( + "/public/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents", + ) + assert response.status_code == 200 + expected_response = { + "asset_events": [ + { + "timestamp": event.timestamp.isoformat().replace("+00:00", "Z"), + "asset_id": asset1_id, + # piggyback on the fix for https://github.com/apache/airflow/issues/43845 for asset_uri + # meanwhile, unblock by adding uri below + "uri": asset1.uri, + "extra": {}, + "id": event.id, + "source_dag_id": ti.dag_id, + "source_map_index": ti.map_index, + "source_run_id": ti.run_id, + "source_task_id": ti.task_id, + "created_dagruns": [ + { + "dag_id": "TEST_DAG_ID", + "run_id": "TEST_DAG_RUN_ID", + "data_interval_end": dr.data_interval_end.isoformat().replace("+00:00", "Z"), + "data_interval_start": dr.data_interval_start.isoformat().replace("+00:00", "Z"), + "end_date": None, + "logical_date": dr.logical_date.isoformat().replace("+00:00", "Z"), + "start_date": dr.start_date.isoformat().replace("+00:00", "Z"), + "state": "running", + } + ], + } + ], + "total_entries": 1, + } + assert response.json() == expected_response + + def test_should_respond_404(self, test_client): + response = test_client.get( + "public/dags/invalid-id/dagRuns/invalid-id/upstreamAssetEvents", + ) + assert response.status_code == 404 + assert ( + "The DagRun with dag_id: `invalid-id` and run_id: `invalid-id` was not found" + == response.json()["detail"] + ) From e7b96aa6b63e48100aae3581bd4427874c1d8c78 Mon Sep 17 00:00:00 2001 From: Amogh Date: Wed, 13 Nov 2024 12:56:24 +0530 Subject: [PATCH 15/18] review comments pierre --- airflow/api_fastapi/core_api/datamodels/assets.py | 13 +++++++++---- .../api_fastapi/core_api/openapi/v1-generated.yaml | 6 +++--- .../api_fastapi/core_api/routes/public/dag_run.py | 4 ---- airflow/ui/openapi-gen/requests/schemas.gen.ts | 6 +++--- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 49d909d79456..e6a7ca3d3744 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -19,7 +19,7 @@ from datetime import datetime -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator class DagScheduleAssetReference(BaseModel): @@ -84,9 +84,7 @@ class AssetEventResponse(BaseModel): id: int asset_id: int - # piggyback on the fix for https://github.com/apache/airflow/issues/43845 for asset_uri - # meanwhile, unblock by adding uri below - uri: str + asset_uri: str extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None @@ -95,6 +93,13 @@ class AssetEventResponse(BaseModel): created_dagruns: list[DagRunAssetReference] timestamp: datetime + @model_validator(mode="before") + def rename_uri_to_asset_uri(cls, values): + """Rename 'uri' to 'asset_uri' during serialization to match legacy response.""" + if hasattr(values, "uri") and values.uri: + values.asset_uri = values.uri + return values + class AssetEventCollectionResponse(BaseModel): """Asset collection response.""" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9beae20a04be..1c3d56bdbab7 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3476,9 +3476,9 @@ components: asset_id: type: integer title: Asset Id - uri: + asset_uri: type: string - title: Uri + title: Asset Uri extra: anyOf: - type: object @@ -3515,7 +3515,7 @@ components: required: - id - asset_id - - uri + - asset_uri - source_map_index - created_dagruns - timestamp diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 94c050028fa9..282d7de0ea4d 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -177,10 +177,6 @@ def get_upstream_asset_events( f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", ) events = dag_run.consumed_asset_events - - print("events" * 10) - print(events) - return AssetEventCollectionResponse( asset_events=[ AssetEventResponse.model_validate(asset_event, from_attributes=True) for asset_event in events diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index bc72a13ea130..a17f916bc8a3 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -157,9 +157,9 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, - uri: { + asset_uri: { type: "string", - title: "Uri", + title: "Asset Uri", }, extra: { anyOf: [ @@ -226,7 +226,7 @@ export const $AssetEventResponse = { required: [ "id", "asset_id", - "uri", + "asset_uri", "source_map_index", "created_dagruns", "timestamp", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index ee57857dec10..5a8611a10efb 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -51,7 +51,7 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; - uri: string; + asset_uri: string; extra?: { [key: string]: unknown; } | null; From c4d7dfc445e6f607128bb96059b3b17e39e4d6c4 Mon Sep 17 00:00:00 2001 From: Amogh Date: Wed, 13 Nov 2024 12:58:03 +0530 Subject: [PATCH 16/18] fixing unit tests --- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 9d8048f1c1a7..ea8c01e2a6ae 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -294,9 +294,7 @@ def test_should_respond_200(self, test_client, dag_maker, session): { "timestamp": event.timestamp.isoformat().replace("+00:00", "Z"), "asset_id": asset1_id, - # piggyback on the fix for https://github.com/apache/airflow/issues/43845 for asset_uri - # meanwhile, unblock by adding uri below - "uri": asset1.uri, + "asset_uri": asset1.uri, "extra": {}, "id": event.id, "source_dag_id": ti.dag_id, From 0da44d0514b03f404abcd646c56babfa61eddb49 Mon Sep 17 00:00:00 2001 From: Amogh Date: Thu, 14 Nov 2024 09:57:13 +0530 Subject: [PATCH 17/18] review comments pierre --- airflow/api_fastapi/core_api/datamodels/assets.py | 15 ++++----------- .../core_api/openapi/v1-generated.yaml | 10 +++++----- airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 +++++------ airflow/ui/openapi-gen/requests/types.gen.ts | 6 +++--- .../core_api/routes/public/test_dag_run.py | 2 +- 5 files changed, 18 insertions(+), 26 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index e6a7ca3d3744..e2c1569ae7f8 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -19,7 +19,7 @@ from datetime import datetime -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field class DagScheduleAssetReference(BaseModel): @@ -67,7 +67,7 @@ class AssetCollectionResponse(BaseModel): class DagRunAssetReference(BaseModel): - """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel.""" + """Dag Run Asset Reference serializer for responses.""" run_id: str dag_id: str @@ -84,7 +84,7 @@ class AssetEventResponse(BaseModel): id: int asset_id: int - asset_uri: str + uri: str extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None @@ -93,16 +93,9 @@ class AssetEventResponse(BaseModel): created_dagruns: list[DagRunAssetReference] timestamp: datetime - @model_validator(mode="before") - def rename_uri_to_asset_uri(cls, values): - """Rename 'uri' to 'asset_uri' during serialization to match legacy response.""" - if hasattr(values, "uri") and values.uri: - values.asset_uri = values.uri - return values - class AssetEventCollectionResponse(BaseModel): - """Asset collection response.""" + """Asset Event collection response.""" asset_events: list[AssetEventResponse] total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 08d86974b392..d1cd85e4fe81 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3634,7 +3634,7 @@ components: - asset_events - total_entries title: AssetEventCollectionResponse - description: Asset collection response. + description: Asset Event collection response. AssetEventResponse: properties: id: @@ -3643,9 +3643,9 @@ components: asset_id: type: integer title: Asset Id - asset_uri: + uri: type: string - title: Asset Uri + title: Uri extra: anyOf: - type: object @@ -3682,7 +3682,7 @@ components: required: - id - asset_id - - asset_uri + - uri - source_map_index - created_dagruns - timestamp @@ -4795,7 +4795,7 @@ components: - data_interval_start - data_interval_end title: DagRunAssetReference - description: Serializable version of the DagRunAssetReference ORM SqlAlchemyModel. + description: Dag Run Asset Reference serializer for responses. DagRunState: type: string enum: diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index c73996942ebd..9200738e2980 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -144,7 +144,7 @@ export const $AssetEventCollectionResponse = { type: "object", required: ["asset_events", "total_entries"], title: "AssetEventCollectionResponse", - description: "Asset collection response.", + description: "Asset Event collection response.", } as const; export const $AssetEventResponse = { @@ -157,9 +157,9 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, - asset_uri: { + uri: { type: "string", - title: "Asset Uri", + title: "Uri", }, extra: { anyOf: [ @@ -226,7 +226,7 @@ export const $AssetEventResponse = { required: [ "id", "asset_id", - "asset_uri", + "uri", "source_map_index", "created_dagruns", "timestamp", @@ -1961,8 +1961,7 @@ export const $DagRunAssetReference = { "data_interval_end", ], title: "DagRunAssetReference", - description: - "Serializable version of the DagRunAssetReference ORM SqlAlchemyModel.", + description: "Dag Run Asset Reference serializer for responses.", } as const; export const $DagRunState = { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 6374bf2c3bcd..0933680817cb 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -38,7 +38,7 @@ export type AssetCollectionResponse = { }; /** - * Asset collection response. + * Asset Event collection response. */ export type AssetEventCollectionResponse = { asset_events: Array; @@ -51,7 +51,7 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; - asset_uri: string; + uri: string; extra?: { [key: string]: unknown; } | null; @@ -412,7 +412,7 @@ export type DagProcessorInfoSchema = { }; /** - * Serializable version of the DagRunAssetReference ORM SqlAlchemyModel. + * Dag Run Asset Reference serializer for responses. */ export type DagRunAssetReference = { run_id: string; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index ea8c01e2a6ae..7b643e25d1e1 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -294,7 +294,7 @@ def test_should_respond_200(self, test_client, dag_maker, session): { "timestamp": event.timestamp.isoformat().replace("+00:00", "Z"), "asset_id": asset1_id, - "asset_uri": asset1.uri, + "uri": asset1.uri, "extra": {}, "id": event.id, "source_dag_id": ti.dag_id, From f93032c830cceb671731f4f606f6a0ccdaae2c72 Mon Sep 17 00:00:00 2001 From: Amogh Date: Thu, 14 Nov 2024 21:09:25 +0530 Subject: [PATCH 18/18] review comments and fixing a test --- airflow/api_fastapi/core_api/datamodels/assets.py | 2 +- .../api_fastapi/core_api/openapi/v1-generated.yaml | 6 ++++-- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 -- airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 +++++++++-- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_dag_run.py | 1 + 6 files changed, 16 insertions(+), 8 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 85e41ff7b569..340daad3c16a 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -73,7 +73,7 @@ class DagRunAssetReference(BaseModel): dag_id: str execution_date: datetime = Field(alias="logical_date") start_date: datetime - end_date: datetime + end_date: datetime | None state: str data_interval_start: datetime data_interval_end: datetime diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index ec1bc4fea018..1b6cd24f1110 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4869,8 +4869,10 @@ components: format: date-time title: Start Date end_date: - type: string - format: date-time + anyOf: + - type: string + format: date-time + - type: 'null' title: End Date state: type: string diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 6ebe77f0f66b..698e322a12ec 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -149,8 +149,6 @@ def patch_dag_run( "/{dag_run_id}/upstreamAssetEvents", responses=create_openapi_http_exception_doc( [ - status.HTTP_401_UNAUTHORIZED, - status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND, ] ), diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 7bf8f4b02966..a79e11c02157 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1924,8 +1924,15 @@ export const $DagRunAssetReference = { title: "Start Date", }, end_date: { - type: "string", - format: "date-time", + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], title: "End Date", }, state: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1442d06e9772..086ca98b71bd 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -419,7 +419,7 @@ export type DagRunAssetReference = { dag_id: string; logical_date: string; start_date: string; - end_date: string; + end_date: string | null; state: string; data_interval_start: string; data_interval_end: string; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 7b643e25d1e1..ed11e0ad933f 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -51,6 +51,7 @@ DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) +END_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc) DAG1_RUN1_NOTE = "test_note"