diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 6ae12e28..c3e64c53 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_duration, :concurrency_on_conflict, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..f4f437c6 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -67,7 +67,12 @@ def prepare_for_execution def dispatch if acquire_concurrency_lock then ready else - block + case job_class.concurrency_on_conflict + when :discard + discard_on_conflict + else + block + end end end @@ -104,6 +109,10 @@ def ready ReadyExecution.create_or_find_by!(job_id: id) end + def discard_on_conflict + finished! + end + def execution %w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") } end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 0ea290f6..28079248 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -12,14 +12,16 @@ module ConcurrencyControls class_attribute :concurrency_limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period + class_attribute :concurrency_on_conflict, default: :block end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration + self.concurrency_on_conflict = on_conflict end end diff --git a/test/dummy/app/jobs/discard_on_conflict_job.rb b/test/dummy/app/jobs/discard_on_conflict_job.rb new file mode 100644 index 00000000..64da39b3 --- /dev/null +++ b/test/dummy/app/jobs/discard_on_conflict_job.rb @@ -0,0 +1,7 @@ +class DiscardOnConflictJob < ApplicationJob + limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard + + def perform(value) + Rails.logger.info "Performing DiscardOnConflictJob with value: #{value}" + end +end diff --git a/test/dummy/app/jobs/limited_discard_job.rb b/test/dummy/app/jobs/limited_discard_job.rb new file mode 100644 index 00000000..ee7174c2 --- /dev/null +++ b/test/dummy/app/jobs/limited_discard_job.rb @@ -0,0 +1,8 @@ +class LimitedDiscardJob < ApplicationJob + limits_concurrency to: 2, key: ->(group, id) { group }, on_conflict: :discard + + def perform(group, id) + Rails.logger.info "Performing LimitedDiscardJob with group: #{group}, id: #{id}" + sleep 0.1 + end +end diff --git a/test/integration/concurrency_discard_test.rb b/test/integration/concurrency_discard_test.rb new file mode 100644 index 00000000..1ced7d4d --- /dev/null +++ b/test/integration/concurrency_discard_test.rb @@ -0,0 +1,137 @@ +# frozen_string_literal: true + +require "test_helper" + +class ConcurrencyDiscardTest < ActiveSupport::TestCase + setup do + @job_result = JobResult.create!(queue_name: "default", status: "test") + end + + test "discard jobs when concurrency limit is reached with on_conflict: :discard" do + # Enqueue first job - should be executed + job1 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Enqueue second job - should be discarded due to concurrency limit + job2 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Enqueue third job - should also be discarded + job3 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Check that first job was ready + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + assert solid_job1.ready? + assert solid_job1.ready_execution.present? + + # Check that second and third jobs were discarded + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job2.finished? + assert_nil solid_job2.ready_execution + assert_nil solid_job2.blocked_execution + + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + assert solid_job3.finished? + assert_nil solid_job3.ready_execution + assert_nil solid_job3.blocked_execution + end + + test "block jobs when concurrency limit is reached without on_conflict option" do + # Using SequentialUpdateResultJob which has default blocking behavior + # Enqueue first job - should be executed + job1 = SequentialUpdateResultJob.perform_later(@job_result, name: "A") + + # Enqueue second job - should be blocked due to concurrency limit + job2 = SequentialUpdateResultJob.perform_later(@job_result, name: "B") + + # Check that second job is blocked + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job2.blocked? + assert solid_job2.blocked_execution.present? + end + + test "respect concurrency limit with discard option" do + # Enqueue jobs with limit of 2 + job1 = LimitedDiscardJob.perform_later("group1", 1) + job2 = LimitedDiscardJob.perform_later("group1", 2) + job3 = LimitedDiscardJob.perform_later("group1", 3) # Should be discarded + job4 = LimitedDiscardJob.perform_later("group1", 4) # Should be discarded + + # Check that first two jobs are ready + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job1.ready? + assert solid_job2.ready? + + # Check that third and fourth jobs are discarded + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + solid_job4 = SolidQueue::Job.find_by(active_job_id: job4.job_id) + assert solid_job3.finished? + assert solid_job4.finished? + assert_nil solid_job3.ready_execution + assert_nil solid_job4.ready_execution + end + + test "discard option works with different concurrency keys" do + # These should not conflict because they have different keys + job1 = DiscardOnConflictJob.perform_later("key1") + job2 = DiscardOnConflictJob.perform_later("key2") + job3 = DiscardOnConflictJob.perform_later("key1") # Should be discarded + + # Check that first two jobs are ready (different keys) + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job1.ready? + assert solid_job2.ready? + + # Check that third job is discarded (same key as first) + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + assert solid_job3.finished? + assert_nil solid_job3.ready_execution + end + + test "discarded jobs do not unblock other jobs" do + # Enqueue a job that will be executed + job1 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Enqueue a job that will be discarded + job2 = DiscardOnConflictJob.perform_later(@job_result.id) + + # The first job should be ready + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + assert solid_job1.ready? + + # The second job should be discarded immediately + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job2.finished? + + # Complete the first job and release its lock + solid_job1.unblock_next_blocked_job + solid_job1.finished! + + # Enqueue another job - it should be ready since the lock is released + job3 = DiscardOnConflictJob.perform_later(@job_result.id) + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + assert solid_job3.ready? + end + + test "discarded jobs are marked as finished without execution" do + # Enqueue a job that will be ready + job1 = DiscardOnConflictJob.perform_later("test_key") + + # Enqueue a job that will be discarded + job2 = DiscardOnConflictJob.perform_later("test_key") + + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + + # First job should be ready + assert solid_job1.ready? + assert solid_job1.ready_execution.present? + + # Second job should be finished without any execution + assert solid_job2.finished? + assert_nil solid_job2.ready_execution + assert_nil solid_job2.claimed_execution + assert_nil solid_job2.failed_execution + assert_nil solid_job2.blocked_execution + end +end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 486756ab..6d87c6b1 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -18,6 +18,13 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end + class NonOverlappingDiscardJob < ApplicationJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard + + def perform(job_result) + end + end + setup do @result = JobResult.create!(queue_name: "default") end @@ -45,6 +52,82 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_equal 8, execution.priority end + test "enqueue jobs with on_conflict discard" do + # First job should be ready + active_job1 = NonOverlappingDiscardJob.new(@result) + assert_ready do + SolidQueue::Job.enqueue(active_job1) + end + job1 = SolidQueue::Job.find_by(active_job_id: active_job1.job_id) + assert job1.ready? + + # Second job should be discarded (finished without execution) + active_job2 = NonOverlappingDiscardJob.new(@result) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_no_difference -> { SolidQueue::BlockedExecution.count } do + SolidQueue::Job.enqueue(active_job2) + end + end + job2 = SolidQueue::Job.find_by(active_job_id: active_job2.job_id) + + assert job2.finished? + assert_nil job2.ready_execution + assert_nil job2.blocked_execution + assert_nil job2.claimed_execution + assert_nil job2.failed_execution + + # Third job with same key should also be discarded + active_job3 = NonOverlappingDiscardJob.new(@result) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + SolidQueue::Job.enqueue(active_job3) + end + job3 = SolidQueue::Job.find_by(active_job_id: active_job3.job_id) + + assert job3.finished? + end + + test "compare blocking vs discard behavior" do + # Test default blocking behavior + blocking_job1 = NonOverlappingJob.new(@result) + assert_ready do + SolidQueue::Job.enqueue(blocking_job1) + end + job1 = SolidQueue::Job.find_by(active_job_id: blocking_job1.job_id) + assert job1.ready? + + # Second job should be blocked (not discarded) + blocking_job2 = NonOverlappingJob.new(@result) + assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do + SolidQueue::Job.enqueue(blocking_job2) + end + job2 = SolidQueue::Job.find_by(active_job_id: blocking_job2.job_id) + assert job2.blocked? + assert job2.blocked_execution.present? + assert_not job2.finished? + + # Clean up for discard test + SolidQueue::Job.destroy_all + SolidQueue::Semaphore.destroy_all + + # Test discard behavior + discard_job1 = NonOverlappingDiscardJob.new(@result) + assert_ready do + SolidQueue::Job.enqueue(discard_job1) + end + job3 = SolidQueue::Job.find_by(active_job_id: discard_job1.job_id) + assert job3.ready? + + # Second job should be discarded (not blocked) + discard_job2 = NonOverlappingDiscardJob.new(@result) + assert_no_difference -> { SolidQueue::BlockedExecution.count } do + SolidQueue::Job.enqueue(discard_job2) + end + job4 = SolidQueue::Job.find_by(active_job_id: discard_job2.job_id) + assert job4.finished? + assert_nil job4.blocked_execution + assert_nil job4.ready_execution + end + test "enqueue active job to be scheduled in the future" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") diff --git a/test/unit/concurrency_discard_test.rb b/test/unit/concurrency_discard_test.rb new file mode 100644 index 00000000..ae31fa55 --- /dev/null +++ b/test/unit/concurrency_discard_test.rb @@ -0,0 +1,154 @@ +# frozen_string_literal: true + +require "test_helper" + +module SolidQueue + class ConcurrencyDiscardTest < ActiveSupport::TestCase + class DiscardOnConflictJob < ApplicationJob + limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard + + def perform(value) + # Job implementation + end + end + + class DefaultBlockingJob < ApplicationJob + limits_concurrency to: 1, key: ->(value) { value } + + def perform(value) + # Job implementation + end + end + + test "job with on_conflict: :discard is finished when concurrency limit is reached" do + # Create first job that will acquire the lock + active_job1 = DiscardOnConflictJob.new("test_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # First job should be ready + assert job1.ready? + assert job1.ready_execution.present? + + # Create second job that should be discarded + active_job2 = DiscardOnConflictJob.new("test_key") + active_job2.job_id = "job2" + Job.enqueue(active_job2) + job2 = Job.find_by(active_job_id: active_job2.job_id) + + # Second job should be finished without any execution + assert job2.finished? + assert_nil job2.ready_execution + assert_nil job2.blocked_execution + assert_nil job2.claimed_execution + assert_nil job2.failed_execution + end + + test "job without on_conflict option is blocked when concurrency limit is reached" do + # Create first job that will acquire the lock + active_job1 = DefaultBlockingJob.new("test_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # First job should be ready + assert job1.ready? + assert job1.ready_execution.present? + + # Create second job that should be blocked + active_job2 = DefaultBlockingJob.new("test_key") + active_job2.job_id = "job2" + Job.enqueue(active_job2) + job2 = Job.find_by(active_job_id: active_job2.job_id) + + # Second job should be blocked + assert job2.blocked? + assert job2.blocked_execution.present? + assert_nil job2.ready_execution + assert_not job2.finished? + end + + test "concurrency_on_conflict attribute is properly set" do + assert_equal :discard, DiscardOnConflictJob.concurrency_on_conflict + assert_equal :block, DefaultBlockingJob.concurrency_on_conflict + end + + test "multiple jobs with same key are discarded when using on_conflict: :discard" do + # Create first job + active_job1 = DiscardOnConflictJob.new("shared_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # Create multiple jobs that should all be discarded + discarded_jobs = [] + 5.times do |i| + active_job = DiscardOnConflictJob.new("shared_key") + active_job.job_id = "job#{i + 2}" + Job.enqueue(active_job) + job = Job.find_by(active_job_id: active_job.job_id) + discarded_jobs << job + end + + # First job should be ready + assert job1.ready? + + # All other jobs should be finished (discarded) + discarded_jobs.each do |job| + assert job.finished? + assert_nil job.ready_execution + assert_nil job.blocked_execution + end + end + + test "jobs with different keys are not affected by discard" do + # Create jobs with different keys - they should all be ready + jobs = [] + 3.times do |i| + active_job = DiscardOnConflictJob.new("key_#{i}") + active_job.job_id = "job#{i}" + Job.enqueue(active_job) + job = Job.find_by(active_job_id: active_job.job_id) + jobs << job + end + + # All jobs should be ready since they have different keys + jobs.each do |job| + assert job.ready? + assert job.ready_execution.present? + assert_not job.finished? + end + end + + test "discarded job does not prevent future jobs after lock is released" do + # Create and finish first job + active_job1 = DiscardOnConflictJob.new("test_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # Create second job that gets discarded + active_job2 = DiscardOnConflictJob.new("test_key") + active_job2.job_id = "job2" + Job.enqueue(active_job2) + job2 = Job.find_by(active_job_id: active_job2.job_id) + + assert job1.ready? + assert job2.finished? # discarded + + # Release the lock by finishing the first job + job1.unblock_next_blocked_job + job1.finished! + + # Create third job - should be ready now + active_job3 = DiscardOnConflictJob.new("test_key") + active_job3.job_id = "job3" + Job.enqueue(active_job3) + job3 = Job.find_by(active_job_id: active_job3.job_id) + + assert job3.ready? + assert job3.ready_execution.present? + end + end +end