Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion crates/common/src/protos/canned_histories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,8 @@ pub fn single_child_workflow_try_cancelled(child_wf_id: &str) -> TestHistoryBuil
/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
/// 5: EVENT_TYPE_MARKER_RECORDED (la result)
/// 7: EVENT_TYPE_MARKER_RECORDED (la result)
/// 8: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED
Comment on lines +1308 to +1309
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updating comment here to match actual history

pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder {
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
Expand All @@ -1321,6 +1322,24 @@ pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder {
t
}

/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
/// 5: EVENT_TYPE_MARKER_RECORDED (LA 2 result)
/// 7: EVENT_TYPE_MARKER_RECORDED (LA 1 result)
/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED
pub fn parallel_las_job_order_hist() -> TestHistoryBuilder {
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_local_activity_result_marker(2, "2", b"hi2".into());
t.add_local_activity_result_marker(1, "1", b"hi1".into());
t.add_workflow_task_scheduled_and_started();
t
}

/// Useful for one-of needs to write a crafted history to a file. Writes it as serialized proto
/// binary to the provided path.
pub fn write_hist_to_binfile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ fsm! {
// is replaying), and then immediately scheduled and transitions to either requesting that lang
// execute the activity, or waiting for the marker from history.
Executing --(Schedule, shared on_schedule) --> RequestSent;
Replaying --(Schedule, on_schedule) --> WaitingMarkerEvent;
ReplayingPreResolved --(Schedule, on_schedule) --> WaitingMarkerEventPreResolved;
Replaying --(Schedule, on_schedule) --> WaitingResolveFromMarkerLookAhead;

// Execution path =============================================================================
RequestSent --(HandleResult(ResolveDat), on_handle_result) --> MarkerCommandCreated;
Expand All @@ -66,32 +65,28 @@ fsm! {
--> MarkerCommandRecorded;

// Replay path ================================================================================
// LAs on the replay path always need to eventually see the marker
WaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
--> MarkerCommandRecorded;
WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
// If we are told to cancel while waiting for the marker, we still need to wait for the marker.
WaitingMarkerEvent --(Cancel, on_cancel_requested) --> WaitingMarkerEvent;
WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead;
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(Cancel, on_cancel_requested) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;

// Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling
// the LA and the marker being recorded, peekahead might not always resolve the LA *before*
// scheduling it. This transition accounts for that.
WaitingMarkerEvent --(HandleKnownResult(ResolveDat), on_handle_result) --> WaitingMarkerEvent;
WaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> WaitingMarkerEvent;
WaitingResolveFromMarkerLookAhead --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> WaitingResolveFromMarkerLookAhead;
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;

// LAs on the replay path always need to eventually see the marker
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
--> MarkerCommandRecorded;

// It is entirely possible to have started the LA while replaying, only to find that we have
// reached a new WFT and there still was no marker. In such cases we need to execute the LA.
// This can easily happen if upon first execution, the worker does WFT heartbeating but then
// dies for some reason.
WaitingMarkerEvent --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

// If the activity is pre resolved we still expect to see marker recorded event at some point,
// even though we already resolved the activity.
WaitingMarkerEventPreResolved --(MarkerRecorded(CompleteLocalActivityData),
shared on_marker_recorded) --> MarkerCommandRecorded;
// Ignore cancellations when waiting for the marker after being pre-resolved
WaitingMarkerEventPreResolved --(Cancel) --> WaitingMarkerEventPreResolved;
WaitingMarkerEventPreResolved --(NoWaitCancel(ActivityCancellationType))
--> WaitingMarkerEventPreResolved;
WaitingResolveFromMarkerLookAhead --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

// Ignore cancellation in final state
MarkerCommandRecorded --(Cancel, on_cancel_requested) --> MarkerCommandRecorded;
Expand Down Expand Up @@ -151,22 +146,12 @@ impl From<CompleteLocalActivityData> for ResolveDat {
pub(super) fn new_local_activity(
mut attrs: ValidScheduleLA,
replaying_when_invoked: bool,
maybe_pre_resolved: Option<ResolveDat>,
wf_time: Option<SystemTime>,
internal_flags: InternalFlagsRef,
) -> Result<(LocalActivityMachine, Vec<MachineResponse>), WFMachinesError> {
let initial_state = if replaying_when_invoked {
if let Some(dat) = maybe_pre_resolved {
ReplayingPreResolved { dat }.into()
} else {
Replaying {}.into()
}
Replaying {}.into()
} else {
if maybe_pre_resolved.is_some() {
return Err(nondeterminism!(
"Local activity cannot be created as pre-resolved while not replaying"
));
}
Executing {}.into()
};

Expand Down Expand Up @@ -202,15 +187,12 @@ impl LocalActivityMachine {
/// command-event processing - instead simply applying the event to this machine and then
/// skipping over the rest. If this machine is in the `ResultNotified` state, that means
/// command handling should proceed as normal (ie: The command needs to be matched and removed).
/// The other valid states to make this check in are the `WaitingMarkerEvent[PreResolved]`
/// states, which will return true.
///
/// Attempting the check in any other state likely means a bug in the SDK.
pub(super) fn marker_should_get_special_handling(&self) -> Result<bool, WFMachinesError> {
match self.state() {
LocalActivityMachineState::ResultNotified(_) => Ok(false),
LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true),
LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true),
LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true),
_ => Err(fatal!(
"Attempted to check for LA marker handling in invalid state {}",
self.state()
Expand All @@ -223,7 +205,7 @@ impl LocalActivityMachine {
pub(super) fn will_accept_resolve_marker(&self) -> bool {
matches!(
self.state(),
LocalActivityMachineState::WaitingMarkerEvent(_)
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
)
}

Expand All @@ -234,7 +216,7 @@ impl LocalActivityMachine {
// This only applies to the waiting-for-marker state. It can safely be ignored in the others
if !matches!(
self.state(),
LocalActivityMachineState::WaitingMarkerEvent(_)
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
) {
return Ok(vec![]);
}
Expand Down Expand Up @@ -368,12 +350,6 @@ pub(super) enum LocalActivityCommand {
RequestActivityExecution(ValidScheduleLA),
#[display("Resolved")]
Resolved(ResolveDat),
/// The fake marker is used to avoid special casing marker recorded event handling.
/// If we didn't have the fake marker, there would be no "outgoing command" to match
/// against the event. This way there is, but the command never will be issued to
/// server because it is understood to be meaningless.
#[display("FakeMarker")]
FakeMarker,
/// Indicate we want to cancel an LA that is currently executing, or look up if we have
/// processed a marker with resolution data since the machine was constructed.
#[display("Cancel")]
Expand Down Expand Up @@ -448,31 +424,10 @@ impl MarkerCommandRecorded {
#[derive(Default, Clone)]
pub(super) struct Replaying {}
impl Replaying {
pub(super) fn on_schedule(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
TransitionResult::ok(
[],
WaitingMarkerEvent {
already_resolved: false,
},
)
}
}

#[derive(Clone)]
pub(super) struct ReplayingPreResolved {
dat: ResolveDat,
}
impl ReplayingPreResolved {
pub(super) fn on_schedule(
self,
) -> LocalActivityMachineTransition<WaitingMarkerEventPreResolved> {
TransitionResult::ok(
[
LocalActivityCommand::FakeMarker,
LocalActivityCommand::Resolved(self.dat),
],
WaitingMarkerEventPreResolved {},
)
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
TransitionResult::ok([], WaitingResolveFromMarkerLookAhead {})
}
}

Expand Down Expand Up @@ -559,35 +514,16 @@ impl ResultNotified {
}

#[derive(Default, Clone)]
pub(super) struct WaitingMarkerEvent {
already_resolved: bool,
}
pub(super) struct WaitingResolveFromMarkerLookAhead {}

impl WaitingMarkerEvent {
pub(super) fn on_marker_recorded(
self,
shared: &mut SharedState,
dat: CompleteLocalActivityData,
) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
verify_marker_dat!(
shared,
&dat,
TransitionResult::commands(if self.already_resolved {
vec![]
} else {
vec![LocalActivityCommand::Resolved(dat.into())]
})
)
}
impl WaitingResolveFromMarkerLookAhead {
fn on_handle_result(
self,
dat: ResolveDat,
) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
TransitionResult::ok(
[LocalActivityCommand::Resolved(dat)],
WaitingMarkerEvent {
already_resolved: true,
},
ResolvedFromMarkerLookAheadWaitingMarkerEvent {},
)
}
pub(super) fn on_started_non_replay_wft(
Expand All @@ -601,7 +537,9 @@ impl WaitingMarkerEvent {
)])
}

fn on_cancel_requested(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
fn on_cancel_requested(
self,
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
// We still "request a cancel" even though we know the local activity should not be running
// because the data might be in the pre-resolved list.
TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
Expand All @@ -610,23 +548,41 @@ impl WaitingMarkerEvent {
fn on_no_wait_cancel(
self,
_: ActivityCancellationType,
) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
// Markers are always recorded when cancelling, so this is the same as a normal cancel on
// the replay path
self.on_cancel_requested()
}
}

#[derive(Default, Clone)]
pub(super) struct WaitingMarkerEventPreResolved {}
impl WaitingMarkerEventPreResolved {
pub(super) struct ResolvedFromMarkerLookAheadWaitingMarkerEvent {}

impl ResolvedFromMarkerLookAheadWaitingMarkerEvent {
pub(super) fn on_marker_recorded(
self,
shared: &mut SharedState,
dat: CompleteLocalActivityData,
) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
verify_marker_dat!(shared, &dat, TransitionResult::default())
}

fn on_cancel_requested(
self,
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// We still "request a cancel" even though we know the local activity should not be running
// because the data might be in the pre-resolved list.
TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
}

fn on_no_wait_cancel(
self,
_: ActivityCancellationType,
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// Markers are always recorded when cancelling, so this is the same as a normal cancel on
// the replay path
self.on_cancel_requested()
}
}

impl WFMachinesAdapter for LocalActivityMachine {
Expand Down Expand Up @@ -780,12 +736,6 @@ impl WFMachinesAdapter for LocalActivityMachine {
}
Ok(responses)
}
LocalActivityCommand::FakeMarker => {
// See docs for `FakeMarker` for more
Ok(vec![MachineResponse::IssueFakeLocalActivityMarker(
self.shared_state.attrs.seq,
)])
}
LocalActivityCommand::RequestCancel => {
Ok(vec![MachineResponse::RequestCancelLocalActivity(
self.shared_state.attrs.seq,
Expand Down
Loading
Loading