diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 085817213d0a..0c45ddd70957 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -61,6 +61,7 @@ RESOURCE_EVENT_PREFIX = "asset" +@mark_fastapi_migration_done @security.requires_access_asset("GET") @provide_session def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse: diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index b487962498fb..ea0044eebf31 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -246,6 +246,51 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/assets/{uri}: + get: + tags: + - Asset + summary: Get Asset + description: Get an asset. + operationId: get_asset + parameters: + - name: uri + in: path + required: true + schema: + type: string + title: Uri + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/backfills/: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 7ec7012de8d3..d093eef37206 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -19,9 +19,9 @@ from typing import Annotated -from fastapi import Depends +from fastapi import Depends, HTTPException, status from sqlalchemy import select -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, joinedload, subqueryload from airflow.api_fastapi.common.db.common import get_session, paginated_select from airflow.api_fastapi.common.parameters import ( @@ -63,9 +63,33 @@ def get_assets( limit=limit, session=session, ) - - assets = session.scalars(assets_select).all() + assets = session.scalars( + assets_select.options( + subqueryload(AssetModel.consuming_dags), subqueryload(AssetModel.producing_tasks) + ) + ).all() return AssetCollectionResponse( assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets], total_entries=total_entries, ) + + +@assets_router.get( + "/{uri:path}", + responses=create_openapi_http_exception_doc([401, 403, 404]), +) +def get_asset( + uri: str, + session: Annotated[Session, Depends(get_session)], +) -> AssetResponse: + """Get an asset.""" + asset = session.scalar( + select(AssetModel) + .where(AssetModel.uri == uri) + .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks)) + ) + + if asset is None: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") + + return AssetResponse.model_validate(asset, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 51fc4d6d001e..1fff143182f9 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -69,6 +69,22 @@ export const UseAssetServiceGetAssetsKeyFn = ( useAssetServiceGetAssetsKey, ...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]), ]; +export type AssetServiceGetAssetDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetAssetQueryResult< + TData = AssetServiceGetAssetDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetAssetKey = "AssetServiceGetAsset"; +export const UseAssetServiceGetAssetKeyFn = ( + { + uri, + }: { + uri: string; + }, + queryKey?: Array, +) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 76d0a35e7bf0..b822e35dd33e 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -85,6 +85,26 @@ export const prefetchUseAssetServiceGetAssets = ( queryFn: () => AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }), }); +/** + * Get Asset + * Get an asset. + * @param data The data for the request. + * @param data.uri + * @returns AssetResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetAsset = ( + queryClient: QueryClient, + { + uri, + }: { + uri: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), + queryFn: () => AssetService.getAsset({ uri }), + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 31b73b7aa263..475126ef506e 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -115,6 +115,32 @@ export const useAssetServiceGetAssets = < }) as TData, ...options, }); +/** + * Get Asset + * Get an asset. + * @param data The data for the request. + * @param data.uri + * @returns AssetResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAsset = < + TData = Common.AssetServiceGetAssetDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + uri, + }: { + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), + queryFn: () => AssetService.getAsset({ uri }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 534eafeca1f3..83b76e51127f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -100,6 +100,32 @@ export const useAssetServiceGetAssetsSuspense = < }) as TData, ...options, }); +/** + * Get Asset + * Get an asset. + * @param data The data for the request. + * @param data.uri + * @returns AssetResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetSuspense = < + TData = Common.AssetServiceGetAssetDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + uri, + }: { + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), + queryFn: () => AssetService.getAsset({ uri }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 16e3873458d0..ac1ea6d6441d 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -7,6 +7,8 @@ import type { NextRunAssetsResponse, GetAssetsData, GetAssetsResponse, + GetAssetData, + GetAssetResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -169,6 +171,32 @@ export class AssetService { }, }); } + + /** + * Get Asset + * Get an asset. + * @param data The data for the request. + * @param data.uri + * @returns AssetResponse Successful Response + * @throws ApiError + */ + public static getAsset( + data: GetAssetData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/assets/{uri}", + path: { + uri: data.uri, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class DashboardService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 3d6ad5698312..4934ea97ea73 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -930,6 +930,12 @@ export type GetAssetsData = { export type GetAssetsResponse = AssetCollectionResponse; +export type GetAssetData = { + uri: string; +}; + +export type GetAssetResponse = AssetResponse; + export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1417,6 +1423,33 @@ export type $OpenApiTs = { }; }; }; + "/public/assets/{uri}": { + get: { + req: GetAssetData; + res: { + /** + * Successful Response + */ + 200: AssetResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/ui/dashboard/historical_metrics_data": { get: { req: HistoricalMetricsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index e243c3c2930d..eb72c1a99acb 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +import urllib + import pytest from airflow.models import DagModel @@ -23,6 +25,7 @@ from airflow.utils import timezone from airflow.utils.session import provide_session +from tests_common.test_utils.asserts import assert_queries_count from tests_common.test_utils.db import clear_db_assets pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -229,3 +232,42 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 + + +class TestGetAssetEndpoint(TestAssets): + @pytest.mark.parametrize( + "url", + [ + urllib.parse.quote( + "s3://bucket/key/1", safe="" + ), # api should cover raw as well as unquoted case like legacy + "s3://bucket/key/1", + ], + ) + @provide_session + def test_should_respond_200(self, test_client, url, session): + self.create_assets(num=1) + assert session.query(AssetModel).count() == 1 + tz_datetime_format = self.default_time.replace("+00:00", "Z") + with assert_queries_count(6): + response = test_client.get( + f"/public/assets/{url}", + ) + assert response.status_code == 200 + assert response.json() == { + "id": 1, + "uri": "s3://bucket/key/1", + "extra": {"foo": "bar"}, + "created_at": tz_datetime_format, + "updated_at": tz_datetime_format, + "consuming_dags": [], + "producing_tasks": [], + "aliases": [], + } + + def test_should_respond_404(self, test_client): + response = test_client.get( + f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}", + ) + assert response.status_code == 404 + assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"