Skip to content

Commit

Permalink
AIP-84 Migrate post a connection to FastAPI API (apache#43396)
Browse files Browse the repository at this point in the history
* Migrate Create a Connection to FastAPI

* Remove additional duplicate comment

* Include password in connection  and move dashboard.py to serializers/ui/

* Fix test for password

* Include password field to response and redact it, run pre-commit after rebase

* Convert redact to field_validator and fix tests

* Pass field name into redact

* run pre-commit after rebase
  • Loading branch information
bugraoz93 authored Nov 5, 2024
1 parent c96b618 commit 1328d1a
Show file tree
Hide file tree
Showing 12 changed files with 496 additions and 7 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def patch_connection(
return connection_schema.dump(connection)


@mark_fastapi_migration_done
@security.requires_access_connection("POST")
@provide_session
@action_logging(
Expand Down
102 changes: 100 additions & 2 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,49 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
post:
tags:
- Connection
summary: Post Connection
description: Create connection entry.
operationId: post_connection
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionBody'
responses:
'201':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}:
get:
tags:
Expand Down Expand Up @@ -2515,6 +2558,55 @@ components:
- status
title: BaseInfoSchema
description: Base status field for metadatabase and scheduler.
ConnectionBody:
properties:
connection_id:
type: string
title: Connection Id
conn_type:
type: string
title: Conn Type
description:
anyOf:
- type: string
- type: 'null'
title: Description
host:
anyOf:
- type: string
- type: 'null'
title: Host
login:
anyOf:
- type: string
- type: 'null'
title: Login
schema:
anyOf:
- type: string
- type: 'null'
title: Schema
port:
anyOf:
- type: integer
- type: 'null'
title: Port
password:
anyOf:
- type: string
- type: 'null'
title: Password
extra:
anyOf:
- type: string
- type: 'null'
title: Extra
type: object
required:
- connection_id
- conn_type
title: ConnectionBody
description: Connection Serializer for requests body.
ConnectionCollectionResponse:
properties:
connections:
Expand Down Expand Up @@ -2564,6 +2656,11 @@ components:
- type: integer
- type: 'null'
title: Port
password:
anyOf:
- type: string
- type: 'null'
title: Password
extra:
anyOf:
- type: string
Expand All @@ -2578,6 +2675,7 @@ components:
- login
- schema
- port
- password
- extra
title: ConnectionResponse
description: Connection serializer for responses.
Expand Down Expand Up @@ -3545,7 +3643,7 @@ components:
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
$ref: '#/components/schemas/airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState'
$ref: '#/components/schemas/airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState'
type: object
required:
- dag_run_types
Expand Down Expand Up @@ -4224,7 +4322,7 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState:
airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState:
properties:
no_status:
type: integer
Expand Down
23 changes: 23 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.connections import (
ConnectionBody,
ConnectionCollectionResponse,
ConnectionResponse,
)
from airflow.models import Connection
from airflow.utils import helpers

connections_router = AirflowRouter(tags=["Connection"], prefix="/connections")

Expand Down Expand Up @@ -114,3 +116,24 @@ async def get_connections(
],
total_entries=total_entries,
)


@connections_router.post("/", status_code=201, responses=create_openapi_http_exception_doc([401, 403, 409]))
async def post_connection(
post_body: ConnectionBody,
session: Annotated[Session, Depends(get_session)],
) -> ConnectionResponse:
"""Create connection entry."""
try:
helpers.validate_key(post_body.connection_id, max_length=200)
except Exception as e:
raise HTTPException(400, f"{e}")

connection = session.scalar(select(Connection).filter_by(conn_id=post_body.connection_id))
if connection is not None:
raise HTTPException(409, f"Connection with connection_id: `{post_body.connection_id}` already exists")

connection = Connection(**post_body.model_dump(by_alias=True))
session.add(connection)

return ConnectionResponse.model_validate(connection, from_attributes=True)
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/ui/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from airflow.api_fastapi.common.parameters import DateTimeQuery
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dashboard import HistoricalMetricDataResponse
from airflow.api_fastapi.core_api.serializers.ui.dashboard import HistoricalMetricDataResponse
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down
25 changes: 25 additions & 0 deletions airflow/api_fastapi/core_api/serializers/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import json

from pydantic import BaseModel, Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow.utils.log.secrets_masker import redact


# Response Models
class ConnectionResponse(BaseModel):
"""Connection serializer for responses."""

Expand All @@ -34,8 +36,16 @@ class ConnectionResponse(BaseModel):
login: str | None
schema_: str | None = Field(alias="schema")
port: int | None
password: str | None
extra: str | None

@field_validator("password", mode="after")
@classmethod
def redact_password(cls, v: str | None, field_info: ValidationInfo) -> str | None:
if v is None:
return None
return redact(v, field_info.field_name)

@field_validator("extra", mode="before")
@classmethod
def redact_extra(cls, v: str | None) -> str | None:
Expand All @@ -55,3 +65,18 @@ class ConnectionCollectionResponse(BaseModel):

connections: list[ConnectionResponse]
total_entries: int


# Request Models
class ConnectionBody(BaseModel):
"""Connection Serializer for requests body."""

connection_id: str = Field(serialization_alias="conn_id")
conn_type: str
description: str | None = Field(default=None)
host: str | None = Field(default=None)
login: str | None = Field(default=None)
schema_: str | None = Field(None, alias="schema")
port: int | None = Field(default=None)
password: str | None = Field(default=None)
extra: str | None = Field(default=None)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,9 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array<unknown>) => [
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
export type ConnectionServicePostConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnection>
>;
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
} from "../requests/services.gen";
import {
BackfillPostBody,
ConnectionBody,
DAGPatchBody,
DAGRunPatchBody,
DagRunState,
Expand Down Expand Up @@ -1130,6 +1131,45 @@ export const useBackfillServiceCreateBackfill = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Post Connection
* Create connection entry.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionResponse Successful Response
* @throws ApiError
*/
export const useConnectionServicePostConnection = <
TData = Common.ConnectionServicePostConnectionMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: ConnectionBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: ConnectionBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
ConnectionService.postConnection({
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Post Pool
* Create a Pool.
Expand Down
Loading

0 comments on commit 1328d1a

Please sign in to comment.