Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ restate-admin-rest-model = { workspace = true, features = ["schema"] }
restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-loglet"] }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingestion-client = { workspace = true }
restate-metadata-store = { workspace = true }
restate-metadata-providers = { workspace = true }
restate-service-client = { workspace = true }
Expand Down
20 changes: 10 additions & 10 deletions crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use serde::Deserialize;
from_type = "MetaApiError",
)
)]
pub async fn create_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn create_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Extension(version): Extension<AdminApiVersion>,
#[request_body(required = true)] Json(payload): Json<RegisterDeploymentRequest>,
) -> Result<impl IntoResponse, MetaApiError>
Expand Down Expand Up @@ -188,8 +188,8 @@ where
schema = "std::string::String"
))
)]
pub async fn get_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn get_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(deployment_id): Path<DeploymentId>,
) -> Result<Json<DetailedDeploymentResponse>, MetaApiError>
where
Expand All @@ -210,8 +210,8 @@ where
operation_id = "list_deployments",
tags = "deployment"
)]
pub async fn list_deployments<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn list_deployments<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
) -> Json<ListDeploymentsResponse>
where
Metadata: MetadataService,
Expand Down Expand Up @@ -267,8 +267,8 @@ pub struct DeleteDeploymentParams {
from_type = "MetaApiError",
)
)]
pub async fn delete_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn delete_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(deployment_id): Path<DeploymentId>,
Query(DeleteDeploymentParams { force }): Query<DeleteDeploymentParams>,
) -> Result<StatusCode, MetaApiError>
Expand Down Expand Up @@ -302,8 +302,8 @@ where
schema = "std::string::String"
))
)]
pub async fn update_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn update_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Extension(version): Extension<AdminApiVersion>,
method: Method,
Path(deployment_id): Path<DeploymentId>,
Expand Down
8 changes: 4 additions & 4 deletions crates/admin/src/rest_api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use restate_types::schema::service::HandlerMetadata;
schema = "std::string::String"
))
)]
pub async fn list_service_handlers<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn list_service_handlers<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(service_name): Path<String>,
) -> Result<Json<ListServiceHandlersResponse>, MetaApiError>
where
Expand Down Expand Up @@ -62,8 +62,8 @@ where
)
)
)]
pub async fn get_service_handler<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn get_service_handler<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path((service_name, handler_name)): Path<(String, String)>,
) -> Result<Json<HandlerMetadata>, MetaApiError>
where
Expand Down
138 changes: 20 additions & 118 deletions crates/admin/src/rest_api/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,121 +8,23 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::error::*;
use crate::generate_meta_api_error;
use crate::rest_api::create_envelope_header;
use crate::state::AdminServiceState;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use okapi_operation::*;
use serde::Deserialize;

use restate_admin_rest_model::invocations::RestartAsNewInvocationResponse;
use restate_types::identifiers::{
DeploymentId, InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey,
};
use restate_types::identifiers::{DeploymentId, InvocationId, PartitionProcessorRpcRequestId};
use restate_types::invocation::client::{
self, CancelInvocationResponse, InvocationClient, KillInvocationResponse,
PauseInvocationResponse, PurgeInvocationResponse, ResumeInvocationResponse,
};
use restate_types::invocation::{InvocationTermination, PurgeInvocationRequest, TerminationFlavor};
use restate_types::journal_v2::EntryIndex;
use restate_wal_protocol::{Command, Envelope};
use serde::Deserialize;
use std::sync::Arc;
use tracing::warn;

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub enum DeletionMode {
#[default]
#[serde(alias = "cancel")]
Cancel,
#[serde(alias = "kill")]
Kill,
#[serde(alias = "purge")]
Purge,
}
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct DeleteInvocationParams {
pub mode: Option<DeletionMode>,
}

/// Terminate an invocation
#[openapi(
summary = "Delete an invocation",
deprecated = true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API was deprecated in 1.4.0. If we remove it with 1.6.0, then we certainly need to add a release note to make people aware. Additionally, we need to check

and
/// Version information endpoint
whether we need to remove support for AdminApiVersion::V2 because we removed the old delete invocation endpoint. We should then also update
pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2;
accordingly if a bump is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @slinkydeveloper for double checking whether we can remove this API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually discussed this with him before I remove it. I was confused because the was 2 endpoints to delete an invocation. So he confirmed that it's no longer used (same for the UI).

Regarding the version, thank you for pointing this out. I think we should indeed set the MIN_ADMIN_API_VERSION to V3

description = "Use kill_invocation/cancel_invocation/purge_invocation instead.",
operation_id = "delete_invocation",
tags = "invocation",
parameters(
path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
),
query(
name = "mode",
description = "If cancel, it will gracefully terminate the invocation. \
If kill, it will terminate the invocation with a hard stop. \
If purge, it will only cleanup the response for completed invocations, and leave unaffected an in-flight invocation.",
required = false,
style = "simple",
allow_empty_value = false,
schema = "DeletionMode",
)
),
responses(
ignore_return_type = true,
response(
status = "202",
description = "Accepted",
content = "okapi_operation::Empty",
),
from_type = "MetaApiError",
)
)]
pub async fn delete_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
Path(invocation_id): Path<String>,
Query(DeleteInvocationParams { mode }): Query<DeleteInvocationParams>,
) -> Result<StatusCode, MetaApiError> {
let invocation_id = invocation_id
.parse::<InvocationId>()
.map_err(|e| MetaApiError::InvalidField("invocation_id", e.to_string()))?;

let cmd = match mode.unwrap_or_default() {
DeletionMode::Cancel => Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::Cancel,
response_sink: None,
}),
DeletionMode::Kill => Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::Kill,
response_sink: None,
}),
DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest {
invocation_id,
response_sink: None,
}),
};

let partition_key = invocation_id.partition_key();

let result = restate_bifrost::append_to_bifrost(
&state.bifrost,
Arc::new(Envelope::new(create_envelope_header(partition_key), cmd)),
)
.await;

if let Err(err) = result {
warn!("Could not append invocation termination command to Bifrost: {err}");
Err(MetaApiError::Internal(
"Failed sending invocation termination to the cluster.".to_owned(),
))
} else {
Ok(StatusCode::ACCEPTED)
}
}
use super::error::*;
use crate::generate_meta_api_error;
use crate::state::AdminServiceState;

generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, InvocationWasAlreadyCompletedError]);

Expand All @@ -139,8 +41,8 @@ generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, Invocati
schema = "std::string::String"
))
)]
pub async fn kill_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn kill_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<(), KillInvocationError>
where
Expand Down Expand Up @@ -199,8 +101,8 @@ generate_meta_api_error!(CancelInvocationError: [InvocationNotFoundError, Invoca
from_type = "CancelInvocationError",
)
)]
pub async fn cancel_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn cancel_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<StatusCode, CancelInvocationError>
where
Expand Down Expand Up @@ -241,8 +143,8 @@ generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, Invocat
schema = "std::string::String"
))
)]
pub async fn purge_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn purge_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<(), PurgeInvocationError>
where
Expand Down Expand Up @@ -284,8 +186,8 @@ generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, Invocation
schema = "std::string::String"
))
)]
pub async fn purge_journal<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn purge_journal<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<(), PurgeJournalError>
where
Expand Down Expand Up @@ -398,8 +300,8 @@ generate_meta_api_error!(RestartInvocationError: [
),
)
)]
pub async fn restart_as_new_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn restart_as_new_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
Query(RestartAsNewInvocationQueryParams { from, deployment }): Query<
RestartAsNewInvocationQueryParams,
Expand Down Expand Up @@ -510,8 +412,8 @@ generate_meta_api_error!(ResumeInvocationError: [
)
)
)]
pub async fn resume_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn resume_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
Query(ResumeInvocationQueryParams { deployment }): Query<ResumeInvocationQueryParams>,
) -> Result<(), ResumeInvocationError>
Expand Down Expand Up @@ -596,8 +498,8 @@ generate_meta_api_error!(PauseInvocationError: [
from_type = "PauseInvocationError",
)
)]
pub async fn pause_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn pause_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<StatusCode, PauseInvocationError>
where
Expand Down
10 changes: 4 additions & 6 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod version;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::okapi::openapi3::{ExternalDocs, Tag};
use okapi_operation::*;
use restate_core::network::TransportConnect;
use restate_types::identifiers::PartitionKey;
use restate_types::invocation::client::InvocationClient;
use restate_types::schema::registry::{DiscoveryClient, MetadataService, TelemetryClient};
Expand All @@ -32,14 +33,15 @@ use crate::state::AdminServiceState;

pub use version::{MAX_ADMIN_API_VERSION, MIN_ADMIN_API_VERSION};

pub fn create_router<Metadata, Discovery, Telemetry, Invocations>(
state: AdminServiceState<Metadata, Discovery, Telemetry, Invocations>,
pub fn create_router<Metadata, Discovery, Telemetry, Invocations, Transport>(
state: AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>,
) -> axum::Router<()>
where
Metadata: MetadataService + Send + Sync + Clone + 'static,
Discovery: DiscoveryClient + Send + Sync + Clone + 'static,
Telemetry: TelemetryClient + Send + Sync + Clone + 'static,
Invocations: InvocationClient + Send + Sync + Clone + 'static,
Transport: TransportConnect,
{
let mut router = axum_integration::Router::new()
.route(
Expand Down Expand Up @@ -91,10 +93,6 @@ where
"/services/{service}/handlers/{handler}",
get(openapi_handler!(handlers::get_service_handler)),
)
.route(
"/invocations/{invocation_id}",
delete(openapi_handler!(invocations::delete_invocation)),
)
.route(
"/invocations/{invocation_id}/kill",
patch(openapi_handler!(invocations::kill_invocation)),
Expand Down
Loading
Loading