From 678a67fcf14d791827d1169df42def41dd1b8850 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Thu, 4 Dec 2025 11:51:14 -0500 Subject: [PATCH 1/9] add failing test --- crates/common/src/protos/canned_histories.rs | 21 +++++- .../include/temporal-sdk-core-c-bridge.h | 30 ++++---- .../workflow_tests/local_activities.rs | 74 +++++++++++++++++++ 3 files changed, 109 insertions(+), 16 deletions(-) diff --git a/crates/common/src/protos/canned_histories.rs b/crates/common/src/protos/canned_histories.rs index d507a7117..903512f43 100644 --- a/crates/common/src/protos/canned_histories.rs +++ b/crates/common/src/protos/canned_histories.rs @@ -1305,7 +1305,8 @@ pub fn single_child_workflow_try_cancelled(child_wf_id: &str) -> TestHistoryBuil /// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED /// 5: EVENT_TYPE_MARKER_RECORDED (la result) /// 7: EVENT_TYPE_MARKER_RECORDED (la result) -/// 8: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED +/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -1321,6 +1322,24 @@ pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder { t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_MARKER_RECORDED (LA 2 result) +/// 7: EVENT_TYPE_MARKER_RECORDED (LA 1 result) +/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn parallel_las_job_order_hist() -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_local_activity_result_marker(2, "2", b"hi2".into()); + t.add_local_activity_result_marker(1, "1", b"hi1".into()); + t.add_workflow_task_scheduled_and_started(); + t +} + /// Useful for one-of needs to write a crafted history to a file. Writes it as serialized proto /// binary to the provided path. pub fn write_hist_to_binfile( diff --git a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h index e55a8d0b7..114e55c24 100644 --- a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -5,13 +5,13 @@ #include #include -typedef enum TemporalCoreRpcService { - Workflow = 1, - Operator, - Cloud, - Test, - Health, -} TemporalCoreRpcService; +typedef enum TemporalCoreForwardedLogLevel { + Trace = 0, + Debug, + Info, + Warn, + Error, +} TemporalCoreForwardedLogLevel; typedef enum TemporalCoreMetricAttributeValueType { String = 1, @@ -29,14 +29,6 @@ typedef enum TemporalCoreMetricKind { GaugeFloat, } TemporalCoreMetricKind; -typedef enum TemporalCoreForwardedLogLevel { - Trace = 0, - Debug, - Info, - Warn, - Error, -} TemporalCoreForwardedLogLevel; - typedef enum TemporalCoreOpenTelemetryMetricTemporality { Cumulative = 1, Delta, @@ -47,6 +39,14 @@ typedef enum TemporalCoreOpenTelemetryProtocol { Http, } TemporalCoreOpenTelemetryProtocol; +typedef enum TemporalCoreRpcService { + Workflow = 1, + Operator, + Cloud, + Test, + Health, +} TemporalCoreRpcService; + typedef enum TemporalCoreSlotKindType { WorkflowSlotKindType, ActivitySlotKindType, diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs index b3e99cb79..36b384060 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs @@ -2589,6 +2589,80 @@ async fn two_sequential_las( worker.run().await.unwrap(); } +async fn parallel_las_job_order_wf(ctx: WfContext) -> WorkflowResult<()> { + tokio::join!( + ctx.local_activity(LocalActivityOptions { + activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), + input: 100.as_json_payload().unwrap(), + ..Default::default() + }), + ctx.local_activity(LocalActivityOptions { + activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), + input: 1.as_json_payload().unwrap(), + ..Default::default() + }) + ); + Ok(().into()) +} + +#[rstest] +#[tokio::test] +async fn parallel_las_job_order(#[values(true, false)] replay: bool) { + let t = canned_histories::parallel_las_job_order_hist(); + let mut mock_cfg = if replay { + MockPollCfg::from_resps(t, [ResponseType::AllHistory]) + } else { + MockPollCfg::from_hist_builder(t) + }; + + let mut aai = ActivationAssertionsInterceptor::default(); + // Verify ResolveActivity jobs are received in completion order (seq 2 first, then seq 1) + // This catches the bug where they might be sent in request order instead + aai.skip_one().then(move |a| { + assert_matches!( + a.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra1)) + }, WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra2)) + }] => {assert_eq!(ra1.seq, 2); assert_eq!(ra2.seq, 1)} + ); + }); + + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + asserts.then(move |wft| { + let commands = &wft.commands; + if !replay { + assert_eq!(commands.len(), 3); + assert_eq!(commands[0].command_type(), CommandType::RecordMarker); + assert_eq!(commands[1].command_type(), CommandType::RecordMarker); + assert_matches!( + commands[2].command_type(), + CommandType::CompleteWorkflowExecution + ); + } else { + assert_eq!(commands.len(), 1); + assert_matches!( + commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + } + }); + }); + + let mut worker = build_fake_sdk(mock_cfg); + worker.set_worker_interceptor(aai); + worker.register_wf(DEFAULT_WORKFLOW_TYPE, parallel_las_job_order_wf); + worker.register_activity( + DEFAULT_ACTIVITY_TYPE, + move |_ctx: ActContext, sleep_ms: u64| async move { + tokio::time::sleep(Duration::from_millis(sleep_ms)).await; + Ok("Resolved") + }, + ); + worker.run().await.unwrap(); +} + async fn la_timer_la(ctx: WfContext) -> WorkflowResult<()> { ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), From 46768ad3280aea9c05f17a96f4ea0a89fd2c188b Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 09:42:24 -0500 Subject: [PATCH 2/9] initial fix --- .../machines/local_activity_state_machine.rs | 169 ++++++++++-------- .../workflow/machines/workflow_machines.rs | 26 ++- .../machines/workflow_machines/local_acts.rs | 20 ++- .../src/worker/workflow/managed_run.rs | 8 + 4 files changed, 140 insertions(+), 83 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 6a12e11dd..48f639d1c 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -48,8 +48,7 @@ fsm! { // is replaying), and then immediately scheduled and transitions to either requesting that lang // execute the activity, or waiting for the marker from history. Executing --(Schedule, shared on_schedule) --> RequestSent; - Replaying --(Schedule, on_schedule) --> WaitingMarkerEvent; - ReplayingPreResolved --(Schedule, on_schedule) --> WaitingMarkerEventPreResolved; + Replaying --(Schedule, on_schedule) --> WaitingResolveFromMarkerLookAhead; // Execution path ============================================================================= RequestSent --(HandleResult(ResolveDat), on_handle_result) --> MarkerCommandCreated; @@ -66,32 +65,29 @@ fsm! { --> MarkerCommandRecorded; // Replay path ================================================================================ - // LAs on the replay path always need to eventually see the marker - WaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) - --> MarkerCommandRecorded; + //WaitingResolveFromMarkerLookAhead --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) --> MarkerCommandRecorded; + WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; // If we are told to cancel while waiting for the marker, we still need to wait for the marker. - WaitingMarkerEvent --(Cancel, on_cancel_requested) --> WaitingMarkerEvent; + WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead; + ResolvedFromMarkerLookAheadWaitingMarkerEvent --(Cancel, on_cancel_requested) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; + // Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling // the LA and the marker being recorded, peekahead might not always resolve the LA *before* // scheduling it. This transition accounts for that. - WaitingMarkerEvent --(HandleKnownResult(ResolveDat), on_handle_result) --> WaitingMarkerEvent; - WaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType), - on_no_wait_cancel) --> WaitingMarkerEvent; + WaitingResolveFromMarkerLookAhead --(NoWaitCancel(ActivityCancellationType), + on_no_wait_cancel) --> WaitingResolveFromMarkerLookAhead; + ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType), + on_no_wait_cancel) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; + + // LAs on the replay path always need to eventually see the marker + ResolvedFromMarkerLookAheadWaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) + --> MarkerCommandRecorded; // It is entirely possible to have started the LA while replaying, only to find that we have // reached a new WFT and there still was no marker. In such cases we need to execute the LA. // This can easily happen if upon first execution, the worker does WFT heartbeating but then // dies for some reason. - WaitingMarkerEvent --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent; - - // If the activity is pre resolved we still expect to see marker recorded event at some point, - // even though we already resolved the activity. - WaitingMarkerEventPreResolved --(MarkerRecorded(CompleteLocalActivityData), - shared on_marker_recorded) --> MarkerCommandRecorded; - // Ignore cancellations when waiting for the marker after being pre-resolved - WaitingMarkerEventPreResolved --(Cancel) --> WaitingMarkerEventPreResolved; - WaitingMarkerEventPreResolved --(NoWaitCancel(ActivityCancellationType)) - --> WaitingMarkerEventPreResolved; + WaitingResolveFromMarkerLookAhead --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent; // Ignore cancellation in final state MarkerCommandRecorded --(Cancel, on_cancel_requested) --> MarkerCommandRecorded; @@ -101,6 +97,7 @@ fsm! { // LAs reporting status after they've handled their result can simply be ignored. We could // optimize this away higher up but that feels very overkill. MarkerCommandCreated --(HandleResult(ResolveDat)) --> MarkerCommandCreated; + MarkerCommandCreated --(HandleKnownResult(ResolveDat)) --> MarkerCommandCreated; ResultNotified --(HandleResult(ResolveDat)) --> ResultNotified; MarkerCommandRecorded --(HandleResult(ResolveDat)) --> MarkerCommandRecorded; } @@ -151,22 +148,12 @@ impl From for ResolveDat { pub(super) fn new_local_activity( mut attrs: ValidScheduleLA, replaying_when_invoked: bool, - maybe_pre_resolved: Option, wf_time: Option, internal_flags: InternalFlagsRef, ) -> Result<(LocalActivityMachine, Vec), WFMachinesError> { let initial_state = if replaying_when_invoked { - if let Some(dat) = maybe_pre_resolved { - ReplayingPreResolved { dat }.into() - } else { - Replaying {}.into() - } + Replaying {}.into() } else { - if maybe_pre_resolved.is_some() { - return Err(nondeterminism!( - "Local activity cannot be created as pre-resolved while not replaying" - )); - } Executing {}.into() }; @@ -202,15 +189,13 @@ impl LocalActivityMachine { /// command-event processing - instead simply applying the event to this machine and then /// skipping over the rest. If this machine is in the `ResultNotified` state, that means /// command handling should proceed as normal (ie: The command needs to be matched and removed). - /// The other valid states to make this check in are the `WaitingMarkerEvent[PreResolved]` - /// states, which will return true. /// /// Attempting the check in any other state likely means a bug in the SDK. pub(super) fn marker_should_get_special_handling(&self) -> Result { match self.state() { LocalActivityMachineState::ResultNotified(_) => Ok(false), - LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true), - LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true), + LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) => Ok(true), + LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true), _ => Err(fatal!( "Attempted to check for LA marker handling in invalid state {}", self.state() @@ -223,7 +208,7 @@ impl LocalActivityMachine { pub(super) fn will_accept_resolve_marker(&self) -> bool { matches!( self.state(), - LocalActivityMachineState::WaitingMarkerEvent(_) + LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) ) } @@ -231,10 +216,21 @@ impl LocalActivityMachine { pub(super) fn encountered_non_replay_wft( &mut self, ) -> Result, WFMachinesError> { + if matches!( + self.state(), + LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) + ) { + panic!( + "Invalid transition while notifying local activity (seq {}) of non-replay-wft-started in {}", + self.shared_state.attrs.seq, + self.state(), + ); + } + // This only applies to the waiting-for-marker state. It can safely be ignored in the others if !matches!( self.state(), - LocalActivityMachineState::WaitingMarkerEvent(_) + LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) ) { return Ok(vec![]); } @@ -448,31 +444,10 @@ impl MarkerCommandRecorded { #[derive(Default, Clone)] pub(super) struct Replaying {} impl Replaying { - pub(super) fn on_schedule(self) -> LocalActivityMachineTransition { - TransitionResult::ok( - [], - WaitingMarkerEvent { - already_resolved: false, - }, - ) - } -} - -#[derive(Clone)] -pub(super) struct ReplayingPreResolved { - dat: ResolveDat, -} -impl ReplayingPreResolved { pub(super) fn on_schedule( self, - ) -> LocalActivityMachineTransition { - TransitionResult::ok( - [ - LocalActivityCommand::FakeMarker, - LocalActivityCommand::Resolved(self.dat), - ], - WaitingMarkerEventPreResolved {}, - ) + ) -> LocalActivityMachineTransition { + TransitionResult::ok([], WaitingResolveFromMarkerLookAhead {}) } } @@ -559,11 +534,11 @@ impl ResultNotified { } #[derive(Default, Clone)] -pub(super) struct WaitingMarkerEvent { - already_resolved: bool, -} +pub(super) struct WaitingResolveFromMarkerLookAhead {} -impl WaitingMarkerEvent { +impl WaitingResolveFromMarkerLookAhead { + // FIXME(JWH): I dont think this transition exists any longer. + /* pub(super) fn on_marker_recorded( self, shared: &mut SharedState, @@ -572,22 +547,21 @@ impl WaitingMarkerEvent { verify_marker_dat!( shared, &dat, - TransitionResult::commands(if self.already_resolved { - vec![] - } else { - vec![LocalActivityCommand::Resolved(dat.into())] - }) + TransitionResult::commands(vec![LocalActivityCommand::Resolved(dat.into())]) ) } + */ + fn on_handle_result( self, dat: ResolveDat, - ) -> LocalActivityMachineTransition { + ) -> LocalActivityMachineTransition { TransitionResult::ok( - [LocalActivityCommand::Resolved(dat)], - WaitingMarkerEvent { - already_resolved: true, - }, + [ + // LocalActivityCommand::FakeMarker, + LocalActivityCommand::Resolved(dat), + ], + ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, ) } pub(super) fn on_started_non_replay_wft( @@ -601,7 +575,9 @@ impl WaitingMarkerEvent { )]) } - fn on_cancel_requested(self) -> LocalActivityMachineTransition { + fn on_cancel_requested( + self, + ) -> LocalActivityMachineTransition { // We still "request a cancel" even though we know the local activity should not be running // because the data might be in the pre-resolved list. TransitionResult::ok([LocalActivityCommand::RequestCancel], self) @@ -610,7 +586,7 @@ impl WaitingMarkerEvent { fn on_no_wait_cancel( self, _: ActivityCancellationType, - ) -> LocalActivityMachineTransition { + ) -> LocalActivityMachineTransition { // Markers are always recorded when cancelling, so this is the same as a normal cancel on // the replay path self.on_cancel_requested() @@ -618,14 +594,51 @@ impl WaitingMarkerEvent { } #[derive(Default, Clone)] -pub(super) struct WaitingMarkerEventPreResolved {} -impl WaitingMarkerEventPreResolved { +pub(super) struct ResolvedFromMarkerLookAheadWaitingMarkerEvent {} + +impl ResolvedFromMarkerLookAheadWaitingMarkerEvent { pub(super) fn on_marker_recorded( self, shared: &mut SharedState, dat: CompleteLocalActivityData, ) -> LocalActivityMachineTransition { - verify_marker_dat!(shared, &dat, TransitionResult::default()) + verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![])) + } + // fn on_handle_result( + // self, + // dat: ResolveDat, + // ) -> LocalActivityMachineTransition { + // TransitionResult::ok( + // [LocalActivityCommand::Resolved(dat)], + // ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, + // ) + // } + // pub(super) fn on_started_non_replay_wft( + // self, + // dat: &mut SharedState, + // ) -> LocalActivityMachineTransition { + // // We aren't really "replaying" anymore for our purposes, and want to record the marker. + // dat.replaying_when_invoked = false; + // TransitionResult::commands([LocalActivityCommand::RequestActivityExecution( + // dat.attrs.clone(), + // )]) + // } + + fn on_cancel_requested( + self, + ) -> LocalActivityMachineTransition { + // We still "request a cancel" even though we know the local activity should not be running + // because the data might be in the pre-resolved list. + TransitionResult::ok([LocalActivityCommand::RequestCancel], self) + } + + fn on_no_wait_cancel( + self, + _: ActivityCancellationType, + ) -> LocalActivityMachineTransition { + // Markers are always recorded when cancelling, so this is the same as a normal cancel on + // the replay path + self.on_cancel_requested() } } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index dd7ba43f2..ad287d3b5 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -534,6 +534,7 @@ impl WorkflowMachines { pub(crate) fn iterate_machines(&mut self) -> Result<()> { let results = self.drive_me.fetch_workflow_iteration_output(); self.handle_driven_results(results)?; + self.apply_local_action_peeked_resolutions()?; self.prepare_commands()?; if self.workflow_is_finished() && let Some(rt) = self.total_runtime() @@ -543,6 +544,25 @@ impl WorkflowMachines { Ok(()) } + fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { + while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { + let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { + // If we haven't encountered the LA schedule yet, stop processing the + // preresolutions. + break; + }; + // Look to make this "safe" + let dat = self.local_activity_data.take_preresolution(seq).unwrap(); + if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { + let resps = lam.try_resolve_with_dat(dat)?; + self.process_machine_responses(mk, resps)?; + } else { + panic!("Found non-LAM for LA command"); + } + } + Ok(()) + } + /// Returns true if machines are ready to apply the next WFT sequence, false if events will need /// to be fetched in order to create a complete update with the entire next WFT sequence. pub(crate) fn ready_to_apply_next_wft(&self) -> bool { @@ -807,6 +827,7 @@ impl WorkflowMachines { DelayedAction::WakeLa(mk, la_dat) => { let mach = self.machine_mut(mk); if let Machines::LocalActivityMachine(ref mut lam) = *mach { + // self.local_activity_data.insert_peeked_marker(*la_dat); if lam.will_accept_resolve_marker() { let resps = lam.try_resolve_with_dat((*la_dat).into())?; self.process_machine_responses(mk, resps)?; @@ -1360,13 +1381,16 @@ impl WorkflowMachines { let (la, mach_resp) = new_local_activity( attrs, self.replaying, - self.local_activity_data.take_preresolution(seq), self.current_wf_time, self.observed_internal_flags.clone(), )?; let machkey = self.all_machines.insert(la.into()); self.id_to_machine .insert(CommandID::LocalActivity(seq), machkey); + // This now only does additional things on execute + // Previously on resolved replay it would be processing + // [LocalActivityCommand::FakeMarker, LocalActivityCommand::Resolved(self.dat)] + // replay will alway exit in `WaitingResolveFromMarkerLookAhead` state self.process_machine_responses(machkey, mach_resp)?; } WFCommandVariant::RequestCancelActivity(attrs) => { diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs index bb24a1a92..e5d1b7a9e 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs @@ -4,7 +4,7 @@ use crate::{ worker::{ExecutingLAId, LocalActRequest, NewLocalAct}, }; use std::{ - collections::{HashMap, HashSet}, + collections::{HashSet, VecDeque}, time::SystemTime, }; use temporalio_common::protos::temporal::api::common::v1::WorkflowExecution; @@ -19,7 +19,7 @@ pub(super) struct LocalActivityData { executing: HashSet, /// Maps local activity sequence numbers to their resolutions as found when looking ahead at /// next WFT - preresolutions: HashMap, + preresolutions: VecDeque<(u32, ResolveDat)>, /// Set true if the workflow is terminating am_terminating: bool, } @@ -78,11 +78,23 @@ impl LocalActivityData { } pub(super) fn insert_peeked_marker(&mut self, dat: CompleteLocalActivityData) { - self.preresolutions.insert(dat.marker_dat.seq, dat.into()); + self.preresolutions + .push_back((dat.marker_dat.seq, dat.into())); } pub(super) fn take_preresolution(&mut self, seq: u32) -> Option { - self.preresolutions.remove(&seq) + let idx = self + .preresolutions + .iter() + .enumerate() + .find_map(|(ix, (s, _))| (*s == seq).then_some(ix))?; + let (_, dat) = self.preresolutions.remove(idx).unwrap(); + Some(dat) + } + + pub(super) fn peek_preresolution_seq(&self) -> Option { + let (seq, _) = self.preresolutions.front()?; + Some(*seq) } pub(super) fn remove_from_queue(&mut self, seq: u32) -> Option { diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index f77975ab9..ac725533f 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -727,6 +727,8 @@ impl ManagedRun { if let Some(update) = update_from_new_page { self.wfm.feed_history_from_new_page(update)?; } + // here the las will be in `WaitingResolveFromMarkerLookAhead` on replay and no + // presolutions. // Don't bother applying the next task if we're evicting at the end of this activation // or are otherwise broken. if !completion.activation_was_eviction && !self.am_broken { @@ -1445,7 +1447,13 @@ impl WorkflowManager { return Ok(()); } loop { + // On first WFT, this will look forward and peek the results from next WFT + // On the let consumed_events = self.machines.apply_next_wft_from_history()?; + // Now we have presolutions + // previously, this las would be in the + // + // Maybe we apply presolutions here? if consumed_events == 0 || !self.machines.replaying || self.machines.has_pending_jobs() { From 787cb57a982b69294466192c4b08ce5d7c80aea6 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 14:31:25 -0500 Subject: [PATCH 3/9] avoid saving unneeded peeked resolutions --- .../workflow/machines/local_activity_state_machine.rs | 1 - .../src/worker/workflow/machines/workflow_machines.rs | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 48f639d1c..35b050798 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -97,7 +97,6 @@ fsm! { // LAs reporting status after they've handled their result can simply be ignored. We could // optimize this away higher up but that feels very overkill. MarkerCommandCreated --(HandleResult(ResolveDat)) --> MarkerCommandCreated; - MarkerCommandCreated --(HandleKnownResult(ResolveDat)) --> MarkerCommandCreated; ResultNotified --(HandleResult(ResolveDat)) --> ResultNotified; MarkerCommandRecorded --(HandleResult(ResolveDat)) --> MarkerCommandRecorded; } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index ad287d3b5..7339beab0 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -827,12 +827,16 @@ impl WorkflowMachines { DelayedAction::WakeLa(mk, la_dat) => { let mach = self.machine_mut(mk); if let Machines::LocalActivityMachine(ref mut lam) = *mach { - // self.local_activity_data.insert_peeked_marker(*la_dat); if lam.will_accept_resolve_marker() { let resps = lam.try_resolve_with_dat((*la_dat).into())?; self.process_machine_responses(mk, resps)?; } else { - self.local_activity_data.insert_peeked_marker(*la_dat); + // Since the LA machine exists, we have encountered the LA WF command. + // But since it will not accept a resolve marker, we have no use saving + // this marker. + // Previously, peeked markers were stored in a map, but since we are + // now storing in an ordered collection we no longer can store this + // unnecessary data as `apply_local_action_peeked_resolutions` will attempt to apply it. } } } From 8c31fbc3178a5471989836d8c2fc20a5ae337f9d Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 15:30:20 -0500 Subject: [PATCH 4/9] remove dead code, clean up comments --- .../machines/local_activity_state_machine.rs | 51 +------------------ .../workflow/machines/workflow_machines.rs | 25 ++++----- .../machines/workflow_machines/local_acts.rs | 9 ++-- 3 files changed, 14 insertions(+), 71 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 35b050798..1806c3b03 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -363,12 +363,6 @@ pub(super) enum LocalActivityCommand { RequestActivityExecution(ValidScheduleLA), #[display("Resolved")] Resolved(ResolveDat), - /// The fake marker is used to avoid special casing marker recorded event handling. - /// If we didn't have the fake marker, there would be no "outgoing command" to match - /// against the event. This way there is, but the command never will be issued to - /// server because it is understood to be meaningless. - #[display("FakeMarker")] - FakeMarker, /// Indicate we want to cancel an LA that is currently executing, or look up if we have /// processed a marker with resolution data since the machine was constructed. #[display("Cancel")] @@ -536,30 +530,12 @@ impl ResultNotified { pub(super) struct WaitingResolveFromMarkerLookAhead {} impl WaitingResolveFromMarkerLookAhead { - // FIXME(JWH): I dont think this transition exists any longer. - /* - pub(super) fn on_marker_recorded( - self, - shared: &mut SharedState, - dat: CompleteLocalActivityData, - ) -> LocalActivityMachineTransition { - verify_marker_dat!( - shared, - &dat, - TransitionResult::commands(vec![LocalActivityCommand::Resolved(dat.into())]) - ) - } - */ - fn on_handle_result( self, dat: ResolveDat, ) -> LocalActivityMachineTransition { TransitionResult::ok( - [ - // LocalActivityCommand::FakeMarker, - LocalActivityCommand::Resolved(dat), - ], + [LocalActivityCommand::Resolved(dat)], ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, ) } @@ -603,25 +579,6 @@ impl ResolvedFromMarkerLookAheadWaitingMarkerEvent { ) -> LocalActivityMachineTransition { verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![])) } - // fn on_handle_result( - // self, - // dat: ResolveDat, - // ) -> LocalActivityMachineTransition { - // TransitionResult::ok( - // [LocalActivityCommand::Resolved(dat)], - // ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, - // ) - // } - // pub(super) fn on_started_non_replay_wft( - // self, - // dat: &mut SharedState, - // ) -> LocalActivityMachineTransition { - // // We aren't really "replaying" anymore for our purposes, and want to record the marker. - // dat.replaying_when_invoked = false; - // TransitionResult::commands([LocalActivityCommand::RequestActivityExecution( - // dat.attrs.clone(), - // )]) - // } fn on_cancel_requested( self, @@ -792,12 +749,6 @@ impl WFMachinesAdapter for LocalActivityMachine { } Ok(responses) } - LocalActivityCommand::FakeMarker => { - // See docs for `FakeMarker` for more - Ok(vec![MachineResponse::IssueFakeLocalActivityMarker( - self.shared_state.attrs.seq, - )]) - } LocalActivityCommand::RequestCancel => { Ok(vec![MachineResponse::RequestCancelLocalActivity( self.shared_state.attrs.seq, diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 7339beab0..45ac06046 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -177,8 +177,6 @@ struct CommandAndMachine { #[derive(Debug, derive_more::Display)] enum MachineAssociatedCommand { Real(Box), - #[display("FakeLocalActivityMarker({_0})")] - FakeLocalActivityMarker(u32), } #[derive(Debug, Clone, Copy)] @@ -205,8 +203,6 @@ pub(super) enum MachineResponse { /// collisions. #[display("NewCoreOriginatedCommand({_0:?})")] NewCoreOriginatedCommand(ProtoCmdAttrs), - #[display("IssueFakeLocalActivityMarker({_0})")] - IssueFakeLocalActivityMarker(u32), #[display("TriggerWFTaskStarted")] TriggerWFTaskStarted { task_started_event_id: i64, @@ -421,7 +417,6 @@ impl WorkflowMachines { if !self.machine(c.machine).is_final_state() { match &c.command { MachineAssociatedCommand::Real(cmd) => Some((**cmd).clone()), - MachineAssociatedCommand::FakeLocalActivityMarker(_) => None, } } else { None @@ -547,17 +542,22 @@ impl WorkflowMachines { fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { - // If we haven't encountered the LA schedule yet, stop processing the + // If we haven't encountered the LA schedule for the first preresolution yet, stop processing the // preresolutions. break; }; - // Look to make this "safe" - let dat = self.local_activity_data.take_preresolution(seq).unwrap(); + let dat = self + .local_activity_data + .take_preresolution(seq) + .expect("This seq was just given by `peek_preresolution_seq`"); if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { let resps = lam.try_resolve_with_dat(dat)?; self.process_machine_responses(mk, resps)?; } else { - panic!("Found non-LAM for LA command"); + return Err(fatal!( + "Peeked local activity marker but the associated machine was of the \ + wrong type! {dat:?}" + )); } } Ok(()) @@ -1151,7 +1151,6 @@ impl WorkflowMachines { .handle_command(cmd.command_type())?; self.process_machine_responses(c.machine, machine_responses)?; } - MachineAssociatedCommand::FakeLocalActivityMarker(_) => {} } self.commands.push_back(c); } @@ -1238,12 +1237,6 @@ impl WorkflowMachines { )); } }, - MachineResponse::IssueFakeLocalActivityMarker(seq) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: MachineAssociatedCommand::FakeLocalActivityMarker(seq), - machine: smk, - }); - } MachineResponse::QueueLocalActivity(act) => { self.local_activity_data.enqueue(act); } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs index e5d1b7a9e..b1954226d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs @@ -83,12 +83,11 @@ impl LocalActivityData { } pub(super) fn take_preresolution(&mut self, seq: u32) -> Option { - let idx = self + let idx = self.preresolutions.iter().position(|(s, _)| *s == seq)?; + let (_, dat) = self .preresolutions - .iter() - .enumerate() - .find_map(|(ix, (s, _))| (*s == seq).then_some(ix))?; - let (_, dat) = self.preresolutions.remove(idx).unwrap(); + .remove(idx) + .expect("This index was just found to contain seq"); Some(dat) } From 0c13ef5cfcaccd76b02bdd80d921d02dacbfb2ab Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 15:37:28 -0500 Subject: [PATCH 5/9] remove MachineAssociatedCommand --- .../workflow/machines/workflow_machines.rs | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 45ac06046..f023cef25 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -170,15 +170,10 @@ pub(crate) struct WorkflowMachines { #[derive(Debug, derive_more::Display)] #[display("Cmd&Machine({command})")] struct CommandAndMachine { - command: MachineAssociatedCommand, + command: ProtoCommand, machine: MachineKey, } -#[derive(Debug, derive_more::Display)] -enum MachineAssociatedCommand { - Real(Box), -} - #[derive(Debug, Clone, Copy)] struct ChangeInfo { created_command: bool, @@ -415,9 +410,7 @@ impl WorkflowMachines { .write_all_known(); self.commands.iter().filter_map(|c| { if !self.machine(c.machine).is_final_state() { - match &c.command { - MachineAssociatedCommand::Real(cmd) => Some((**cmd).clone()), - } + Some(c.command.clone()) } else { None } @@ -1144,14 +1137,10 @@ impl WorkflowMachines { .machine(c.machine) .was_cancelled_before_sent_to_server() { - match &c.command { - MachineAssociatedCommand::Real(cmd) => { - let machine_responses = self - .machine_mut(c.machine) - .handle_command(cmd.command_type())?; - self.process_machine_responses(c.machine, machine_responses)?; - } - } + let machine_responses = self + .machine_mut(c.machine) + .handle_command(c.command.command_type())?; + self.process_machine_responses(c.machine, machine_responses)?; self.commands.push_back(c); } } @@ -1195,7 +1184,7 @@ impl WorkflowMachines { } MachineResponse::IssueNewCommand(c) => { self.current_wf_task_commands.push_back(CommandAndMachine { - command: MachineAssociatedCommand::Real(Box::new(c)), + command: c, machine: smk, }) } @@ -1631,7 +1620,7 @@ impl WorkflowMachines { user_metadata: metadata, }; CommandAndMachine { - command: MachineAssociatedCommand::Real(Box::new(cmd)), + command: cmd, machine: k, } } From 3f3f73ac4b7e3f8f610ce11599096b471cf5a0a6 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 16:48:46 -0500 Subject: [PATCH 6/9] clean up comments --- .../machines/local_activity_state_machine.rs | 13 +------------ .../worker/workflow/machines/workflow_machines.rs | 4 ---- crates/sdk-core/src/worker/workflow/managed_run.rs | 8 -------- 3 files changed, 1 insertion(+), 24 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 1806c3b03..118b25828 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -215,17 +215,6 @@ impl LocalActivityMachine { pub(super) fn encountered_non_replay_wft( &mut self, ) -> Result, WFMachinesError> { - if matches!( - self.state(), - LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) - ) { - panic!( - "Invalid transition while notifying local activity (seq {}) of non-replay-wft-started in {}", - self.shared_state.attrs.seq, - self.state(), - ); - } - // This only applies to the waiting-for-marker state. It can safely be ignored in the others if !matches!( self.state(), @@ -577,7 +566,7 @@ impl ResolvedFromMarkerLookAheadWaitingMarkerEvent { shared: &mut SharedState, dat: CompleteLocalActivityData, ) -> LocalActivityMachineTransition { - verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![])) + verify_marker_dat!(shared, &dat, TransitionResult::default()) } fn on_cancel_requested( diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index f023cef25..f860d4e76 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -1373,10 +1373,6 @@ impl WorkflowMachines { let machkey = self.all_machines.insert(la.into()); self.id_to_machine .insert(CommandID::LocalActivity(seq), machkey); - // This now only does additional things on execute - // Previously on resolved replay it would be processing - // [LocalActivityCommand::FakeMarker, LocalActivityCommand::Resolved(self.dat)] - // replay will alway exit in `WaitingResolveFromMarkerLookAhead` state self.process_machine_responses(machkey, mach_resp)?; } WFCommandVariant::RequestCancelActivity(attrs) => { diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index ac725533f..f77975ab9 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -727,8 +727,6 @@ impl ManagedRun { if let Some(update) = update_from_new_page { self.wfm.feed_history_from_new_page(update)?; } - // here the las will be in `WaitingResolveFromMarkerLookAhead` on replay and no - // presolutions. // Don't bother applying the next task if we're evicting at the end of this activation // or are otherwise broken. if !completion.activation_was_eviction && !self.am_broken { @@ -1447,13 +1445,7 @@ impl WorkflowManager { return Ok(()); } loop { - // On first WFT, this will look forward and peek the results from next WFT - // On the let consumed_events = self.machines.apply_next_wft_from_history()?; - // Now we have presolutions - // previously, this las would be in the - // - // Maybe we apply presolutions here? if consumed_events == 0 || !self.machines.replaying || self.machines.has_pending_jobs() { From 3c0f475df1f7d69dc5a08c98f414c12cb86c088a Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 10 Dec 2025 08:50:54 -0500 Subject: [PATCH 7/9] more comments, fix formatting --- .../include/temporal-sdk-core-c-bridge.h | 30 +++++------ .../machines/local_activity_state_machine.rs | 1 - .../workflow/machines/workflow_machines.rs | 50 ++++++++++--------- 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h index 114e55c24..e55a8d0b7 100644 --- a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -5,13 +5,13 @@ #include #include -typedef enum TemporalCoreForwardedLogLevel { - Trace = 0, - Debug, - Info, - Warn, - Error, -} TemporalCoreForwardedLogLevel; +typedef enum TemporalCoreRpcService { + Workflow = 1, + Operator, + Cloud, + Test, + Health, +} TemporalCoreRpcService; typedef enum TemporalCoreMetricAttributeValueType { String = 1, @@ -29,6 +29,14 @@ typedef enum TemporalCoreMetricKind { GaugeFloat, } TemporalCoreMetricKind; +typedef enum TemporalCoreForwardedLogLevel { + Trace = 0, + Debug, + Info, + Warn, + Error, +} TemporalCoreForwardedLogLevel; + typedef enum TemporalCoreOpenTelemetryMetricTemporality { Cumulative = 1, Delta, @@ -39,14 +47,6 @@ typedef enum TemporalCoreOpenTelemetryProtocol { Http, } TemporalCoreOpenTelemetryProtocol; -typedef enum TemporalCoreRpcService { - Workflow = 1, - Operator, - Cloud, - Test, - Health, -} TemporalCoreRpcService; - typedef enum TemporalCoreSlotKindType { WorkflowSlotKindType, ActivitySlotKindType, diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 118b25828..beab9bd0e 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -65,7 +65,6 @@ fsm! { --> MarkerCommandRecorded; // Replay path ================================================================================ - //WaitingResolveFromMarkerLookAhead --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) --> MarkerCommandRecorded; WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; // If we are told to cancel while waiting for the marker, we still need to wait for the marker. WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead; diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index f860d4e76..65075fc31 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -532,30 +532,6 @@ impl WorkflowMachines { Ok(()) } - fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { - while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { - let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { - // If we haven't encountered the LA schedule for the first preresolution yet, stop processing the - // preresolutions. - break; - }; - let dat = self - .local_activity_data - .take_preresolution(seq) - .expect("This seq was just given by `peek_preresolution_seq`"); - if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { - let resps = lam.try_resolve_with_dat(dat)?; - self.process_machine_responses(mk, resps)?; - } else { - return Err(fatal!( - "Peeked local activity marker but the associated machine was of the \ - wrong type! {dat:?}" - )); - } - } - Ok(()) - } - /// Returns true if machines are ready to apply the next WFT sequence, false if events will need /// to be fetched in order to create a complete update with the entire next WFT sequence. pub(crate) fn ready_to_apply_next_wft(&self) -> bool { @@ -1663,6 +1639,32 @@ impl WorkflowMachines { } } } + + // Applies local action preresolutions peeked from history until encountering a result for an + // LA that has yet to be scheduled. + fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { + while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { + let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { + // If we haven't encountered the LA schedule for the first preresolution yet, stop processing the + // preresolutions. + break; + }; + let dat = self + .local_activity_data + .take_preresolution(seq) + .expect("This seq was just given by `peek_preresolution_seq`"); + if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { + let resps = lam.try_resolve_with_dat(dat)?; + self.process_machine_responses(mk, resps)?; + } else { + return Err(fatal!( + "Peeked local activity marker but the associated machine was of the \ + wrong type! {dat:?}" + )); + } + } + Ok(()) + } } /// Contains everything workflow machine internals need to bubble up when we're getting ready to From de25a45dbf9890070a60b9b5d117836e90d7687f Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 10 Dec 2025 10:12:34 -0500 Subject: [PATCH 8/9] remove special case check --- .../src/worker/workflow/machines/local_activity_state_machine.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index beab9bd0e..a87fdc3a1 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -192,7 +192,6 @@ impl LocalActivityMachine { pub(super) fn marker_should_get_special_handling(&self) -> Result { match self.state() { LocalActivityMachineState::ResultNotified(_) => Ok(false), - LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) => Ok(true), LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true), _ => Err(fatal!( "Attempted to check for LA marker handling in invalid state {}", From 50d8a6f3ee59a6745bc6c86de0ab6c528e09a7cc Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 17 Dec 2025 10:40:36 -0500 Subject: [PATCH 9/9] make error nde --- .../sdk-core/src/worker/workflow/machines/workflow_machines.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 65075fc31..5865b73c4 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -1657,7 +1657,7 @@ impl WorkflowMachines { let resps = lam.try_resolve_with_dat(dat)?; self.process_machine_responses(mk, resps)?; } else { - return Err(fatal!( + return Err(nondeterminism!( "Peeked local activity marker but the associated machine was of the \ wrong type! {dat:?}" ));