diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 55f458ce5c..6691714ec3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -125,6 +125,24 @@ jobs: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} serviceImage: "ghcr.io/restatedev/test-services-java:main" + sdk-java-journal-table-v2: + name: Run SDK-Java integration tests with Journal Table v2 + permissions: + contents: read + issues: read + checks: write + pull-requests: write + actions: read + secrets: inherit + needs: docker + uses: restatedev/sdk-java/.github/workflows/integration.yaml@main + with: + restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} + serviceImage: "ghcr.io/restatedev/test-services-java:main" + testArtifactOutput: sdk-java-integration-test-journal-table-v2 + envVars: | + RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true + sdk-python: name: Run SDK-Python integration tests permissions: @@ -226,6 +244,59 @@ jobs: envVars: | RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true + e2e-enable-journal-table-v2: + name: Run E2E tests with Journal Table V2 feature + runs-on: warp-ubuntu-latest-x64-4x + permissions: + contents: read + issues: read + checks: write + pull-requests: write + actions: read + needs: docker + steps: + - name: Set up Docker containerd snapshotter + uses: crazy-max/ghaction-setup-docker@v3 + with: + set-host: true + daemon-config: | + { + "features": { + "containerd-snapshotter": true + } + } + + ### Download the Restate container image, if needed + - name: Download restate snapshot from in-progress workflow + uses: actions/download-artifact@v4 + with: + name: restate.tar + - name: Install restate snapshot + run: | + output=$(docker load --input restate.tar | head -n 1) + docker tag "${output#*: }" "localhost/restatedev/restate-commit-download:latest" + docker image ls -a + + ### Run e2e tests + - name: Run E2E tests + uses: restatedev/e2e@main + with: + testArtifactOutput: e2e-journal-table-v2-test-report + restateContainerImage: localhost/restatedev/restate-commit-download:latest + # Needed for backward compatibility tests 1.6 -> 1.5 + envVars: | + RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true + RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true + # Why these tests are disabled? + # In restate 1.5 the invoker storage reader will not handle correctly the case where there's no pinned deployment yet + # and the journal table used is v2. This doesn't show in the logs, but will simply hang badly the invocation task loop! + # These tests trigger this condition! + exclusions: | + exclusions: + "versionCompat": + - "dev.restate.sdktesting.tests.BackCompatibilityTest\\$OldVersion#proxyCallShouldBeDone" + - "dev.restate.sdktesting.tests.BackCompatibilityTest\\$OldVersion#proxyOneWayCallShouldBeDone" + jepsen: if: github.event.repository.fork == false && github.event.pull_request.head.repo.full_name == 'restatedev/restate' && github.ref == 'refs/heads/main' runs-on: warp-ubuntu-latest-arm64-4x diff --git a/crates/invoker-api/src/invocation_reader.rs b/crates/invoker-api/src/invocation_reader.rs index 932ca1f97d..278889c5ba 100644 --- a/crates/invoker-api/src/invocation_reader.rs +++ b/crates/invoker-api/src/invocation_reader.rs @@ -32,6 +32,8 @@ pub struct JournalMetadata { /// and the max time difference between two replicas applying the journal append command. pub last_modification_date: MillisSinceEpoch, pub random_seed: u64, + /// If true, the entries are stored in journal table v2 + pub using_journal_table_v2: bool, } impl JournalMetadata { @@ -42,6 +44,7 @@ impl JournalMetadata { invocation_epoch: InvocationEpoch, last_modification_date: MillisSinceEpoch, random_seed: u64, + using_journal_table_v2: bool, ) -> Self { Self { pinned_deployment, @@ -50,6 +53,7 @@ impl JournalMetadata { last_modification_date, invocation_epoch, random_seed, + using_journal_table_v2, } } } diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index c33b020a46..2c00ea9719 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -73,6 +73,7 @@ pub mod test_util { 0, MillisSinceEpoch::UNIX_EPOCH, 0, + true, ), futures::stream::empty(), ))) diff --git a/crates/invoker-impl/src/error.rs b/crates/invoker-impl/src/error.rs index f47a33118e..c4e23cb066 100644 --- a/crates/invoker-impl/src/error.rs +++ b/crates/invoker-impl/src/error.rs @@ -119,6 +119,11 @@ pub(crate) enum InvokerError { actual: InvocationEpoch, expected: InvocationEpoch, }, + #[error( + "error when reading the journal: expected to read {expected} entries, but read only {expected}. This indicates a bug or a storage corruption." + )] + #[code(unknown)] + UnexpectedEntryCount { actual: u32, expected: u32 }, #[error(transparent)] #[code(restate_errors::RT0010)] @@ -172,6 +177,12 @@ pub(crate) enum InvokerError { #[error("service is temporary unavailable '{0}'")] #[code(restate_errors::RT0010)] ServiceUnavailable(http::StatusCode), + + #[error( + "service {0} is exposed by the deprecated deployment {1}, please upgrade the SDK used by the service." + )] + #[code(restate_errors::RT0020)] + DeploymentDeprecated(String, DeploymentId), } impl InvokerError { diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index b251afedcf..913c0b18fe 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -395,6 +395,16 @@ where EagerState::>::default().map(itertools::Either::Right) }; + if chosen_service_protocol_version < ServiceProtocolVersion::V4 + && journal_metadata.using_journal_table_v2 + { + // We don't support migrating from journal v2 to journal v1! + shortcircuit!(Err(InvokerError::DeploymentDeprecated( + self.invocation_target.service_name().to_string(), + deployment.id + ))); + } + // No need to read from Rocksdb anymore drop(txn); diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index e0b8f3f586..f420369bc7 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -184,8 +184,13 @@ where // Execute the replay crate::shortcircuit!( - self.replay_loop(&mut http_stream_tx, &mut decoder_stream, journal_stream) - .await + self.replay_loop( + &mut http_stream_tx, + &mut decoder_stream, + journal_stream, + journal_metadata.length + ) + .await ); // If we have the invoker_rx and the protocol type is bidi stream, @@ -305,6 +310,7 @@ where http_stream_tx: &mut InvokerRequestStreamSender, http_stream_rx: &mut S, journal_stream: JournalStream, + expected_entries_count: u32, ) -> TerminalLoopState<()> where JournalStream: Stream + Unpin, @@ -312,6 +318,7 @@ where { let mut journal_stream = journal_stream.fuse(); let mut got_headers = false; + let mut sent_entries = 0; loop { tokio::select! { @@ -334,10 +341,11 @@ where opt_je = journal_stream.next() => { match opt_je { Some(JournalEntry::JournalV2(entry)) => { + sent_entries += 1; crate::shortcircuit!(self.write_entry(http_stream_tx, entry.inner).await); - } Some(JournalEntry::JournalV1(old_entry)) => { + sent_entries += 1; if let journal::Entry::Input(input_entry) = crate::shortcircuit!(old_entry.deserialize_entry::()) { crate::shortcircuit!(self.write_entry( http_stream_tx, @@ -352,6 +360,14 @@ where } }, None => { + // Let's verify if we sent all the entries we promised, otherwise the stream will hang in a bad way! + if sent_entries < expected_entries_count { + return TerminalLoopState::Failed(InvokerError::UnexpectedEntryCount { + actual: sent_entries, + expected: expected_entries_count, + }) + } + // No need to wait for the headers to continue trace!("Finished to replay the journal"); return TerminalLoopState::Continue(()) diff --git a/crates/worker/src/partition/invoker_storage_reader.rs b/crates/worker/src/partition/invoker_storage_reader.rs index c3611bbcc3..36de76f662 100644 --- a/crates/worker/src/partition/invoker_storage_reader.rs +++ b/crates/worker/src/partition/invoker_storage_reader.rs @@ -19,7 +19,6 @@ use restate_storage_api::state_table::ReadStateTable; use restate_storage_api::{IsolationLevel, journal_table as journal_table_v1, journal_table_v2}; use restate_types::identifiers::InvocationId; use restate_types::identifiers::ServiceId; -use restate_types::service_protocol::ServiceProtocolVersion; use std::vec::IntoIter; #[derive(Debug, thiserror::Error)] @@ -81,40 +80,40 @@ where .unwrap_or_else(|| invocation_id.to_random_seed()); if let InvocationStatus::Invoked(invoked_status) = invocation_status { - let (journal_metadata, journal_stream) = if invoked_status - .pinned_deployment - .as_ref() - .is_some_and(|p| p.service_protocol_version >= ServiceProtocolVersion::V4) - { - // If pinned service protocol version exists and >= V4, we need to read from Journal Table V2! - let entries = journal_table_v2::ReadJournalTable::get_journal( - &mut self.txn, - *invocation_id, - invoked_status.journal_metadata.length, - )? - .map(|entry| { - entry - .map_err(InvokerStorageReaderError::Storage) - .map(|(_, entry)| { - restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry) - }) - }) - // TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275 - // collecting the stream because we cannot keep the transaction open - .try_collect::>() - .await?; - - let journal_metadata = JournalMetadata::new( - invoked_status.journal_metadata.length, - invoked_status.journal_metadata.span_context, - invoked_status.pinned_deployment, - invoked_status.current_invocation_epoch, - invoked_status.timestamps.modification_time(), - random_seed, - ); + // Try to read first from journal table v2 + let entries = journal_table_v2::ReadJournalTable::get_journal( + &mut self.txn, + *invocation_id, + invoked_status.journal_metadata.length, + )? + .map(|entry| { + entry + .map_err(InvokerStorageReaderError::Storage) + .map(|(_, entry)| { + restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry) + }) + }) + // TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275 + // collecting the stream because we cannot keep the transaction open + .try_collect::>() + .await?; - (journal_metadata, entries) + let (journal_metadata, journal_stream) = if !entries.is_empty() { + // We got the journal, good to go + ( + JournalMetadata::new( + invoked_status.journal_metadata.length, + invoked_status.journal_metadata.span_context, + invoked_status.pinned_deployment, + invoked_status.current_invocation_epoch, + invoked_status.timestamps.modification_time(), + random_seed, + true, + ), + entries, + ) } else { + // We didn't read a thing from journal table v2 -> we need to read journal v1 ( JournalMetadata::new( // Use entries len here, because we might be filtering out events @@ -124,6 +123,7 @@ where invoked_status.current_invocation_epoch, invoked_status.timestamps.modification_time(), random_seed, + false, ), journal_table_v1::ReadJournalTable::get_journal( &mut self.txn, diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 1d90e27304..9107ba2d04 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -16,6 +16,7 @@ pub mod shuffle; mod state_machine; pub mod types; +use std::env; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; @@ -66,7 +67,7 @@ use crate::metric_definitions::{ }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::LeadershipState; -use crate::partition::state_machine::{ActionCollector, StateMachine}; +use crate::partition::state_machine::{ActionCollector, Feature, StateMachine}; /// Target leader state of the partition processor. #[derive(Clone, Copy, Debug, Default, PartialEq)] @@ -193,13 +194,19 @@ where }); } + let mut features = EnumSet::new(); + // TODO(till) enable this using partition processor version barrier + if env::var("RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT").is_ok() { + features.insert(Feature::UseJournalTableV2AsDefault); + } + let state_machine = StateMachine::new( inbox_seq_number, outbox_seq_number, outbox_head_seq_number, partition_store.partition_key_range().clone(), min_restate_version, - EnumSet::empty(), + features, schema, ); diff --git a/crates/worker/src/partition/rpc/restart_as_new_invocation.rs b/crates/worker/src/partition/rpc/restart_as_new_invocation.rs index b13c8dbede..c693cb25c2 100644 --- a/crates/worker/src/partition/rpc/restart_as_new_invocation.rs +++ b/crates/worker/src/partition/rpc/restart_as_new_invocation.rs @@ -101,10 +101,28 @@ where // If the invocation is using the old protocol version or no version is set at all. // We have a workaround here that manually creates the ServiceInvocation data structure reading the journal table v1 - let use_old_journal_workaround = completed_invocation - .pinned_deployment - .as_ref() - .is_none_or(|pd| pd.service_protocol_version < ServiceProtocolVersion::V4); + let use_old_journal_workaround = match &completed_invocation.pinned_deployment { + // If the pinned deployment is set, we know for sure which journal version will be used here. + Some(pd) => pd.service_protocol_version < ServiceProtocolVersion::V4, + None => { + // If the pinned deployment is not set, we try to read input entry v2. If absent, then we go with the workaround. + match journal_table_v2::ReadJournalTable::get_journal_entry( + self.storage, + invocation_id, + 0, + ) + .await + { + Ok(opt_entry) => opt_entry.is_none(), + Err(storage_error) => { + replier.send_result(Err(PartitionProcessorRpcError::Internal( + storage_error.to_string(), + ))); + return Ok(()); + } + } + } + }; // New invocation id let new_invocation_id = InvocationId::from_parts( @@ -140,7 +158,6 @@ where // Now retrieve the input command let find_result = async { - // Use `?` to propagate storage errors, mapping them to our error type. let Some(journal_table_v1::JournalEntry::Entry(entry)) = journal_table_v1::ReadJournalTable::get_journal_entry( self.storage, @@ -237,11 +254,14 @@ where // For Restart from prefix, the PP will actually execute the operation, // but here we perform few checks anyway. - let pinned_deployment = completed_invocation.pinned_deployment.unwrap(); + let pinned_service_protocol_version = completed_invocation + .pinned_deployment + .as_ref() + .map(|pd| pd.service_protocol_version); // Because of the changes to ctx.rand, you can restart from prefix different from 0 only if invocation >= protocol 6 - if pinned_deployment.service_protocol_version < ServiceProtocolVersion::V6 - && copy_prefix_up_to_index_included > 0 + if copy_prefix_up_to_index_included > 0 + && pinned_service_protocol_version.is_none_or(|sp| sp < ServiceProtocolVersion::V6) { bail!(replier, Unsupported); } @@ -250,7 +270,7 @@ where let deployment_id = match patch_deployment_id { PatchDeploymentId::KeepPinned => { // Just keep current deployment, all good - pinned_deployment.deployment_id + None } PatchDeploymentId::PinToLatest | PatchDeploymentId::PinTo { .. } => { // Retrieve the deployment @@ -269,21 +289,21 @@ where }; // Check the protocol constraints are respected. - if !deployment - .supported_protocol_versions - .contains(&(pinned_deployment.service_protocol_version as i32)) + if let Some(pinned_service_protocol) = pinned_service_protocol_version + && !deployment + .supported_protocol_versions + .contains(&(pinned_service_protocol as i32)) { replier.send( RestartAsNewInvocationRpcResponse::IncompatibleDeploymentId { - pinned_protocol_version: pinned_deployment.service_protocol_version - as i32, + pinned_protocol_version: pinned_service_protocol as i32, deployment_id: deployment.id, supported_protocol_versions: deployment.supported_protocol_versions, }, ); return Ok(()); } - deployment.id + Some(deployment.id) } }; @@ -350,7 +370,7 @@ where invocation_id, new_invocation_id, copy_prefix_up_to_index_included, - patch_deployment_id: Some(deployment_id), + patch_deployment_id: deployment_id, response_sink: Some(InvocationMutationResponseSink::Ingress( IngressInvocationResponseSink { request_id }, )), @@ -865,7 +885,6 @@ mod tests { ); let mut proposer = MockActuator::new(); - let pinned_id = completed.pinned_deployment.unwrap().deployment_id; proposer .expect_handle_rpc_proposal_command::() .return_once_st(move |_, cmd, _, _| { @@ -874,7 +893,68 @@ mod tests { pat!(Command::RestartAsNewInvocation(pat!( RestartAsNewInvocationRequest { copy_prefix_up_to_index_included: eq(0), - patch_deployment_id: some(eq(pinned_id)) + patch_deployment_id: none() + } + ))) + ); + ready(()).boxed() + }); + proposer + .expect_self_propose_and_respond_asynchronously::() + .never(); + + let (tx, rx) = Reciprocal::mock(); + RpcHandler::handle( + RpcContext::new( + &mut proposer, + &MockDeploymentMetadataRegistry::default(), + &mut storage, + ), + Request { + request_id: Default::default(), + invocation_id, + copy_prefix_up_to_index_included: 0, + patch_deployment_id: PatchDeploymentId::KeepPinned, + }, + Replier::new(tx), + ) + .await + .unwrap(); + + rx.assert_not_received(); + } + + #[test(restate_core::test)] + async fn copy_prefix_zero_without_pinned_uses_restart_as_new_command() { + let invocation_id = InvocationId::mock_random(); + let target = InvocationTarget::mock_virtual_object(); + let completed = CompletedInvocation { + invocation_target: target, + journal_metadata: JournalMetadata { + length: 1, + ..JournalMetadata::empty() + }, + pinned_deployment: None, + ..CompletedInvocation::mock_neo() + }; + + let mut storage = MockStorage::new_with_journal_v2( + invocation_id, + InvocationStatus::Completed(completed.clone()), + vec![input_command()], + true, + ); + + let mut proposer = MockActuator::new(); + proposer + .expect_handle_rpc_proposal_command::() + .return_once_st(move |_, cmd, _, _| { + assert_that!( + cmd, + pat!(Command::RestartAsNewInvocation(pat!( + RestartAsNewInvocationRequest { + copy_prefix_up_to_index_included: eq(0), + patch_deployment_id: none() } ))) ); @@ -1135,7 +1215,6 @@ mod tests { ); let mut proposer = MockActuator::new(); - let pinned_id = completed.pinned_deployment.unwrap().deployment_id; proposer .expect_handle_rpc_proposal_command::() .return_once_st(move |_, cmd, _, _| { @@ -1144,7 +1223,7 @@ mod tests { pat!(Command::RestartAsNewInvocation(pat!( RestartAsNewInvocationRequest { copy_prefix_up_to_index_included: eq(1), - patch_deployment_id: some(eq(pinned_id)) + patch_deployment_id: none() } ))) ); diff --git a/crates/worker/src/partition/state_machine/entries/mod.rs b/crates/worker/src/partition/state_machine/entries/mod.rs index 3271ff1561..b0ce93ba3d 100644 --- a/crates/worker/src/partition/state_machine/entries/mod.rs +++ b/crates/worker/src/partition/state_machine/entries/mod.rs @@ -405,9 +405,11 @@ impl ApplyJournalCommandEffect<'_, CMD> { #[cfg(test)] mod tests { + use crate::partition::state_machine::Feature; use crate::partition::state_machine::tests::fixtures::invoker_entry_effect; use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; use bytes::Bytes; + use enumset::EnumSet; use googletest::prelude::*; use restate_storage_api::invocation_status_table::ReadInvocationStatusTable; use restate_types::identifiers::{InvocationId, ServiceId}; @@ -416,10 +418,16 @@ mod tests { }; use restate_types::journal_v2::{CallCommand, CallRequest}; use restate_wal_protocol::Command; + use rstest::rstest; + #[rstest] #[restate_core::test] - async fn update_journal_and_commands_length() { - let mut test_env = TestEnv::create().await; + async fn update_journal_and_commands_length( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, + ) { + let mut test_env = TestEnv::create_with_features(features).await; let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await; diff --git a/crates/worker/src/partition/state_machine/entries/notification.rs b/crates/worker/src/partition/state_machine/entries/notification.rs index 629314f19d..c6ae38dcf4 100644 --- a/crates/worker/src/partition/state_machine/entries/notification.rs +++ b/crates/worker/src/partition/state_machine/entries/notification.rs @@ -201,9 +201,11 @@ where mod tests { use super::*; + use crate::partition::state_machine::Feature; use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; use bytes::Bytes; use bytestring::ByteString; + use enumset::EnumSet; use googletest::prelude::*; use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec; use restate_storage_api::invocation_status_table::{ @@ -262,9 +264,14 @@ mod tests { test_env.shutdown().await; } + #[rstest] #[restate_core::test] - async fn notify_signal_received_before_pinned_deployment() { - let mut test_env = TestEnv::create().await; + async fn notify_signal_received_before_pinned_deployment( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, + ) { + let mut test_env = TestEnv::create_with_features(features).await; let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; // Send signal notification before pinned deployment diff --git a/crates/worker/src/partition/state_machine/lifecycle/cancel.rs b/crates/worker/src/partition/state_machine/lifecycle/cancel.rs index 8b0c74f997..487a35dba0 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/cancel.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/cancel.rs @@ -117,16 +117,17 @@ where mod tests { use super::*; - use crate::partition::state_machine::Action; use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; + use crate::partition::state_machine::{Action, Feature}; use crate::partition::types::InvokerEffectKind; use assert2::assert; - use googletest::pat; - use googletest::prelude::{assert_that, contains, eq, ge, not, some}; + use enumset::EnumSet; + use googletest::prelude::*; use restate_invoker_api::Effect; use restate_storage_api::invocation_status_table::{ InvocationStatus, ReadInvocationStatusTable, }; + use restate_storage_api::journal_table_v2; use restate_storage_api::outbox_table::ReadOutboxTable; use restate_types::deployment::PinnedDeployment; use restate_types::errors::CANCELED_INVOCATION_ERROR; @@ -136,10 +137,11 @@ mod tests { InvocationTarget, InvocationTermination, JournalCompletionTarget, NotifySignalRequest, ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, }; - use restate_types::journal_v2::CANCEL_SIGNAL; + use restate_types::journal_v2::{CANCEL_SIGNAL, CommandType, Entry, EntryMetadata, EntryType}; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::Command; + use rstest::rstest; #[restate_core::test] async fn cancel_invoked_invocation() { @@ -191,7 +193,8 @@ mod tests { } #[restate_core::test] - async fn cancel_invoked_invocation_without_pinned_deployment() { + async fn cancel_invoked_invocation_without_pinned_deployment_without_journal_table_v2_default() + { let mut test_env = TestEnv::create().await; let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; @@ -233,8 +236,44 @@ mod tests { } #[restate_core::test] - async fn cancel_scheduled_invocation_through_notify_signal() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; + async fn cancel_invoked_invocation_without_pinned_deployment_with_journal_table_v2_default() { + let mut test_env = TestEnv::create_with_features(Feature::UseJournalTableV2AsDefault).await; + let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; + + // Send signal notification before pinning the deployment + let actions = test_env + .apply(Command::NotifySignal(NotifySignalRequest { + invocation_id, + signal: CANCEL_SIGNAL.try_into().unwrap(), + })) + .await; + assert_that!( + actions, + contains(matchers::actions::forward_notification( + invocation_id, + CANCEL_SIGNAL.clone() + )) + ); + + assert_that!( + test_env.read_journal_to_vec(invocation_id, 2).await, + elements_are![ + property!(Entry.ty(), eq(EntryType::Command(CommandType::Input))), + matchers::entry_eq(CANCEL_SIGNAL), + ] + ); + + test_env.shutdown().await; + } + + #[rstest] + #[restate_core::test] + async fn cancel_scheduled_invocation_through_notify_signal( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, + ) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_features(features).await; let invocation_id = InvocationId::mock_random(); let rpc_id = PartitionProcessorRpcRequestId::new(); @@ -277,13 +316,40 @@ mod tests { .await?; assert!(let InvocationStatus::Free = current_invocation_status); + // Both journal table v1 and v2 are empty + assert!( + journal_table::ReadJournalTable::get_journal_entry( + &mut test_env.storage, + &invocation_id, + 0 + ) + .await + .unwrap() + .is_none() + ); + assert!( + journal_table_v2::ReadJournalTable::get_journal_entry( + &mut test_env.storage, + invocation_id, + 0 + ) + .await + .unwrap() + .is_none() + ); + test_env.shutdown().await; Ok(()) } + #[rstest] #[restate_core::test] - async fn cancel_inboxed_invocation_through_notify_signal() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; + async fn cancel_inboxed_invocation_through_notify_signal( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, + ) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_features(features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -335,6 +401,28 @@ mod tests { // assert that invocation status was removed assert!(let InvocationStatus::Free = current_invocation_status); + // Both journal table v1 and v2 are empty + assert!( + journal_table::ReadJournalTable::get_journal_entry( + &mut test_env.storage, + &inboxed_id, + 0 + ) + .await + .unwrap() + .is_none() + ); + assert!( + journal_table_v2::ReadJournalTable::get_journal_entry( + &mut test_env.storage, + inboxed_id, + 0 + ) + .await + .unwrap() + .is_none() + ); + assert_that!( actions, contains( diff --git a/crates/worker/src/partition/state_machine/lifecycle/purge.rs b/crates/worker/src/partition/state_machine/lifecycle/purge.rs index 674f16dc03..7493e93ab9 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/purge.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/purge.rs @@ -24,7 +24,6 @@ use restate_types::invocation::client::PurgeInvocationResponse; use restate_types::invocation::{ InvocationMutationResponseSink, InvocationTargetType, WorkflowHandlerType, }; -use restate_types::service_protocol::ServiceProtocolVersion; use tracing::trace; pub struct OnPurgeCommand { @@ -57,10 +56,9 @@ where pinned_deployment, .. }) => { - let should_remove_journal_table_v2 = - pinned_deployment.as_ref().is_some_and(|pinned_deployment| { - pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4 - }); + let pinned_service_protocol_version = pinned_deployment + .as_ref() + .map(|pd| pd.service_protocol_version); ctx.do_free_invocation(invocation_id)?; @@ -92,7 +90,7 @@ where ctx.do_drop_journal( invocation_id, journal_metadata.length, - should_remove_journal_table_v2, + pinned_service_protocol_version, ) .await?; } diff --git a/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs b/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs index 8d1424897d..d7491558f5 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs @@ -18,7 +18,6 @@ use restate_storage_api::journal_table_v2::WriteJournalTable; use restate_types::identifiers::InvocationId; use restate_types::invocation::InvocationMutationResponseSink; use restate_types::invocation::client::PurgeInvocationResponse; -use restate_types::service_protocol::ServiceProtocolVersion; use tracing::trace; pub struct OnPurgeJournalCommand { @@ -42,19 +41,17 @@ where } = self; match ctx.get_invocation_status(&invocation_id).await? { InvocationStatus::Completed(mut completed) => { - let should_remove_journal_table_v2 = completed + let pinned_service_protocol_version = completed .pinned_deployment .as_ref() - .is_some_and(|pinned_deployment| { - pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4 - }); + .map(|pd| pd.service_protocol_version); // If journal is not empty, clean it up if completed.journal_metadata.length != 0 { ctx.do_drop_journal( invocation_id, completed.journal_metadata.length, - should_remove_journal_table_v2, + pinned_service_protocol_version, ) .await?; } @@ -113,6 +110,7 @@ mod tests { InvocationTarget, PurgeInvocationRequest, ServiceInvocation, ServiceInvocationResponseSink, }; use restate_types::journal_v2::{CommandType, OutputCommand, OutputResult}; + use restate_types::service_protocol::ServiceProtocolVersion; use restate_wal_protocol::Command; use std::time::Duration; diff --git a/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs b/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs index a808b8ee08..f7d85afbba 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs @@ -21,12 +21,12 @@ use restate_storage_api::invocation_status_table::{ PreFlightInvocationMetadata, ReadInvocationStatusTable, StatusTimestamps, WriteInvocationStatusTable, }; -use restate_storage_api::journal_table as journal_table_v1; -use restate_storage_api::journal_table_v2::{ReadJournalTable, WriteJournalTable}; +use restate_storage_api::journal_table_v2::ReadJournalTable; use restate_storage_api::service_status_table::{ ReadVirtualObjectStatusTable, WriteVirtualObjectStatusTable, }; use restate_storage_api::timer_table::WriteTimerTable; +use restate_storage_api::{journal_table as journal_table_v1, journal_table_v2}; use restate_types::identifiers::{DeploymentId, EntryIndex, InvocationId}; use restate_types::invocation::client::RestartAsNewInvocationResponse; use restate_types::invocation::{ @@ -68,8 +68,7 @@ impl<'ctx, 's: 'ctx, S> StateMachineApplyContext<'s, S> { impl<'ctx, 's: 'ctx, S> CommandHandler<&'ctx mut StateMachineApplyContext<'s, S>> for OnRestartAsNewInvocationCommand where - S: WriteJournalTable - + ReadJournalTable + S: ReadJournalTable + IdempotencyTable + ReadInvocationStatusTable + WriteInvocationStatusTable @@ -78,7 +77,8 @@ where + WriteVirtualObjectStatusTable + WriteTimerTable + WriteInboxTable - + journal_table_v1::WriteJournalTable, + + journal_table_v1::WriteJournalTable + + journal_table_v2::WriteJournalTable, { async fn apply(self, ctx: &'ctx mut StateMachineApplyContext<'s, S>) -> Result<(), Error> { let OnRestartAsNewInvocationCommand { @@ -146,7 +146,7 @@ where new_journal_commands += 1; // Now copy to the new journal - WriteJournalTable::put_journal_entry( + journal_table_v2::WriteJournalTable::put_journal_entry( ctx.storage, new_invocation_id, new_journal_index, @@ -166,7 +166,7 @@ where } // Now copy to the new journal - WriteJournalTable::put_journal_entry( + journal_table_v2::WriteJournalTable::put_journal_entry( ctx.storage, new_invocation_id, new_journal_index, @@ -192,7 +192,7 @@ where && missing_completions.remove(&completion_id) { // Copy over this notification - WriteJournalTable::put_journal_entry( + journal_table_v2::WriteJournalTable::put_journal_entry( ctx.storage, new_invocation_id, new_journal_index, @@ -280,6 +280,7 @@ where mod tests { use super::*; + use crate::partition::state_machine::Feature; use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; use googletest::prelude::*; use restate_storage_api::invocation_status_table::{ @@ -291,8 +292,8 @@ mod tests { }; use restate_types::invocation::client::RestartAsNewInvocationResponse; use restate_types::invocation::{ - IngressInvocationResponseSink, InvocationTarget, NotifySignalRequest, - RestartAsNewInvocationRequest, ServiceInvocation, + IngressInvocationResponseSink, InvocationTarget, InvocationTermination, + NotifySignalRequest, RestartAsNewInvocationRequest, ServiceInvocation, TerminationFlavor, }; use restate_types::journal_v2::{ CommandType, CompletionType, NotificationType, OutputCommand, OutputResult, Signal, @@ -377,6 +378,79 @@ mod tests { test_env.shutdown().await; } + #[restate_core::test] + async fn restart_killed_invocation() { + // This works only when using journal table v2 as default! + // The corner case with journal table v1 is handled by the rpc handler instead. + let mut test_env = TestEnv::create_with_features(Feature::UseJournalTableV2AsDefault).await; + + // Start invocation, then kill it + let invocation_target = InvocationTarget::mock_virtual_object(); + let original_invocation_id = InvocationId::generate(&invocation_target, None); + let _ = test_env + .apply_multiple([ + Command::Invoke(Box::new(ServiceInvocation { + invocation_id: original_invocation_id, + invocation_target: invocation_target.clone(), + completion_retention_duration: Duration::from_secs(120), + journal_retention_duration: Duration::from_secs(120), + ..ServiceInvocation::mock() + })), + Command::TerminateInvocation(InvocationTermination { + invocation_id: original_invocation_id, + flavor: TerminationFlavor::Kill, + response_sink: None, + }), + ]) + .await; + + // Restart as new with copy_prefix_up_to_index_included = 0 + let new_id = InvocationId::mock_generate(&invocation_target); + let request_id = PartitionProcessorRpcRequestId::new(); + let actions = test_env + .apply(Command::RestartAsNewInvocation( + RestartAsNewInvocationRequest { + invocation_id: original_invocation_id, + new_invocation_id: new_id, + copy_prefix_up_to_index_included: 0, + patch_deployment_id: None, + response_sink: Some(InvocationMutationResponseSink::Ingress( + IngressInvocationResponseSink { request_id }, + )), + }, + )) + .await; + + // We should invoke the new invocation and send OK back + assert_that!( + actions, + all!( + contains(matchers::actions::invoke_for_id(new_id)), + contains(pat!(Action::ForwardRestartAsNewInvocationResponse { + request_id: eq(request_id), + response: eq(RestartAsNewInvocationResponse::Ok { + new_invocation_id: new_id + }) + })) + ) + ); + + assert_that!( + test_env + .storage + .get_invocation_status(&new_id) + .await + .unwrap(), + all!( + matchers::storage::is_variant(InvocationStatusDiscriminants::Invoked), + matchers::storage::has_journal_length(1), + matchers::storage::has_commands(1) + ) + ); + + test_env.shutdown().await; + } + #[restate_core::test] async fn restart_copy_input_only() { let mut test_env = TestEnv::create().await; diff --git a/crates/worker/src/partition/state_machine/lifecycle/suspend.rs b/crates/worker/src/partition/state_machine/lifecycle/suspend.rs index 648ae6de20..886fe13a76 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/suspend.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/suspend.rs @@ -91,19 +91,28 @@ where #[cfg(test)] mod tests { - use crate::partition::state_machine::Action; use crate::partition::state_machine::tests::fixtures::{ invoker_entry_effect, invoker_suspended, }; + use crate::partition::state_machine::tests::matchers::storage::{ + has_commands, has_journal_length, in_flight_metadata, is_variant, + }; use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; - use googletest::prelude::{all, assert_that, contains, eq, pat}; - use googletest::{elements_are, property}; + use crate::partition::state_machine::{Action, Feature}; + use enumset::EnumSet; + use googletest::prelude::*; + use restate_storage_api::invocation_status_table::{ + InFlightInvocationMetadata, InvocationStatusDiscriminants, ReadInvocationStatusTable, + }; + use restate_types::invocation::NotifySignalRequest; use restate_types::journal_v2::{ - CommandType, Entry, EntryMetadata, EntryType, NotificationId, SleepCommand, SleepCompletion, + CommandType, Entry, EntryMetadata, EntryType, NotificationId, Signal, SignalId, + SignalResult, SleepCommand, SleepCompletion, }; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyValue; + use rstest::rstest; use std::time::{Duration, SystemTime}; #[restate_core::test] @@ -217,4 +226,79 @@ mod tests { test_env.shutdown().await; } + + #[rstest] + #[restate_core::test] + async fn suspend_waiting_on_signal( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, + ) { + let mut test_env = TestEnv::create_with_features(features).await; + let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; + // We don't pin the deployment here, but this should work nevertheless. + + let _ = test_env + .apply(invoker_suspended( + invocation_id, + [NotificationId::for_signal(SignalId::for_index(17))], + )) + .await; + + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await, + ok(all!( + is_variant(InvocationStatusDiscriminants::Suspended), + has_journal_length(1), + in_flight_metadata(pat!(InFlightInvocationMetadata { + pinned_deployment: none() + })), + )) + ); + + // Let's notify the signal + let signal = Signal { + id: SignalId::for_index(17), + result: SignalResult::Void, + }; + let actions = test_env + .apply(Command::NotifySignal(NotifySignalRequest { + invocation_id, + signal: signal.clone(), + })) + .await; + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(invocation_id)) + ); + + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await, + ok(all!( + is_variant(InvocationStatusDiscriminants::Invoked), + has_journal_length(2), + has_commands(1), + in_flight_metadata(pat!(InFlightInvocationMetadata { + pinned_deployment: none() + })), + )) + ); + + // Check journal + assert_that!( + test_env.read_journal_to_vec(invocation_id, 2).await, + elements_are![ + property!(Entry.ty(), eq(EntryType::Command(CommandType::Input))), + matchers::entry_eq(signal), + ] + ); + + test_env.shutdown().await; + } } diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 26db066ca9..2f82fdc24f 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -34,14 +34,14 @@ use tracing::{Instrument, Span, debug, error, trace, warn}; use restate_invoker_api::InvokeInputJournal; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec; -use restate_storage_api::Result as StorageResult; use restate_storage_api::fsm_table::WriteFsmTable; use restate_storage_api::idempotency_table::{IdempotencyTable, ReadOnlyIdempotencyTable}; use restate_storage_api::inbox_table::{InboxEntry, WriteInboxTable}; use restate_storage_api::invocation_status_table::{ - CompletedInvocation, InFlightInvocationMetadata, InboxedInvocation, JournalRetentionPolicy, - PreFlightInvocationArgument, PreFlightInvocationJournal, PreFlightInvocationMetadata, - ReadInvocationStatusTable, WriteInvocationStatusTable, + CompletedInvocation, InFlightInvocationMetadata, InboxedInvocation, JournalMetadata, + JournalRetentionPolicy, PreFlightInvocationArgument, PreFlightInvocationInput, + PreFlightInvocationJournal, PreFlightInvocationMetadata, ReadInvocationStatusTable, + WriteInvocationStatusTable, }; use restate_storage_api::invocation_status_table::{InvocationStatus, ScheduledInvocation}; use restate_storage_api::journal_events::WriteJournalEventsTable; @@ -58,6 +58,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::state_table::{ReadStateTable, WriteStateTable}; use restate_storage_api::timer_table::TimerKey; use restate_storage_api::timer_table::{Timer, WriteTimerTable}; +use restate_storage_api::{Result as StorageResult, journal_table}; use restate_tracing_instrumentation as instrumentation; use restate_types::errors::{ ALREADY_COMPLETED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, GenericError, @@ -94,7 +95,7 @@ use restate_types::journal_v2; use restate_types::journal_v2::command::{OutputCommand, OutputResult}; use restate_types::journal_v2::raw::RawNotification; use restate_types::journal_v2::{ - CommandType, CompletionId, EntryMetadata, NotificationId, Signal, SignalResult, + CommandType, CompletionId, EntryMetadata, InputCommand, NotificationId, Signal, SignalResult, }; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; @@ -102,6 +103,7 @@ use restate_types::schema::Schema; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::state_mut::ExternalStateMutation; use restate_types::state_mut::StateMutationVersion; +use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader}; use restate_types::time::MillisSinceEpoch; use restate_types::{RestateVersion, SemanticRestateVersion}; use restate_types::{Versioned, journal::*}; @@ -115,7 +117,9 @@ use crate::partition::state_machine::lifecycle::OnCancelCommand; use crate::partition::types::{InvokerEffect, InvokerEffectKind, OutboxMessageExt}; #[derive(Debug, Hash, enumset::EnumSetType, strum::Display)] -pub enum ExperimentalFeature {} +pub enum Feature { + UseJournalTableV2AsDefault, +} pub struct StateMachine { // initialized from persistent storage @@ -132,7 +136,7 @@ pub struct StateMachine { pub(crate) partition_key_range: RangeInclusive, /// Enabled experimental features. - pub(crate) experimental_features: EnumSet, + pub(crate) features: EnumSet, } impl Debug for StateMachine { @@ -217,7 +221,7 @@ impl StateMachine { outbox_head_seq_number: Option, partition_key_range: RangeInclusive, min_restate_version: SemanticRestateVersion, - experimental_features: EnumSet, + experimental_features: EnumSet, schema: Option, ) -> Self { Self { @@ -226,7 +230,7 @@ impl StateMachine { outbox_head_seq_number, partition_key_range, min_restate_version, - experimental_features, + features: experimental_features, schema, } } @@ -243,8 +247,7 @@ pub(crate) struct StateMachineApplyContext<'a, S> { min_restate_version: &'a mut SemanticRestateVersion, schema: &'a mut Option, partition_key_range: RangeInclusive, - #[allow(dead_code)] - experimental_features: &'a EnumSet, + features: &'a EnumSet, is_leader: bool, } @@ -282,7 +285,7 @@ impl StateMachine { min_restate_version: &mut self.min_restate_version, schema: &mut self.schema, partition_key_range: self.partition_key_range.clone(), - experimental_features: &self.experimental_features, + features: &self.features, is_leader, } .on_apply(command) @@ -650,7 +653,8 @@ impl StateMachineApplyContext<'_, S> { + WriteTimerTable + WriteInboxTable + WriteFsmTable - + WriteJournalTable, + + WriteJournalTable + + journal_table_v2::WriteJournalTable, { let invocation_id = service_invocation.invocation_id; debug_assert!( @@ -696,7 +700,7 @@ impl StateMachineApplyContext<'_, S> { async fn on_pre_flight_invocation( &mut self, invocation_id: InvocationId, - pre_flight_invocation_metadata: PreFlightInvocationMetadata, + mut pre_flight_invocation_metadata: PreFlightInvocationMetadata, submit_notification_sink: Option, ) -> Result<(), Error> where @@ -708,10 +712,56 @@ impl StateMachineApplyContext<'_, S> { + WriteTimerTable + WriteInboxTable + WriteFsmTable - + WriteJournalTable, + + WriteJournalTable + + journal_table_v2::WriteJournalTable, { // A pre-flight invocation has been already deduplicated + // 0. Prepare the journal table v2 + if self.features.contains(Feature::UseJournalTableV2AsDefault) + && let PreFlightInvocationArgument::Input(PreFlightInvocationInput { + argument, + headers, + span_context, + }) = pre_flight_invocation_metadata.input + { + // In this case, we do the following: + // * Write the input in the journal table v2 + // * Change pre_flight_invocation_metadata.input + + // Prepare the new entry + let new_entry: journal_v2::Entry = InputCommand { + headers, + payload: argument, + name: Default::default(), + } + .into(); + let new_raw_entry = new_entry.encode::(); + + // Now write the entry in the new table + journal_table_v2::WriteJournalTable::put_journal_entry( + self.storage, + invocation_id, + 0, + &StoredRawEntry::new( + StoredRawEntryHeader::new(self.record_created_at), + new_raw_entry, + ), + &[], + )?; + + // Input is now a journal directly + pre_flight_invocation_metadata.input = + PreFlightInvocationArgument::Journal(PreFlightInvocationJournal { + journal_metadata: JournalMetadata { + length: 1, + commands: 1, + span_context, + }, + pinned_deployment: None, + }); + } + // 1. Check if we need to schedule it let execution_time = pre_flight_invocation_metadata.execution_time; let Some(pre_flight_invocation_metadata) = self.handle_service_invocation_execution_time( @@ -1025,7 +1075,7 @@ impl StateMachineApplyContext<'_, S> { invocation_input: Option, ) -> Result<(), Error> where - S: WriteJournalTable + WriteInvocationStatusTable, + S: WriteJournalTable + WriteInvocationStatusTable + journal_table_v2::WriteJournalTable, { // Usage metering for "actions" should include the Input journal entry // type, but it gets filtered out before reaching the state machine. @@ -1062,48 +1112,95 @@ impl StateMachineApplyContext<'_, S> { invocation_input: InvocationInput, ) -> Result where - S: WriteJournalTable, + S: WriteJournalTable + journal_table_v2::WriteJournalTable, { debug_if_leader!(self.is_leader, "Init journal with input entry"); // In our current data model, ServiceInvocation has always an input, so initial length is 1 in_flight_invocation_metadata.journal_metadata.length = 1; - // We store the entry in the JournalTable V1. - // When pinning the deployment version we figure the concrete protocol version - // * If <= V3, we keep everything in JournalTable V1 - // * If >= V4, we migrate the JournalTable to V2 - let input_entry = JournalEntry::Entry(ProtobufRawEntryCodec::serialize_as_input_entry( - invocation_input.headers, - invocation_input.argument, - )); - self.storage - .put_journal_entry(&invocation_id, 0, &input_entry) + if self.features.contains(Feature::UseJournalTableV2AsDefault) { + // Prepare the new entry + let new_entry: journal_v2::Entry = InputCommand { + headers: invocation_input.headers, + payload: invocation_input.argument, + name: Default::default(), + } + .into(); + let stored_entry = StoredRawEntry::new( + StoredRawEntryHeader::new(self.record_created_at), + new_entry.encode::(), + ); + + // Now write the entry in the new table + journal_table_v2::WriteJournalTable::put_journal_entry( + self.storage, + invocation_id, + 0, + &stored_entry, + &[], + )?; + + Ok(InvokeInputJournal::CachedJournal( + restate_invoker_api::JournalMetadata::new( + in_flight_invocation_metadata.journal_metadata.length, + in_flight_invocation_metadata + .journal_metadata + .span_context + .clone(), + None, + in_flight_invocation_metadata.current_invocation_epoch, + // This is safe to do as only the leader will execute the invoker command + MillisSinceEpoch::now(), + in_flight_invocation_metadata + .random_seed + .unwrap_or_else(|| invocation_id.to_random_seed()), + true, + ), + vec![restate_invoker_api::invocation_reader::JournalEntry::JournalV2(stored_entry)], + )) + } else { + // We store the entry in the JournalTable V1. + // When pinning the deployment version we figure the concrete protocol version + // * If <= V3, we keep everything in JournalTable V1 + // * If >= V4, we migrate the JournalTable to V2 + let input_entry = JournalEntry::Entry(ProtobufRawEntryCodec::serialize_as_input_entry( + invocation_input.headers, + invocation_input.argument, + )); + journal_table::WriteJournalTable::put_journal_entry( + self.storage, + &invocation_id, + 0, + &input_entry, + ) .map_err(Error::Storage)?; - let_assert!(JournalEntry::Entry(input_entry) = input_entry); + let_assert!(JournalEntry::Entry(input_entry) = input_entry); - Ok(InvokeInputJournal::CachedJournal( - restate_invoker_api::JournalMetadata::new( - in_flight_invocation_metadata.journal_metadata.length, - in_flight_invocation_metadata - .journal_metadata - .span_context - .clone(), - None, - in_flight_invocation_metadata.current_invocation_epoch, - // This is safe to do as only the leader will execute the invoker command - MillisSinceEpoch::now(), - in_flight_invocation_metadata - .random_seed - .unwrap_or_else(|| invocation_id.to_random_seed()), - ), - vec![ - restate_invoker_api::invocation_reader::JournalEntry::JournalV1( - input_entry.erase_enrichment(), + Ok(InvokeInputJournal::CachedJournal( + restate_invoker_api::JournalMetadata::new( + in_flight_invocation_metadata.journal_metadata.length, + in_flight_invocation_metadata + .journal_metadata + .span_context + .clone(), + None, + in_flight_invocation_metadata.current_invocation_epoch, + // This is safe to do as only the leader will execute the invoker command + MillisSinceEpoch::now(), + in_flight_invocation_metadata + .random_seed + .unwrap_or_else(|| invocation_id.to_random_seed()), + false, ), - ], - )) + vec![ + restate_invoker_api::invocation_reader::JournalEntry::JournalV1( + input_entry.erase_enrichment(), + ), + ], + )) + } } fn invoke( @@ -1314,52 +1411,57 @@ impl StateMachineApplyContext<'_, S> { { let mut status = self.get_invocation_status(&invocation_id).await?; - match status.get_invocation_metadata().and_then(|meta| { + let pinned_service_protocol = status.get_invocation_metadata().and_then(|meta| { meta.pinned_deployment .as_ref() .map(|pd| pd.service_protocol_version) - }) { - Some(sp_version) if sp_version >= ServiceProtocolVersion::V4 => { - OnCancelCommand { - invocation_id, - invocation_status: status, - response_sink, - } - .apply(self) - .await?; - return Ok(()); + }); + + if pinned_service_protocol + .is_some_and(|sp_version| sp_version >= ServiceProtocolVersion::V4) + || journal_table_v2::ReadJournalTable::get_journal_entry(self.storage, invocation_id, 0) + .await? + .is_some() + { + // If we got protocol 4 already pinned, or we're using anyway the journal table v2, then process using the new cancellation command + OnCancelCommand { + invocation_id, + invocation_status: status, + response_sink, } - None if matches!( + .apply(self) + .await?; + return Ok(()); + } else if pinned_service_protocol.is_none() + && matches!( status, InvocationStatus::Invoked(_) | InvocationStatus::Suspended { .. } - ) => - { - // We need to apply a corner case fix here. - // We don't know yet what's the protocol version being used, but we know the status is either invoker or suspended. - // To sort this out, we write a field in invocation status to make sure that after pinning the deployment, we run the cancellation. - // See OnPinnedDeploymentCommand for more info. - trace!( - "Storing hotfix for cancellation when invocation doesn't have a pinned service protocol, but is invoked/suspended" - ); + ) + { + // We need to apply a corner case fix here. + // We don't know yet what's the protocol version being used, but we know the status is either invoker or suspended. + // To sort this out, we write a field in invocation status to make sure that after pinning the deployment, we run the cancellation. + // See OnPinnedDeploymentCommand for more info. + trace!( + "Storing hotfix for cancellation when invocation doesn't have a pinned service protocol, but is invoked/suspended" + ); - match &mut status { - InvocationStatus::Invoked(metadata) - | InvocationStatus::Suspended { metadata, .. } => { - metadata.hotfix_apply_cancellation_after_deployment_is_pinned = true; - } - _ => { - unreachable!("It's checked above") - } - }; + match &mut status { + InvocationStatus::Invoked(metadata) + | InvocationStatus::Suspended { metadata, .. } => { + metadata.hotfix_apply_cancellation_after_deployment_is_pinned = true; + } + _ => { + unreachable!("It's checked above") + } + }; - self.storage - .put_invocation_status(&invocation_id, &status)?; - self.reply_to_cancel(response_sink, CancelInvocationResponse::Appended); - return Ok(()); - } - _ => { - // Continue below - } + self.storage + .put_invocation_status(&invocation_id, &status)?; + self.reply_to_cancel(response_sink, CancelInvocationResponse::Appended); + return Ok(()); + } else { + // Continue below }; match status { @@ -1503,14 +1605,14 @@ impl StateMachineApplyContext<'_, S> { pinned_deployment, }) = &input { - let should_remove_journal_table_v2 = - pinned_deployment.as_ref().is_some_and(|pinned_deployment| { - pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4 - }); + let pinned_service_protocol_version = pinned_deployment + .as_ref() + .map(|pd| pd.service_protocol_version); + self.do_drop_journal( invocation_id, journal_metadata.length, - should_remove_journal_table_v2, + pinned_service_protocol_version, ) .await?; } @@ -1587,14 +1689,14 @@ impl StateMachineApplyContext<'_, S> { pinned_deployment, }) = &input { - let should_remove_journal_table_v2 = - pinned_deployment.as_ref().is_some_and(|pinned_deployment| { - pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4 - }); + let pinned_service_protocol_version = pinned_deployment + .as_ref() + .map(|pd| pd.service_protocol_version); + self.do_drop_journal( invocation_id, journal_metadata.length, - should_remove_journal_table_v2, + pinned_service_protocol_version, ) .await?; } @@ -1958,7 +2060,8 @@ impl StateMachineApplyContext<'_, S> { + WriteInvocationStatusTable + WriteInboxTable + WriteFsmTable - + WriteJournalTable, + + WriteJournalTable + + journal_table_v2::WriteJournalTable, { debug_if_leader!( self.is_leader, @@ -2249,12 +2352,10 @@ impl StateMachineApplyContext<'_, S> { let completion_retention = invocation_metadata.completion_retention_duration; let journal_retention = invocation_metadata.journal_retention_duration; - let should_remove_journal_table_v2 = invocation_metadata + let pinned_service_protocol_version = invocation_metadata .pinned_deployment .as_ref() - .is_some_and(|pinned_deployment| { - pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4 - }); + .map(|pd| pd.service_protocol_version); // If there are any response sinks, or we need to store back the completed status, // we need to find the latest output entry @@ -2336,7 +2437,7 @@ impl StateMachineApplyContext<'_, S> { self.do_drop_journal( invocation_id, journal_length, - should_remove_journal_table_v2, + pinned_service_protocol_version, ) .await?; } @@ -2400,7 +2501,8 @@ impl StateMachineApplyContext<'_, S> { + WriteVirtualObjectStatusTable + ReadStateTable + WriteStateTable - + WriteJournalTable, + + WriteJournalTable + + journal_table_v2::WriteJournalTable, { // Inbox exists only for virtual object exclusive handler cases if invocation_target.invocation_target_ty() @@ -4198,7 +4300,7 @@ impl StateMachineApplyContext<'_, S> { &mut self, invocation_id: InvocationId, journal_length: EntryIndex, - should_remove_journal_table_v2: bool, + pinned_protocol_version: Option, ) -> Result<(), Error> where S: WriteJournalTable + journal_table_v2::WriteJournalTable + WriteJournalEventsTable, @@ -4209,17 +4311,18 @@ impl StateMachineApplyContext<'_, S> { "Effect: Drop journal" ); - if should_remove_journal_table_v2 { + if pinned_protocol_version.is_none_or(|sp| sp < ServiceProtocolVersion::V4) { + WriteJournalTable::delete_journal(self.storage, &invocation_id, journal_length) + .map_err(Error::Storage)?; + }; + if pinned_protocol_version.is_none_or(|sp| sp >= ServiceProtocolVersion::V4) { journal_table_v2::WriteJournalTable::delete_journal( self.storage, invocation_id, journal_length, ) .map_err(Error::Storage)? - } else { - WriteJournalTable::delete_journal(self.storage, &invocation_id, journal_length) - .map_err(Error::Storage)?; - } + }; WriteJournalEventsTable::delete_journal_events(self.storage, invocation_id) .map_err(Error::Storage)?; Ok(()) diff --git a/crates/worker/src/partition/state_machine/tests/delayed_send.rs b/crates/worker/src/partition/state_machine/tests/delayed_send.rs index 4e8b78a029..ce269f2345 100644 --- a/crates/worker/src/partition/state_machine/tests/delayed_send.rs +++ b/crates/worker/src/partition/state_machine/tests/delayed_send.rs @@ -13,15 +13,83 @@ use super::*; use restate_storage_api::inbox_table::ReadInboxTable; use restate_types::invocation::SubmitNotificationSink; use restate_types::time::MillisSinceEpoch; +use rstest::rstest; use std::time::{Duration, SystemTime}; use test_log::test; -#[test(restate_core::test)] -async fn send_with_delay() { +#[rstest] +#[restate_core::test] +async fn send_with_delay( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, +) { + let mut test_env = TestEnv::create_with_features(features).await; + + let invocation_target = InvocationTarget::mock_service(); + let invocation_id = InvocationId::mock_random(); + + let request_id = PartitionProcessorRpcRequestId::default(); + + let wake_up_time = MillisSinceEpoch::from(SystemTime::now() + Duration::from_secs(60)); + let actions = test_env + .apply(Command::Invoke(Box::new(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + response_sink: None, + submit_notification_sink: Some(SubmitNotificationSink::Ingress { request_id }), + // Doesn't matter the execution time here, just needs to be filled + execution_time: Some(wake_up_time), + ..ServiceInvocation::mock() + }))) + .await; + assert_that!( + actions, + all!( + not(contains(matchers::actions::invoke_for_id(invocation_id))), + contains(pat!(Action::RegisterTimer { .. })), + contains(eq(Action::IngressSubmitNotification { + request_id, + execution_time: Some(wake_up_time), + is_new_invocation: true + })) + ) + ); + + // Now fire the timer + let actions = test_env + .apply(Command::Timer(TimerKeyValue::neo_invoke( + wake_up_time, + invocation_id, + ))) + .await; + + assert_that!( + actions, + all!( + contains(matchers::actions::invoke_for_id(invocation_id)), + not(contains(eq(Action::IngressSubmitNotification { + request_id, + execution_time: Some(wake_up_time), + is_new_invocation: true, + }))) + ) + ); + assert_that!( + test_env.storage.get_invocation_status(&invocation_id).await, + ok(pat!(InvocationStatus::Invoked { .. })) + ); + test_env.shutdown().await; +} + +/// This tests the case where an invocation was enqueued, and then the runtime was later restarted with journal table v2 default on +#[restate_core::test] +async fn send_with_delay_where_experimental_feature_journal_table_v2_is_enabled_later() { let mut test_env = TestEnv::create().await; let invocation_target = InvocationTarget::mock_service(); let invocation_id = InvocationId::mock_random(); + let argument = restate_test_util::rand::bytes(); let request_id = PartitionProcessorRpcRequestId::default(); @@ -31,6 +99,7 @@ async fn send_with_delay() { invocation_id, invocation_target: invocation_target.clone(), response_sink: None, + argument: argument.clone(), submit_notification_sink: Some(SubmitNotificationSink::Ingress { request_id }), // Doesn't matter the execution time here, just needs to be filled execution_time: Some(wake_up_time), @@ -50,6 +119,9 @@ async fn send_with_delay() { ) ); + // Now let's update the features + test_env.set_features(Feature::UseJournalTableV2AsDefault); + // Now fire the timer let actions = test_env .apply(Command::Timer(TimerKeyValue::neo_invoke( @@ -73,6 +145,15 @@ async fn send_with_delay() { test_env.storage.get_invocation_status(&invocation_id).await, ok(pat!(InvocationStatus::Invoked { .. })) ); + + assert_eq!( + test_env + .read_journal_entry::(invocation_id, 0) + .await + .payload, + argument + ); + test_env.shutdown().await; } diff --git a/crates/worker/src/partition/state_machine/tests/fixtures.rs b/crates/worker/src/partition/state_machine/tests/fixtures.rs index c66ef4f58a..39e24dd984 100644 --- a/crates/worker/src/partition/state_machine/tests/fixtures.rs +++ b/crates/worker/src/partition/state_machine/tests/fixtures.rs @@ -13,7 +13,7 @@ use crate::partition::state_machine::tests::TestEnv; use crate::partition::types::InvokerEffectKind; use bytes::Bytes; use googletest::prelude::*; -use restate_invoker_api::{Effect, InvokeInputJournal}; +use restate_invoker_api::Effect; use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec; use restate_storage_api::journal_table::JournalEntry; use restate_types::deployment::PinnedDeployment; @@ -167,7 +167,6 @@ pub async fn mock_start_invocation_with_invocation_target( contains(pat!(Action::Invoke { invocation_id: eq(invocation_id), invocation_target: eq(invocation_target), - invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _)) })) ); diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index 97312d2125..da31a85ee2 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -14,6 +14,7 @@ use assert2::assert; use assert2::let_assert; use googletest::any; use prost::Message; +use restate_storage_api::journal_table; use restate_storage_api::journal_table::WriteJournalTable; use restate_storage_api::timer_table::{ ReadTimerTable, Timer, TimerKey, TimerKeyKind, WriteTimerTable, @@ -27,9 +28,14 @@ use restate_types::service_protocol; use rstest::rstest; use test_log::test; +#[rstest] #[restate_core::test] -async fn kill_inboxed_invocation() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +async fn kill_inboxed_invocation( + #[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet< + Feature, + >, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_features(features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -85,6 +91,20 @@ async fn kill_inboxed_invocation() -> anyhow::Result<()> { // assert that invocation status was removed assert!(let InvocationStatus::Free = current_invocation_status); + // Both journal table v1 and v2 are empty + assert!( + journal_table::ReadJournalTable::get_journal_entry(&mut test_env.storage, &inboxed_id, 0) + .await + .unwrap() + .is_none() + ); + assert!( + journal_table_v2::ReadJournalTable::get_journal_entry(&mut test_env.storage, inboxed_id, 0) + .await + .unwrap() + .is_none() + ); + assert_that!( actions, all!( @@ -291,6 +311,27 @@ async fn kill_call_tree() -> anyhow::Result<()> { .await?, empty() ); + // Both journal table v1 and v2 are empty + assert!( + journal_table::ReadJournalTable::get_journal_entry( + &mut test_env.storage, + &invocation_id, + 0 + ) + .await + .unwrap() + .is_none() + ); + assert!( + journal_table_v2::ReadJournalTable::get_journal_entry( + &mut test_env.storage, + invocation_id, + 0 + ) + .await + .unwrap() + .is_none() + ); test_env.shutdown().await; Ok(()) diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 31c3cb59e4..e58db1c7bb 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -86,19 +86,17 @@ impl TestEnv { } pub async fn create() -> Self { - Self::create_with_experimental_features(Default::default()).await + Self::create_with_features(EnumSet::default()).await } - pub async fn create_with_experimental_features( - experimental_features: EnumSet, - ) -> Self { + pub async fn create_with_features(experimental_features: impl Into>) -> Self { Self::create_with_state_machine(StateMachine::new( 0, /* inbox_seq_number */ 0, /* outbox_seq_number */ None, /* outbox_head_seq_number */ PartitionKey::MIN..=PartitionKey::MAX, SemanticRestateVersion::unknown().clone(), - experimental_features, + experimental_features.into(), None, )) .await @@ -227,7 +225,6 @@ impl TestEnv { .collect() } - #[allow(unused)] pub async fn read_journal_entry( &mut self, invocation_id: InvocationId, @@ -310,6 +307,10 @@ impl TestEnv { ok(none()) ); } + + pub fn set_features(&mut self, features: impl Into>) { + self.state_machine.features = features.into() + } } type TestResult = Result<(), anyhow::Error>;