Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP 84: Migrate GET one ASSET legacy API to fast API #43825

Merged
merged 27 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cf5218c
AIP-84: Migrating GET Assets to fastAPI
amoghrajesh Nov 6, 2024
5a49280
matching response to legacy
amoghrajesh Nov 6, 2024
962572b
Adding unit tests - part 1
amoghrajesh Nov 8, 2024
428cb6c
Update airflow/api_fastapi/common/parameters.py
amoghrajesh Nov 8, 2024
a78d3cb
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
882d20c
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
25bb08e
Adding unit tests - part 2
amoghrajesh Nov 8, 2024
658479d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fa0cd23
fixing unit tests & updating parameter type
amoghrajesh Nov 8, 2024
6a4f3ce
AIP-84: Migrating GET Single Asset to fastAPI
amoghrajesh Nov 8, 2024
9eb3610
adding test cases
amoghrajesh Nov 8, 2024
dd791c2
review comments pierre
amoghrajesh Nov 8, 2024
06fa0a7
fixing last commit
amoghrajesh Nov 8, 2024
3bd803b
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fc29d7d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
7a97220
fixing unit tests
amoghrajesh Nov 9, 2024
563c5fb
Merge branch 'AIP84-get-asset-to-fastapi' into AIP84-get-one-asset-to…
amoghrajesh Nov 11, 2024
0df9ebd
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 12, 2024
5fb8bc1
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 12, 2024
02458df
Merge branch 'AIP84-get-asset-to-fastapi' into AIP84-get-one-asset-to…
amoghrajesh Nov 12, 2024
54d0b9e
Merge branch 'main' into AIP84-get-one-asset-to-fastapi
amoghrajesh Nov 13, 2024
e2b4651
adding loading options
amoghrajesh Nov 13, 2024
ffcbcff
Merge branch 'main' into AIP84-get-one-asset-to-fastapi
amoghrajesh Nov 13, 2024
cb3864c
adding loading options
amoghrajesh Nov 13, 2024
cd916a7
adding assert_queries_count
amoghrajesh Nov 13, 2024
f58f3ec
Merge branch 'main' into AIP84-get-one-asset-to-fastapi
pierrejeambrun Nov 13, 2024
215ff20
Merge branch 'main' into AIP84-get-one-asset-to-fastapi
amoghrajesh Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
RESOURCE_EVENT_PREFIX = "asset"


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
Expand Down
45 changes: 45 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,51 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
- Asset
summary: Get Asset
description: Get an asset.
operationId: get_asset
parameters:
- name: uri
in: path
required: true
schema:
type: string
title: Uri
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/AssetResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down
32 changes: 28 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

from typing import Annotated

from fastapi import Depends
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload, subqueryload

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
Expand Down Expand Up @@ -63,9 +63,33 @@ def get_assets(
limit=limit,
session=session,
)

assets = session.scalars(assets_select).all()
assets = session.scalars(
assets_select.options(
subqueryload(AssetModel.consuming_dags), subqueryload(AssetModel.producing_tasks)
)
).all()
return AssetCollectionResponse(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
total_entries=total_entries,
)


@assets_router.get(
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
)
def get_asset(
uri: str,
session: Annotated[Session, Depends(get_session)],
) -> AssetResponse:
"""Get an asset."""
asset = session.scalar(
select(AssetModel)
.where(AssetModel.uri == uri)
.options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks))
)

if asset is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")

return AssetResponse.model_validate(asset, from_attributes=True)
16 changes: 16 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ export const UseAssetServiceGetAssetsKeyFn = (
useAssetServiceGetAssetsKey,
...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]),
];
export type AssetServiceGetAssetDefaultResponse = Awaited<
ReturnType<typeof AssetService.getAsset>
>;
export type AssetServiceGetAssetQueryResult<
TData = AssetServiceGetAssetDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useAssetServiceGetAssetKey = "AssetServiceGetAsset";
export const UseAssetServiceGetAssetKeyFn = (
{
uri,
}: {
uri: string;
},
queryKey?: Array<unknown>,
) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ export const prefetchUseAssetServiceGetAssets = (
queryFn: () =>
AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }),
});
/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
export const prefetchUseAssetServiceGetAsset = (
queryClient: QueryClient,
{
uri,
}: {
uri: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
queryFn: () => AssetService.getAsset({ uri }),
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,32 @@ export const useAssetServiceGetAssets = <
}) as TData,
...options,
});
/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceGetAsset = <
TData = Common.AssetServiceGetAssetDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
uri,
}: {
uri: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,32 @@ export const useAssetServiceGetAssetsSuspense = <
}) as TData,
...options,
});
/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceGetAssetSuspense = <
TData = Common.AssetServiceGetAssetDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
uri,
}: {
uri: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
28 changes: 28 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
NextRunAssetsResponse,
GetAssetsData,
GetAssetsResponse,
GetAssetData,
GetAssetResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
RecentDagRunsData,
Expand Down Expand Up @@ -169,6 +171,32 @@ export class AssetService {
},
});
}

/**
* Get Asset
* Get an asset.
* @param data The data for the request.
* @param data.uri
* @returns AssetResponse Successful Response
* @throws ApiError
*/
public static getAsset(
data: GetAssetData,
): CancelablePromise<GetAssetResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/assets/{uri}",
path: {
uri: data.uri,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class DashboardService {
Expand Down
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,12 @@ export type GetAssetsData = {

export type GetAssetsResponse = AssetCollectionResponse;

export type GetAssetData = {
uri: string;
};

export type GetAssetResponse = AssetResponse;

export type HistoricalMetricsData = {
endDate: string;
startDate: string;
Expand Down Expand Up @@ -1417,6 +1423,33 @@ export type $OpenApiTs = {
};
};
};
"/public/assets/{uri}": {
get: {
req: GetAssetData;
res: {
/**
* Successful Response
*/
200: AssetResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/ui/dashboard/historical_metrics_data": {
get: {
req: HistoricalMetricsData;
Expand Down
42 changes: 42 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
# under the License.
from __future__ import annotations

import urllib

import pytest

from airflow.models import DagModel
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.utils import timezone
from airflow.utils.session import provide_session

from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.db import clear_db_assets

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
Expand Down Expand Up @@ -229,3 +232,42 @@ def test_should_respect_page_size_limit_default(self, test_client):

assert response.status_code == 200
assert len(response.json()["assets"]) == 100


class TestGetAssetEndpoint(TestAssets):
@pytest.mark.parametrize(
"url",
[
urllib.parse.quote(
"s3://bucket/key/1", safe=""
), # api should cover raw as well as unquoted case like legacy
"s3://bucket/key/1",
],
)
@provide_session
def test_should_respond_200(self, test_client, url, session):
self.create_assets(num=1)
assert session.query(AssetModel).count() == 1
tz_datetime_format = self.default_time.replace("+00:00", "Z")
with assert_queries_count(6):
response = test_client.get(
f"/public/assets/{url}",
)
assert response.status_code == 200
assert response.json() == {
"id": 1,
"uri": "s3://bucket/key/1",
"extra": {"foo": "bar"},
"created_at": tz_datetime_format,
"updated_at": tz_datetime_format,
"consuming_dags": [],
"producing_tasks": [],
"aliases": [],
}

def test_should_respond_404(self, test_client):
response = test_client.get(
f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}",
)
assert response.status_code == 404
assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"