diff --git a/Cargo.lock b/Cargo.lock index e651351f67..0919fbc566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6937,6 +6937,7 @@ dependencies = [ "restate-bifrost", "restate-core", "restate-errors", + "restate-ingestion-client", "restate-metadata-providers", "restate-metadata-store", "restate-service-client", @@ -11158,8 +11159,8 @@ dependencies = [ "anyhow", "reqwest", "restate-admin", - "restate-bifrost", "restate-core", + "restate-ingestion-client", "restate-metadata-server", "restate-service-client", "restate-service-protocol", diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index dd78e3f914..ee89ad6252 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -28,6 +28,7 @@ restate-admin-rest-model = { workspace = true, features = ["schema"] } restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-loglet"] } restate-core = { workspace = true } restate-errors = { workspace = true } +restate-ingestion-client = { workspace = true } restate-metadata-store = { workspace = true } restate-metadata-providers = { workspace = true } restate-service-client = { workspace = true } diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index ba6a750958..02e70dbd0a 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -58,8 +58,8 @@ use serde::Deserialize; from_type = "MetaApiError", ) )] -pub async fn create_deployment( - State(state): State>, +pub async fn create_deployment( + State(state): State>, Extension(version): Extension, #[request_body(required = true)] Json(payload): Json, ) -> Result @@ -188,8 +188,8 @@ where schema = "std::string::String" )) )] -pub async fn get_deployment( - State(state): State>, +pub async fn get_deployment( + State(state): State>, Path(deployment_id): Path, ) -> Result, MetaApiError> where @@ -210,8 +210,8 @@ where operation_id = "list_deployments", tags = "deployment" )] -pub async fn list_deployments( - State(state): State>, +pub async fn list_deployments( + State(state): State>, ) -> Json where Metadata: MetadataService, @@ -267,8 +267,8 @@ pub struct DeleteDeploymentParams { from_type = "MetaApiError", ) )] -pub async fn delete_deployment( - State(state): State>, +pub async fn delete_deployment( + State(state): State>, Path(deployment_id): Path, Query(DeleteDeploymentParams { force }): Query, ) -> Result @@ -302,8 +302,8 @@ where schema = "std::string::String" )) )] -pub async fn update_deployment( - State(state): State>, +pub async fn update_deployment( + State(state): State>, Extension(version): Extension, method: Method, Path(deployment_id): Path, diff --git a/crates/admin/src/rest_api/handlers.rs b/crates/admin/src/rest_api/handlers.rs index 7624d95934..9750d095cf 100644 --- a/crates/admin/src/rest_api/handlers.rs +++ b/crates/admin/src/rest_api/handlers.rs @@ -30,8 +30,8 @@ use restate_types::schema::service::HandlerMetadata; schema = "std::string::String" )) )] -pub async fn list_service_handlers( - State(state): State>, +pub async fn list_service_handlers( + State(state): State>, Path(service_name): Path, ) -> Result, MetaApiError> where @@ -62,8 +62,8 @@ where ) ) )] -pub async fn get_service_handler( - State(state): State>, +pub async fn get_service_handler( + State(state): State>, Path((service_name, handler_name)): Path<(String, String)>, ) -> Result, MetaApiError> where diff --git a/crates/admin/src/rest_api/invocations.rs b/crates/admin/src/rest_api/invocations.rs index 42446a69c7..43518e7d62 100644 --- a/crates/admin/src/rest_api/invocations.rs +++ b/crates/admin/src/rest_api/invocations.rs @@ -8,121 +8,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::error::*; -use crate::generate_meta_api_error; -use crate::rest_api::create_envelope_header; -use crate::state::AdminServiceState; use axum::Json; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; use okapi_operation::*; +use serde::Deserialize; + use restate_admin_rest_model::invocations::RestartAsNewInvocationResponse; -use restate_types::identifiers::{ - DeploymentId, InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey, -}; +use restate_types::identifiers::{DeploymentId, InvocationId, PartitionProcessorRpcRequestId}; use restate_types::invocation::client::{ self, CancelInvocationResponse, InvocationClient, KillInvocationResponse, PauseInvocationResponse, PurgeInvocationResponse, ResumeInvocationResponse, }; -use restate_types::invocation::{InvocationTermination, PurgeInvocationRequest, TerminationFlavor}; use restate_types::journal_v2::EntryIndex; -use restate_wal_protocol::{Command, Envelope}; -use serde::Deserialize; -use std::sync::Arc; -use tracing::warn; -#[derive(Debug, Default, Deserialize, JsonSchema)] -pub enum DeletionMode { - #[default] - #[serde(alias = "cancel")] - Cancel, - #[serde(alias = "kill")] - Kill, - #[serde(alias = "purge")] - Purge, -} -#[derive(Debug, Default, Deserialize, JsonSchema)] -pub struct DeleteInvocationParams { - pub mode: Option, -} - -/// Terminate an invocation -#[openapi( - summary = "Delete an invocation", - deprecated = true, - description = "Use kill_invocation/cancel_invocation/purge_invocation instead.", - operation_id = "delete_invocation", - tags = "invocation", - parameters( - path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - ), - query( - name = "mode", - description = "If cancel, it will gracefully terminate the invocation. \ - If kill, it will terminate the invocation with a hard stop. \ - If purge, it will only cleanup the response for completed invocations, and leave unaffected an in-flight invocation.", - required = false, - style = "simple", - allow_empty_value = false, - schema = "DeletionMode", - ) - ), - responses( - ignore_return_type = true, - response( - status = "202", - description = "Accepted", - content = "okapi_operation::Empty", - ), - from_type = "MetaApiError", - ) -)] -pub async fn delete_invocation( - State(state): State>, - Path(invocation_id): Path, - Query(DeleteInvocationParams { mode }): Query, -) -> Result { - let invocation_id = invocation_id - .parse::() - .map_err(|e| MetaApiError::InvalidField("invocation_id", e.to_string()))?; - - let cmd = match mode.unwrap_or_default() { - DeletionMode::Cancel => Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Cancel, - response_sink: None, - }), - DeletionMode::Kill => Command::TerminateInvocation(InvocationTermination { - invocation_id, - flavor: TerminationFlavor::Kill, - response_sink: None, - }), - DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest { - invocation_id, - response_sink: None, - }), - }; - - let partition_key = invocation_id.partition_key(); - - let result = restate_bifrost::append_to_bifrost( - &state.bifrost, - Arc::new(Envelope::new(create_envelope_header(partition_key), cmd)), - ) - .await; - - if let Err(err) = result { - warn!("Could not append invocation termination command to Bifrost: {err}"); - Err(MetaApiError::Internal( - "Failed sending invocation termination to the cluster.".to_owned(), - )) - } else { - Ok(StatusCode::ACCEPTED) - } -} +use super::error::*; +use crate::generate_meta_api_error; +use crate::state::AdminServiceState; generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, InvocationWasAlreadyCompletedError]); @@ -139,8 +41,8 @@ generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, Invocati schema = "std::string::String" )) )] -pub async fn kill_invocation( - State(state): State>, +pub async fn kill_invocation( + State(state): State>, Path(invocation_id): Path, ) -> Result<(), KillInvocationError> where @@ -199,8 +101,8 @@ generate_meta_api_error!(CancelInvocationError: [InvocationNotFoundError, Invoca from_type = "CancelInvocationError", ) )] -pub async fn cancel_invocation( - State(state): State>, +pub async fn cancel_invocation( + State(state): State>, Path(invocation_id): Path, ) -> Result where @@ -241,8 +143,8 @@ generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, Invocat schema = "std::string::String" )) )] -pub async fn purge_invocation( - State(state): State>, +pub async fn purge_invocation( + State(state): State>, Path(invocation_id): Path, ) -> Result<(), PurgeInvocationError> where @@ -284,8 +186,8 @@ generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, Invocation schema = "std::string::String" )) )] -pub async fn purge_journal( - State(state): State>, +pub async fn purge_journal( + State(state): State>, Path(invocation_id): Path, ) -> Result<(), PurgeJournalError> where @@ -398,8 +300,8 @@ generate_meta_api_error!(RestartInvocationError: [ ), ) )] -pub async fn restart_as_new_invocation( - State(state): State>, +pub async fn restart_as_new_invocation( + State(state): State>, Path(invocation_id): Path, Query(RestartAsNewInvocationQueryParams { from, deployment }): Query< RestartAsNewInvocationQueryParams, @@ -510,8 +412,8 @@ generate_meta_api_error!(ResumeInvocationError: [ ) ) )] -pub async fn resume_invocation( - State(state): State>, +pub async fn resume_invocation( + State(state): State>, Path(invocation_id): Path, Query(ResumeInvocationQueryParams { deployment }): Query, ) -> Result<(), ResumeInvocationError> @@ -596,8 +498,8 @@ generate_meta_api_error!(PauseInvocationError: [ from_type = "PauseInvocationError", ) )] -pub async fn pause_invocation( - State(state): State>, +pub async fn pause_invocation( + State(state): State>, Path(invocation_id): Path, ) -> Result where diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index 3dda5102e6..191534723f 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -23,6 +23,7 @@ mod version; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::okapi::openapi3::{ExternalDocs, Tag}; use okapi_operation::*; +use restate_core::network::TransportConnect; use restate_types::identifiers::PartitionKey; use restate_types::invocation::client::InvocationClient; use restate_types::schema::registry::{DiscoveryClient, MetadataService, TelemetryClient}; @@ -32,14 +33,15 @@ use crate::state::AdminServiceState; pub use version::{MAX_ADMIN_API_VERSION, MIN_ADMIN_API_VERSION}; -pub fn create_router( - state: AdminServiceState, +pub fn create_router( + state: AdminServiceState, ) -> axum::Router<()> where Metadata: MetadataService + Send + Sync + Clone + 'static, Discovery: DiscoveryClient + Send + Sync + Clone + 'static, Telemetry: TelemetryClient + Send + Sync + Clone + 'static, Invocations: InvocationClient + Send + Sync + Clone + 'static, + Transport: TransportConnect, { let mut router = axum_integration::Router::new() .route( @@ -91,10 +93,6 @@ where "/services/{service}/handlers/{handler}", get(openapi_handler!(handlers::get_service_handler)), ) - .route( - "/invocations/{invocation_id}", - delete(openapi_handler!(invocations::delete_invocation)), - ) .route( "/invocations/{invocation_id}/kill", patch(openapi_handler!(invocations::kill_invocation)), diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index 93664e7b7f..a089915c16 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; use tracing::{debug, warn}; use axum::Json; @@ -20,6 +19,7 @@ use okapi_operation::*; use restate_admin_rest_model::services::ListServicesResponse; use restate_admin_rest_model::services::*; use restate_core::TaskCenter; +use restate_core::network::TransportConnect; use restate_errors::warn_it; use restate_types::config::Configuration; use restate_types::identifiers::{ServiceId, WithPartitionKey}; @@ -40,8 +40,8 @@ use crate::state::AdminServiceState; operation_id = "list_services", tags = "service" )] -pub async fn list_services( - State(state): State>, +pub async fn list_services( + State(state): State>, ) -> Result, MetaApiError> where Metadata: MetadataService, @@ -63,8 +63,8 @@ where schema = "std::string::String" )) )] -pub async fn get_service( - State(state): State>, +pub async fn get_service( + State(state): State>, Path(service_name): Path, ) -> Result, MetaApiError> where @@ -98,8 +98,8 @@ where from_type = "MetaApiError", ) )] -pub async fn get_service_openapi( - State(state): State>, +pub async fn get_service_openapi( + State(state): State>, Path(service_name): Path, ) -> Result, MetaApiError> where @@ -131,8 +131,8 @@ where schema = "std::string::String" )) )] -pub async fn modify_service( - State(state): State>, +pub async fn modify_service( + State(state): State>, Path(service_name): Path, #[request_body(required = true)] Json(ModifyServiceRequest { public, @@ -196,8 +196,8 @@ where from_type = "MetaApiError", ) )] -pub async fn modify_service_state( - State(state): State>, +pub async fn modify_service_state( + State(state): State>, Path(service_name): Path, #[request_body(required = true)] Json(ModifyServiceStateRequest { version, @@ -207,6 +207,7 @@ pub async fn modify_service_state( ) -> Result where Metadata: MetadataService, + Transport: TransportConnect, { if let Some(svc) = state.schema_registry.get_service(&service_name) { if !svc.ty.has_state() { @@ -236,17 +237,24 @@ where state: new_state, }; - let result = restate_bifrost::append_to_bifrost( - &state.bifrost, - Arc::new(Envelope::new( - create_envelope_header(partition_key), - Command::PatchState(patch_state), - )), - ) - .await; + let envelope = Envelope::new( + create_envelope_header(partition_key), + Command::PatchState(patch_state), + ); + + let result = state + .ingestion_client + .ingest(partition_key, envelope) + .await + .map_err(|err| { + warn!("Could not ingest state patching command: {err}"); + MetaApiError::Internal( + "Failed sending state patching command to the cluster.".to_owned(), + ) + })?; - if let Err(err) = result { - warn!("Could not append state patching command to Bifrost: {err}"); + if let Err(err) = result.await { + warn!("Could not ingest state patching command: {err}"); Err(MetaApiError::Internal( "Failed sending state patching command to the cluster.".to_owned(), )) diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index d6b5dac634..0159fa84d3 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -42,8 +42,8 @@ use restate_types::schema::registry::MetadataService; from_type = "MetaApiError", ) )] -pub async fn create_subscription( - State(state): State>, +pub async fn create_subscription( + State(state): State>, #[request_body(required = true)] Json(payload): Json, ) -> Result where @@ -77,8 +77,8 @@ where schema = "std::string::String" )) )] -pub async fn get_subscription( - State(state): State>, +pub async fn get_subscription( + State(state): State>, Path(subscription_id): Path, ) -> Result, MetaApiError> where @@ -117,8 +117,8 @@ where ) ) )] -pub async fn list_subscriptions( - State(state): State>, +pub async fn list_subscriptions( + State(state): State>, Query(ListSubscriptionsParams { sink, source }): Query, ) -> Json where @@ -168,8 +168,8 @@ where from_type = "MetaApiError", ) )] -pub async fn delete_subscription( - State(state): State>, +pub async fn delete_subscription( + State(state): State>, Path(subscription_id): Path, ) -> Result where diff --git a/crates/admin/src/rest_api/version.rs b/crates/admin/src/rest_api/version.rs index 1857cb5d95..020be98c0f 100644 --- a/crates/admin/src/rest_api/version.rs +++ b/crates/admin/src/rest_api/version.rs @@ -14,8 +14,12 @@ use restate_admin_rest_model::version::{AdminApiVersion, VersionInformation}; use restate_core::TaskCenter; use restate_types::config::Configuration; +// Changes: +// - Dropped support of V2 admin api in version v1.6. Changes include: +// - dropped `DELETE "/invocations/{invocation_id}"` endpoint + /// Min/max supported admin api versions by the server -pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2; +pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V3; pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V3; /// Version information endpoint diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 3cbc4bb2e4..905dc6040a 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -12,14 +12,15 @@ use std::time::Duration; use axum::error_handling::HandleErrorLayer; use http::{Request, Response, StatusCode}; +use restate_ingestion_client::IngestionClient; +use restate_wal_protocol::Envelope; use tower::ServiceBuilder; use tower_http::classify::ServerErrorsFailureClass; use tower_http::trace::TraceLayer; use tracing::{Span, debug, info, info_span}; use restate_admin_rest_model::version::AdminApiVersion; -use restate_bifrost::Bifrost; -use restate_core::network::net_util; +use restate_core::network::{TransportConnect, net_util}; use restate_core::{MetadataWriter, TaskCenter}; use restate_service_client::HttpClient; use restate_service_protocol::discovery::ServiceDiscovery; @@ -39,9 +40,9 @@ use crate::{rest_api, state}; #[error("could not create the service client: {0}")] pub struct BuildError(#[from] restate_service_client::BuildError); -pub struct AdminService { +pub struct AdminService { listeners: Listeners, - bifrost: Bifrost, + ingestion_client: IngestionClient, schema_registry: SchemaRegistry, invocation_client: Invocations, #[cfg(feature = "storage-query")] @@ -50,21 +51,23 @@ pub struct AdminService { metadata_writer: MetadataWriter, } -impl AdminService +impl + AdminService where Invocations: InvocationClient + Send + Sync + Clone + 'static, + Transport: TransportConnect, { pub fn new( listeners: Listeners, metadata_writer: MetadataWriter, - bifrost: Bifrost, + ingestion_client: IngestionClient, invocation_client: Invocations, service_discovery: ServiceDiscovery, telemetry_http_client: Option, ) -> Self { Self { listeners, - bifrost, + ingestion_client, #[cfg(feature = "metadata-api")] metadata_writer: metadata_writer.clone(), schema_registry: SchemaRegistry::new( @@ -98,7 +101,7 @@ where let rest_state = state::AdminServiceState::new( self.schema_registry, self.invocation_client, - self.bifrost, + self.ingestion_client, ); let router = axum::Router::new(); diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index a08cfedbbc..4a8cbbffaa 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -8,28 +8,32 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_bifrost::Bifrost; +use restate_core::network::TransportConnect; +use restate_ingestion_client::IngestionClient; use restate_types::schema::registry::SchemaRegistry; +use restate_wal_protocol::Envelope; #[derive(Clone, derive_builder::Builder)] -pub struct AdminServiceState { +pub struct AdminServiceState { pub schema_registry: SchemaRegistry, pub invocation_client: Invocations, - pub bifrost: Bifrost, + pub ingestion_client: IngestionClient, } -impl - AdminServiceState +impl + AdminServiceState +where + Transport: TransportConnect, { pub fn new( schema_registry: SchemaRegistry, invocation_client: Invocations, - bifrost: Bifrost, + ingestion_client: IngestionClient, ) -> Self { Self { schema_registry, invocation_client, - bifrost, + ingestion_client, } } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 6bdb125cd1..1257591a49 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -283,7 +283,7 @@ impl Node { partition_store_manager.clone(), networking.clone(), bifrost_svc.handle(), - ingestion_client, + ingestion_client.clone(), metadata_manager.writer(), ) .await?, @@ -314,6 +314,7 @@ impl Node { AdminRole::create( tc.health().admin_status(), bifrost.clone(), + ingestion_client, updateable_config.clone(), PartitionRouting::new(replica_set_states.clone(), tc), metadata.updateable_partition_table(), diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 07e47f29b9..a88171441d 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -23,6 +23,7 @@ use restate_core::network::TransportConnect; use restate_core::partitions::PartitionRouting; use restate_core::worker_api::PartitionProcessorInvocationClient; use restate_core::{Metadata, MetadataWriter, TaskCenter, TaskKind}; +use restate_ingestion_client::IngestionClient; use restate_partition_store::PartitionStoreManager; use restate_service_client::{AssumeRoleCacheMode, HttpClient, ServiceClient}; use restate_service_protocol::discovery::ServiceDiscovery; @@ -42,6 +43,7 @@ use restate_types::partition_table::PartitionTable; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::protobuf::common::AdminStatus; use restate_types::retries::RetryPolicy; +use restate_wal_protocol::Envelope; #[derive(Debug, thiserror::Error, CodedError)] pub enum AdminRoleBuildError { @@ -67,6 +69,7 @@ pub struct AdminRole { ServiceDiscovery, TelemetryClient, PartitionProcessorInvocationClient, + T, >, storage_accounting_task: Option, } @@ -76,6 +79,7 @@ impl AdminRole { pub async fn create( health_status: HealthStatus, bifrost: Bifrost, + ingestion_client: IngestionClient, updateable_config: Live, partition_routing: PartitionRouting, partition_table: Live, @@ -127,7 +131,7 @@ impl AdminRole { let admin = AdminService::new( listeners, metadata_writer.clone(), - bifrost.clone(), + ingestion_client, PartitionProcessorInvocationClient::new( networking.clone(), partition_table, diff --git a/tools/xtask/Cargo.toml b/tools/xtask/Cargo.toml index 6779167cea..9b68120987 100644 --- a/tools/xtask/Cargo.toml +++ b/tools/xtask/Cargo.toml @@ -9,8 +9,8 @@ publish = false [dependencies] restate-admin = { workspace = true, features = ["options_schema"] } -restate-bifrost = { workspace = true, features = ["test-util"] } restate-core = { workspace = true, features = ["test-util"] } +restate-ingestion-client = { workspace = true } restate-metadata-server = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"]} diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index e0439c2144..5565ff7e39 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -10,15 +10,18 @@ use std::future::pending; use std::io::Write; +use std::num::NonZeroUsize; use std::sync::Arc; use std::{env, io}; use anyhow::bail; use reqwest::header::ACCEPT; +use restate_core::partitions::PartitionRouting; +use restate_ingestion_client::IngestionClient; +use restate_types::partitions::state::PartitionReplicaSetStates; use schemars::r#gen::SchemaSettings; use restate_admin::service::AdminService; -use restate_bifrost::Bifrost; use restate_core::{TaskCenter, TaskCenterBuilder, TestCoreEnv}; use restate_core::{TaskCenterFutureExt, TaskKind}; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; @@ -214,14 +217,21 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { // We start the Meta service, then download the openapi schema generated let node_env = TestCoreEnv::create_with_single_node(1, 1).await; - let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await; + + let ingress_client = IngestionClient::new( + node_env.networking, + node_env.metadata.updateable_partition_table(), + PartitionRouting::new(PartitionReplicaSetStates::default(), TaskCenter::current()), + NonZeroUsize::new(1000).unwrap(), + None, + ); let socket_dir = tempfile::tempdir()?; let socket_path = socket_dir.path().join("admin.sock"); let admin_service = AdminService::new( Listeners::new_unix_listener(socket_path.clone())?, node_env.metadata_writer.clone(), - bifrost, + ingress_client, Mock, ServiceDiscovery::new( RetryPolicy::default(),