Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def apply(job)
when :initialize_workflow
# Ignore
when :fire_timer
pending_timers.delete(job.fire_timer.seq)&.resume
pending_timers[job.fire_timer.seq]&.resume
when :update_random_seed
@random = illegal_call_tracing_disabled { Random.new(job.update_random_seed.randomness_seed) }
when :query_workflow
Expand All @@ -356,23 +356,23 @@ def apply(job)
when :signal_workflow
apply_signal(job.signal_workflow)
when :resolve_activity
pending_activities.delete(job.resolve_activity.seq)&.resume(job.resolve_activity.result)
pending_activities[job.resolve_activity.seq]&.resume(job.resolve_activity.result)
when :notify_has_patch
@patches_notified << job.notify_has_patch.patch_id
when :resolve_child_workflow_execution_start
pending_child_workflow_starts.delete(job.resolve_child_workflow_execution_start.seq)&.resume(
pending_child_workflow_starts[job.resolve_child_workflow_execution_start.seq]&.resume(
job.resolve_child_workflow_execution_start
)
when :resolve_child_workflow_execution
pending_child_workflows.delete(job.resolve_child_workflow_execution.seq)&._resolve(
pending_child_workflows[job.resolve_child_workflow_execution.seq]&._resolve(
job.resolve_child_workflow_execution.result
)
when :resolve_signal_external_workflow
pending_external_signals.delete(job.resolve_signal_external_workflow.seq)&.resume(
pending_external_signals[job.resolve_signal_external_workflow.seq]&.resume(
job.resolve_signal_external_workflow
)
when :resolve_request_cancel_external_workflow
pending_external_cancels.delete(job.resolve_request_cancel_external_workflow.seq)&.resume(
pending_external_cancels[job.resolve_request_cancel_external_workflow.seq]&.resume(
job.resolve_request_cancel_external_workflow
)
when :do_update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ def cancel_external_workflow(input)
@instance.pending_external_cancels[seq] = Fiber.current

# Wait
resolution = Fiber.yield
resolution = begin
Fiber.yield
ensure
# Remove pending
@instance.pending_external_cancels.delete(seq)
end

# Raise if resolution has failure
return unless resolution.failure
Expand Down Expand Up @@ -169,10 +174,13 @@ def execute_activity_once(local:, cancellation:, last_local_backoff:, result_hin
end

# Wait
resolution = Fiber.yield

# Remove cancellation callback
cancellation.remove_cancel_callback(cancel_callback_key)
resolution = begin
Fiber.yield
ensure
# Remove pending and cancel callback
@instance.pending_activities.delete(seq)
cancellation.remove_cancel_callback(cancel_callback_key)
end

case resolution.status
when :completed
Expand Down Expand Up @@ -254,10 +262,13 @@ def _signal_external_workflow(id:, run_id:, child:, signal:, args:, cancellation
end

# Wait
resolution = Fiber.yield

# Remove cancellation callback
cancellation.remove_cancel_callback(cancel_callback_key)
resolution = begin
Fiber.yield
ensure
# Remove pending and cancel callback
@instance.pending_external_signals.delete(seq)
cancellation.remove_cancel_callback(cancel_callback_key)
end

# Raise if resolution has failure
return unless resolution.failure
Expand Down Expand Up @@ -317,7 +328,12 @@ def sleep(input)
end

# Wait
Fiber.yield
begin
Fiber.yield
ensure
# Remove pending
@instance.pending_timers.delete(seq)
end

# Remove cancellation callback (only needed on success)
input.cancellation.remove_cancel_callback(cancel_callback_key)
Expand Down Expand Up @@ -374,7 +390,12 @@ def start_child_workflow(input)
end

# Wait for start
resolution = Fiber.yield
resolution = begin
Fiber.yield
ensure
# Remove pending
@instance.pending_child_workflow_starts.delete(seq)
end

case resolution.status
when :succeeded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ def wait_condition(cancellation:, &block)
end

# This blocks until a resume is called on this fiber
result = Fiber.yield
result = begin
Fiber.yield
ensure
# Remove pending
@wait_conditions.delete(seq)
end

# Remove cancellation callback (only needed on success)
cancellation&.remove_cancel_callback(cancel_callback_key) if cancel_callback_key
Expand Down
53 changes: 39 additions & 14 deletions temporalio/test/worker_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2685,18 +2685,43 @@ def test_last_failure
assert_equal 'Intentional failure', previous_failure
end
end
end

# TODO(cretz): To test
# * Common
# * Eager workflow start
# * Unawaited futures that have exceptions, need to log warning like Java does
# * Enhanced stack trace?
# * Separate abstract/interface demonstration
# * Reset update randomness seed
# * Confirm thread pool does not leak, meaning thread/worker goes away after last workflow
# * Test workflow cancel causing other cancels at the same time but in different coroutines
# * 0-sleep timers vs nil timers
# * Interceptors
# * Activity
# * Local activity cancel (currently broken)
class LeftoverWaitActivity < Temporalio::Activity::Definition
def execute
ctx = Temporalio::Activity::Context.current
ctx.client.workflow_handle(ctx.info.workflow_id).signal(LeftoverWaitWorkflow.some_signal)
end
end

class LeftoverWaitWorkflow < Temporalio::Workflow::Definition
def execute
# The issue this is testing is that a wait condition is still set as the fiber to update even after it has
# timed out

# First have a wait condition and raise from it on timeout
Temporalio::Workflow.timeout(0.001) do
Temporalio::Workflow.wait_condition { @got_signal }
raise 'Unreachable'
rescue Timeout::Error
# Do nothing
end

# Now execute an activity that is going to send a signal to wake up the wait condition (that was supposed to be
# gone) causing it to give an improper value to the activity here
Temporalio::Workflow.execute_activity(LeftoverWaitActivity, start_to_close_timeout: 10)
end

workflow_signal
def some_signal
@got_signal = true
end
end

def test_leftover_wait
handle = execute_workflow(LeftoverWaitWorkflow, activities: [LeftoverWaitActivity]) do |handle|
handle.tap(&:result)
end
# This used to fail because of a workflow task failure caused by a leftover wait condition
assert_nil handle.fetch_history_events.find(&:workflow_task_failed_event_attributes)
end
end
Loading