diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb index 1165930b..d0a1c00c 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb @@ -345,7 +345,7 @@ def apply(job) when :initialize_workflow # Ignore when :fire_timer - pending_timers.delete(job.fire_timer.seq)&.resume + pending_timers[job.fire_timer.seq]&.resume when :update_random_seed @random = illegal_call_tracing_disabled { Random.new(job.update_random_seed.randomness_seed) } when :query_workflow @@ -356,23 +356,23 @@ def apply(job) when :signal_workflow apply_signal(job.signal_workflow) when :resolve_activity - pending_activities.delete(job.resolve_activity.seq)&.resume(job.resolve_activity.result) + pending_activities[job.resolve_activity.seq]&.resume(job.resolve_activity.result) when :notify_has_patch @patches_notified << job.notify_has_patch.patch_id when :resolve_child_workflow_execution_start - pending_child_workflow_starts.delete(job.resolve_child_workflow_execution_start.seq)&.resume( + pending_child_workflow_starts[job.resolve_child_workflow_execution_start.seq]&.resume( job.resolve_child_workflow_execution_start ) when :resolve_child_workflow_execution - pending_child_workflows.delete(job.resolve_child_workflow_execution.seq)&._resolve( + pending_child_workflows[job.resolve_child_workflow_execution.seq]&._resolve( job.resolve_child_workflow_execution.result ) when :resolve_signal_external_workflow - pending_external_signals.delete(job.resolve_signal_external_workflow.seq)&.resume( + pending_external_signals[job.resolve_signal_external_workflow.seq]&.resume( job.resolve_signal_external_workflow ) when :resolve_request_cancel_external_workflow - pending_external_cancels.delete(job.resolve_request_cancel_external_workflow.seq)&.resume( + pending_external_cancels[job.resolve_request_cancel_external_workflow.seq]&.resume( job.resolve_request_cancel_external_workflow ) when :do_update diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index 1b565ada..cbae4b6e 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -43,7 +43,12 @@ def cancel_external_workflow(input) @instance.pending_external_cancels[seq] = Fiber.current # Wait - resolution = Fiber.yield + resolution = begin + Fiber.yield + ensure + # Remove pending + @instance.pending_external_cancels.delete(seq) + end # Raise if resolution has failure return unless resolution.failure @@ -169,10 +174,13 @@ def execute_activity_once(local:, cancellation:, last_local_backoff:, result_hin end # Wait - resolution = Fiber.yield - - # Remove cancellation callback - cancellation.remove_cancel_callback(cancel_callback_key) + resolution = begin + Fiber.yield + ensure + # Remove pending and cancel callback + @instance.pending_activities.delete(seq) + cancellation.remove_cancel_callback(cancel_callback_key) + end case resolution.status when :completed @@ -254,10 +262,13 @@ def _signal_external_workflow(id:, run_id:, child:, signal:, args:, cancellation end # Wait - resolution = Fiber.yield - - # Remove cancellation callback - cancellation.remove_cancel_callback(cancel_callback_key) + resolution = begin + Fiber.yield + ensure + # Remove pending and cancel callback + @instance.pending_external_signals.delete(seq) + cancellation.remove_cancel_callback(cancel_callback_key) + end # Raise if resolution has failure return unless resolution.failure @@ -317,7 +328,12 @@ def sleep(input) end # Wait - Fiber.yield + begin + Fiber.yield + ensure + # Remove pending + @instance.pending_timers.delete(seq) + end # Remove cancellation callback (only needed on success) input.cancellation.remove_cancel_callback(cancel_callback_key) @@ -374,7 +390,12 @@ def start_child_workflow(input) end # Wait for start - resolution = Fiber.yield + resolution = begin + Fiber.yield + ensure + # Remove pending + @instance.pending_child_workflow_starts.delete(seq) + end case resolution.status when :succeeded diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb index 21410a72..75176d56 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb @@ -78,7 +78,12 @@ def wait_condition(cancellation:, &block) end # This blocks until a resume is called on this fiber - result = Fiber.yield + result = begin + Fiber.yield + ensure + # Remove pending + @wait_conditions.delete(seq) + end # Remove cancellation callback (only needed on success) cancellation&.remove_cancel_callback(cancel_callback_key) if cancel_callback_key diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index 4fe96fb1..0a52fdc8 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -2685,18 +2685,43 @@ def test_last_failure assert_equal 'Intentional failure', previous_failure end end -end -# TODO(cretz): To test -# * Common -# * Eager workflow start -# * Unawaited futures that have exceptions, need to log warning like Java does -# * Enhanced stack trace? -# * Separate abstract/interface demonstration -# * Reset update randomness seed -# * Confirm thread pool does not leak, meaning thread/worker goes away after last workflow -# * Test workflow cancel causing other cancels at the same time but in different coroutines -# * 0-sleep timers vs nil timers -# * Interceptors -# * Activity -# * Local activity cancel (currently broken) + class LeftoverWaitActivity < Temporalio::Activity::Definition + def execute + ctx = Temporalio::Activity::Context.current + ctx.client.workflow_handle(ctx.info.workflow_id).signal(LeftoverWaitWorkflow.some_signal) + end + end + + class LeftoverWaitWorkflow < Temporalio::Workflow::Definition + def execute + # The issue this is testing is that a wait condition is still set as the fiber to update even after it has + # timed out + + # First have a wait condition and raise from it on timeout + Temporalio::Workflow.timeout(0.001) do + Temporalio::Workflow.wait_condition { @got_signal } + raise 'Unreachable' + rescue Timeout::Error + # Do nothing + end + + # Now execute an activity that is going to send a signal to wake up the wait condition (that was supposed to be + # gone) causing it to give an improper value to the activity here + Temporalio::Workflow.execute_activity(LeftoverWaitActivity, start_to_close_timeout: 10) + end + + workflow_signal + def some_signal + @got_signal = true + end + end + + def test_leftover_wait + handle = execute_workflow(LeftoverWaitWorkflow, activities: [LeftoverWaitActivity]) do |handle| + handle.tap(&:result) + end + # This used to fail because of a workflow task failure caused by a leftover wait condition + assert_nil handle.fetch_history_events.find(&:workflow_task_failed_event_attributes) + end +end