Skip to content

Commit

Permalink
AIP 84: Migrate GET one ASSET legacy API to fast API (apache#43825)
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh authored Nov 14, 2024
1 parent 4af2c27 commit c84d356
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 4 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
RESOURCE_EVENT_PREFIX = "asset"


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
Expand Down
45 changes: 45 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,51 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
- Asset
summary: Get Asset
description: Get an asset.
operationId: get_asset
parameters:
- name: uri
in: path
required: true
schema:
type: string
title: Uri
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/AssetResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down
32 changes: 28 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

from typing import Annotated

from fastapi import Depends
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload, subqueryload

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
Expand Down Expand Up @@ -63,9 +63,33 @@ def get_assets(
limit=limit,
session=session,
)

assets = session.scalars(assets_select).all()
assets = session.scalars(
assets_select.options(
subqueryload(AssetModel.consuming_dags), subqueryload(AssetModel.producing_tasks)
)
).all()
return AssetCollectionResponse(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
total_entries=total_entries,
)


@assets_router.get(
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_asset(
uri: str,
session: Annotated[Session, Depends(get_session)],
) -> AssetResponse:
"""Get an asset."""
asset = session.scalar(
select(AssetModel)
.where(AssetModel.uri == uri)
.options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks))
)

if asset is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")

return AssetResponse.model_validate(asset, from_attributes=True)
16 changes: 16 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ export const UseAssetServiceGetAssetsKeyFn = (
useAssetServiceGetAssetsKey,
...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]),
];
export type AssetServiceGetAssetDefaultResponse = Awaited<
ReturnType<typeof AssetService.getAsset>
>;
export type AssetServiceGetAssetQueryResult<
TData = AssetServiceGetAssetDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useAssetServiceGetAssetKey = "AssetServiceGetAsset";
export const UseAssetServiceGetAssetKeyFn = (
{
uri,
}: {
uri: string;
},
queryKey?: Array<unknown>,
) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ export const prefetchUseAssetServiceGetAssets = (
queryFn: () =>
AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }),
});
/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
export const prefetchUseAssetServiceGetAsset = (
queryClient: QueryClient,
{
uri,
}: {
uri: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
queryFn: () => AssetService.getAsset({ uri }),
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,32 @@ export const useAssetServiceGetAssets = <
}) as TData,
...options,
});
/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceGetAsset = <
TData = Common.AssetServiceGetAssetDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
uri,
}: {
uri: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,32 @@ export const useAssetServiceGetAssetsSuspense = <
}) as TData,
...options,
});
/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceGetAssetSuspense = <
TData = Common.AssetServiceGetAssetDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
uri,
}: {
uri: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
28 changes: 28 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
NextRunAssetsResponse,
GetAssetsData,
GetAssetsResponse,
GetAssetData,
GetAssetResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
RecentDagRunsData,
Expand Down Expand Up @@ -169,6 +171,32 @@ export class AssetService {
},
});
}

/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
public static getAsset(
data: GetAssetData,
): CancelablePromise<GetAssetResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/assets/{uri}",
path: {
uri: data.uri,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class DashboardService {
Expand Down
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,12 @@ export type GetAssetsData = {

export type GetAssetsResponse = AssetCollectionResponse;

export type GetAssetData = {
uri: string;
};

export type GetAssetResponse = AssetResponse;

export type HistoricalMetricsData = {
endDate: string;
startDate: string;
Expand Down Expand Up @@ -1418,6 +1424,33 @@ export type $OpenApiTs = {
};
};
};
"/public/assets/{uri}": {
get: {
req: GetAssetData;
res: {
/**
* Successful Response
*/
200: AssetResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/ui/dashboard/historical_metrics_data": {
get: {
req: HistoricalMetricsData;
Expand Down
42 changes: 42 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
# under the License.
from __future__ import annotations

import urllib

import pytest

from airflow.models import DagModel
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.utils import timezone
from airflow.utils.session import provide_session

from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.db import clear_db_assets

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
Expand Down Expand Up @@ -229,3 +232,42 @@ def test_should_respect_page_size_limit_default(self, test_client):

assert response.status_code == 200
assert len(response.json()["assets"]) == 100


class TestGetAssetEndpoint(TestAssets):
@pytest.mark.parametrize(
"url",
[
urllib.parse.quote(
"s3://bucket/key/1", safe=""
), # api should cover raw as well as unquoted case like legacy
"s3://bucket/key/1",
],
)
@provide_session
def test_should_respond_200(self, test_client, url, session):
self.create_assets(num=1)
assert session.query(AssetModel).count() == 1
tz_datetime_format = self.default_time.replace("+00:00", "Z")
with assert_queries_count(6):
response = test_client.get(
f"/public/assets/{url}",
)
assert response.status_code == 200
assert response.json() == {
"id": 1,
"uri": "s3://bucket/key/1",
"extra": {"foo": "bar"},
"created_at": tz_datetime_format,
"updated_at": tz_datetime_format,
"consuming_dags": [],
"producing_tasks": [],
"aliases": [],
}

def test_should_respond_404(self, test_client):
response = test_client.get(
f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}",
)
assert response.status_code == 404
assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"

0 comments on commit c84d356

Please sign in to comment.