From c2ed289386dc90b502911b65cd67d36f68ddc6a5 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Thu, 1 Feb 2024 20:51:29 -0500 Subject: [PATCH 01/23] Batch job POC * Introduces a "batch" concept, similar to batches present in Sidekiq Pro and GoodJob * Batches monitor a set of jobs, and when those jobs are completed can fire off a final job * This introduces a SolidQueue::JobBatch model, as well as the ability to enqueue jobs and associate them with the batch * There are still more ideas to figure out, but this provides a basic batch scaffolding to spark discussion --- README.md | 11 +++ app/models/solid_queue/claimed_execution.rb | 2 + app/models/solid_queue/job.rb | 6 +- app/models/solid_queue/job/executable.rb | 2 +- app/models/solid_queue/job_batch.rb | 96 +++++++++++++++++++ ...31013203_create_solid_queue_batch_table.rb | 21 ++++ lib/active_job/job_batch_id.rb | 22 +++++ lib/solid_queue.rb | 1 + lib/solid_queue/dispatcher.rb | 1 + lib/solid_queue/engine.rb | 1 + test/dummy/app/jobs/batch_completion_job.rb | 7 ++ test/dummy/app/jobs/sleepy_job.rb | 10 ++ test/models/solid_queue/job_batch_test.rb | 48 ++++++++++ 13 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 app/models/solid_queue/job_batch.rb create mode 100644 db/migrate/20240131013203_create_solid_queue_batch_table.rb create mode 100644 lib/active_job/job_batch_id.rb create mode 100644 test/dummy/app/jobs/batch_completion_job.rb create mode 100644 test/dummy/app/jobs/sleepy_job.rb create mode 100644 test/models/solid_queue/job_batch_test.rb diff --git a/README.md b/README.md index 92a018d4..76fd7b9a 100644 --- a/README.md +++ b/README.md @@ -584,6 +584,17 @@ class ApplicationMailer < ActionMailer::Base Rails.error.report(exception) raise exception end +``` + +## Batch jobs + +```rb +SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do + 5.times.map { |i| SleepyJob.perform_later(i) } +end + +SolidQueue::JobBatch.enqueue(on_success: 
BatchCompletionJob) do + 5.times.map { |i| SleepyJob.perform_later(i) } end ``` diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 8840505b..c9726b5f 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -69,6 +69,8 @@ def perform failed_with(result.error) raise result.error end + + job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present? ensure unblock_next_job end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 6a6a6fa3..8a5927ec 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -8,6 +8,8 @@ class EnqueueError < StandardError; end serialize :arguments, coder: JSON + belongs_to :job_batch, foreign_key: :batch_id, optional: true + class << self def enqueue_all(active_jobs) active_jobs_by_job_id = active_jobs.index_by(&:job_id) @@ -54,6 +56,7 @@ def create_all_from_active_jobs(active_jobs) end def attributes_from_active_job(active_job) + active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id { queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME, active_job_id: active_job.job_id, @@ -61,7 +64,8 @@ def attributes_from_active_job(active_job) scheduled_at: active_job.scheduled_at, class_name: active_job.class.name, arguments: active_job.serialize, - concurrency_key: active_job.concurrency_key + concurrency_key: active_job.concurrency_key, + batch_id: active_job.batch_id } end end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index b0a4cb93..08f07bb0 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -76,7 +76,7 @@ def dispatch_bypassing_concurrency_limits end def finished! - if SolidQueue.preserve_finished_jobs? + if SolidQueue.preserve_finished_jobs? || batch_id.present? touch(:finished_at) else destroy! 
diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb new file mode 100644 index 00000000..58bcee21 --- /dev/null +++ b/app/models/solid_queue/job_batch.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module SolidQueue + class JobBatch < Record + belongs_to :job, foreign_key: :job_id, optional: true + has_many :jobs, foreign_key: :batch_id + + scope :incomplete, -> { + where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago) + } + + class << self + def current_batch_id + Thread.current[:current_batch_id] + end + + def enqueue(attributes = {}) + previous_batch_id = current_batch_id.presence || nil + + job_batch = nil + transaction do + job_batch = create!(batch_attributes(attributes)) + Thread.current[:current_batch_id] = job_batch.id + yield + end + + job_batch + ensure + Thread.current[:current_batch_id] = previous_batch_id + end + + def dispatch_finished_batches + incomplete.order(:id).pluck(:id).each do |id| + transaction do + where(id:).non_blocking_lock.each(&:finish) + end + end + end + + private + + def batch_attributes(attributes) + attributes = case attributes + in { on_finish: on_finish_klass } + attributes.merge( + job_class: on_finish_klass, + completion_type: "success" + ) + in { on_success: on_success_klass } + attributes.merge( + job_class: on_success_klass, + completion_type: "success" + ) + end + + attributes.except(:on_finish, :on_success) + end + end + + def finished? + finished_at.present? + end + + def finish + return if finished? + reset_changed_at + jobs.find_each do |next_job| + # FIXME: If it's failed but is going to retry, how do we know? + # Because we need to know if we will determine what the failed execution means + # FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine + # how to analyze each job + return unless next_job.finished? + end + + attrs = {} + + if job_class.present? 
+ job_klass = job_class.constantize + active_job = job_klass.perform_later(self) + attrs[:job] = Job.find_by(active_job_id: active_job.job_id) + end + + update!({ finished_at: Time.zone.now }.merge(attrs)) + end + + private + + def reset_changed_at + if changed_at.blank? && last_changed_at.present? + update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again + else + update_columns(changed_at: nil) # clear out changed_at so we ignore this until the next job finishes + end + end + end +end diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb new file mode 100644 index 00000000..bf8d97ce --- /dev/null +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -0,0 +1,21 @@ +class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_job_batches do |t| + t.references :job, index: { unique: true } + t.string :job_class + t.string :completion_type + t.datetime :finished_at + t.datetime :changed_at + t.datetime :last_changed_at + t.timestamps + + t.index [ :finished_at ] + t.index [ :changed_at ] + t.index [ :last_changed_at ] + end + + add_reference :solid_queue_jobs, :batch, index: true + add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade + add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id + end +end diff --git a/lib/active_job/job_batch_id.rb b/lib/active_job/job_batch_id.rb new file mode 100644 index 00000000..5810d152 --- /dev/null +++ b/lib/active_job/job_batch_id.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +# Inspired by active_job/core.rb docs +# https://github.com/rails/rails/blob/1c2529b9a6ba5a1eff58be0d0373d7d9d401015b/activejob/lib/active_job/core.rb#L136 +module ActiveJob + module JobBatchId + extend ActiveSupport::Concern + + included do + attr_accessor :batch_id + end + + def serialize + 
super.merge('batch_id' => batch_id) + end + + def deserialize(job_data) + super + self.batch_id = job_data['batch_id'] + end + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..1277ea67 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -5,6 +5,7 @@ require "active_job" require "active_job/queue_adapters" +require "active_job/job_batch_id" require "active_support" require "active_support/core_ext/numeric/time" diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 1583e1dd..5bcbe0e8 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -37,6 +37,7 @@ def poll def dispatch_next_batch with_polling_volume do ScheduledExecution.dispatch_next_batch(batch_size) + SolidQueue::JobBatch.dispatch_finished_batches end end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..452ae445 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -35,6 +35,7 @@ class Engine < ::Rails::Engine initializer "solid_queue.active_job.extensions" do ActiveSupport.on_load :active_job do include ActiveJob::ConcurrencyControls + include ActiveJob::JobBatchId end end end diff --git a/test/dummy/app/jobs/batch_completion_job.rb b/test/dummy/app/jobs/batch_completion_job.rb new file mode 100644 index 00000000..0fb17284 --- /dev/null +++ b/test/dummy/app/jobs/batch_completion_job.rb @@ -0,0 +1,7 @@ +class BatchCompletionJob < ApplicationJob + queue_as :background + + def perform(batch) + Rails.logger.info "#{batch.jobs.size} jobs completed!" + end +end diff --git a/test/dummy/app/jobs/sleepy_job.rb b/test/dummy/app/jobs/sleepy_job.rb new file mode 100644 index 00000000..dd105cdc --- /dev/null +++ b/test/dummy/app/jobs/sleepy_job.rb @@ -0,0 +1,10 @@ +class SleepyJob < ApplicationJob + queue_as :background + + retry_on Exception, wait: 30.seconds, attempts: 5 + + def perform(seconds_to_sleep) + Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..." 
+ sleep seconds_to_sleep + end +end diff --git a/test/models/solid_queue/job_batch_test.rb b/test/models/solid_queue/job_batch_test.rb new file mode 100644 index 00000000..962904e8 --- /dev/null +++ b/test/models/solid_queue/job_batch_test.rb @@ -0,0 +1,48 @@ +require "test_helper" + +class SolidQueue::JobBatchTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + teardown do + SolidQueue::Job.destroy_all + SolidQueue::JobBatch.destroy_all + end + + class NiceJob < ApplicationJob + retry_on Exception, wait: 1.second + + def perform(arg) + Rails.logger.info "Hi #{arg}!" + end + end + + test "batch will be completed on success" do + batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) {} + assert_equal "success", batch.completion_type + assert_equal BatchCompletionJob.name, batch.job_class + end + + test "batch will be completed on finish" do + batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) {} + assert_equal "success", batch.completion_type + assert_equal BatchCompletionJob.name, batch.job_class + end + + test "sets the batch_id on jobs created inside of the enqueue block" do + batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do + NiceJob.perform_later("world") + NiceJob.perform_later("people") + end + + assert_equal 2, SolidQueue::Job.count + assert_equal [batch.id] * 2, SolidQueue::Job.last(2).map(&:batch_id) + end + + test "batch id is present inside the block" do + assert_nil SolidQueue::JobBatch.current_batch_id + SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do + assert_not_nil SolidQueue::JobBatch.current_batch_id + end + assert_nil SolidQueue::JobBatch.current_batch_id + end +end From 9e2e61c07a766665bf5a3e1ed06af6eef5cec5fc Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Mon, 5 Feb 2024 17:17:31 -0500 Subject: [PATCH 02/23] Use ActiveSupport::IsolatedExecutionState to honor user isolation level setting --- 
app/models/solid_queue/job_batch.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 58bcee21..fb281fbb 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -11,7 +11,7 @@ class JobBatch < Record class << self def current_batch_id - Thread.current[:current_batch_id] + ActiveSupport::IsolatedExecutionState[:current_batch_id] end def enqueue(attributes = {}) @@ -20,13 +20,13 @@ def enqueue(attributes = {}) job_batch = nil transaction do job_batch = create!(batch_attributes(attributes)) - Thread.current[:current_batch_id] = job_batch.id + ActiveSupport::IsolatedExecutionState[:current_batch_id] = job_batch.id yield end job_batch ensure - Thread.current[:current_batch_id] = previous_batch_id + ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id end def dispatch_finished_batches From 27a084cd2762a7f9a089c5f950e6a9f4a027cf84 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Mon, 5 Feb 2024 17:18:03 -0500 Subject: [PATCH 03/23] Ability to retrieve batch from a job --- lib/active_job/job_batch_id.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/active_job/job_batch_id.rb b/lib/active_job/job_batch_id.rb index 5810d152..fc6978a6 100644 --- a/lib/active_job/job_batch_id.rb +++ b/lib/active_job/job_batch_id.rb @@ -18,5 +18,9 @@ def deserialize(job_data) super self.batch_id = job_data['batch_id'] end + + def batch + @batch ||= SolidQueue::JobBatch.find_by(id: batch_id) + end end end From 58b1d2e8fbcac10fb3903d9aa3b97f1f05cd7ce0 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 7 Feb 2024 22:06:21 -0500 Subject: [PATCH 04/23] Allow batch jobs to be instances * This means we can store the arguments and settings by letting the user do `BatchJob.new(arguments).set(options)` * Yield the batch in `enqueue` in case someone 
needs info from it * When you serialize then deserialize an ActiveJob instance, the arguments are in the serialized_arguments field and can only be transferred over by the private method `deserialize_arguments_if_needed`. This is pretty janky, so there is probably something I'm missing * `perform_all_later` lets us do a perform_later even with an instance, which does not seem to be possible on the instances themselves --- app/models/solid_queue/job_batch.rb | 38 +++++++++++-------- ...31013203_create_solid_queue_batch_table.rb | 4 +- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index fb281fbb..96e449b3 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -5,6 +5,9 @@ class JobBatch < Record belongs_to :job, foreign_key: :job_id, optional: true has_many :jobs, foreign_key: :batch_id + serialize :on_finish_active_job, coder: JSON + serialize :on_success_active_job, coder: JSON + scope :incomplete, -> { where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago) } @@ -21,7 +24,7 @@ def enqueue(attributes = {}) transaction do job_batch = create!(batch_attributes(attributes)) ActiveSupport::IsolatedExecutionState[:current_batch_id] = job_batch.id - yield + yield job_batch end job_batch @@ -40,20 +43,22 @@ def dispatch_finished_batches private def batch_attributes(attributes) - attributes = case attributes - in { on_finish: on_finish_klass } - attributes.merge( - job_class: on_finish_klass, - completion_type: "success" - ) - in { on_success: on_success_klass } - attributes.merge( - job_class: on_success_klass, - completion_type: "success" - ) + on_finish_klass = attributes.delete(:on_finish) + on_success_klass = attributes.delete(:on_success) + + if on_finish_klass.present? + attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize + end + + if on_success_klass.present? 
+ attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize end - attributes.except(:on_finish, :on_success) + attributes + end + + def as_active_job(active_job_klass) + active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new end end @@ -74,9 +79,10 @@ def finish attrs = {} - if job_class.present? - job_klass = job_class.constantize - active_job = job_klass.perform_later(self) + if on_finish_active_job.present? + active_job = ActiveJob::Base.deserialize(on_finish_active_job) + active_job.send(:deserialize_arguments_if_needed) + ActiveJob.perform_all_later([active_job]) attrs[:job] = Job.find_by(active_job_id: active_job.job_id) end diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb index bf8d97ce..26540b9c 100644 --- a/db/migrate/20240131013203_create_solid_queue_batch_table.rb +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -2,8 +2,8 @@ class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] def change create_table :solid_queue_job_batches do |t| t.references :job, index: { unique: true } - t.string :job_class - t.string :completion_type + t.string :on_finish_active_job + t.string :on_success_active_job t.datetime :finished_at t.datetime :changed_at t.datetime :last_changed_at From 7a58278d20516ba95f672500cfb20a0e515e182d Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 7 Feb 2024 22:16:28 -0500 Subject: [PATCH 05/23] Make sure `batch` is still first arg of the batch callback * Add spec for adding arguments and options to the batch callback --- app/models/solid_queue/job_batch.rb | 1 + test/models/solid_queue/job_batch_test.rb | 26 +++++++++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 96e449b3..a5099731 100644 --- a/app/models/solid_queue/job_batch.rb 
+++ b/app/models/solid_queue/job_batch.rb @@ -82,6 +82,7 @@ def finish if on_finish_active_job.present? active_job = ActiveJob::Base.deserialize(on_finish_active_job) active_job.send(:deserialize_arguments_if_needed) + active_job.arguments = [self] + Array.wrap(active_job.arguments) ActiveJob.perform_all_later([active_job]) attrs[:job] = Job.find_by(active_job_id: active_job.job_id) end diff --git a/test/models/solid_queue/job_batch_test.rb b/test/models/solid_queue/job_batch_test.rb index 962904e8..30684caf 100644 --- a/test/models/solid_queue/job_batch_test.rb +++ b/test/models/solid_queue/job_batch_test.rb @@ -8,6 +8,12 @@ class SolidQueue::JobBatchTest < ActiveSupport::TestCase SolidQueue::JobBatch.destroy_all end + class BatchWithArgumentsJob < ApplicationJob + def perform(batch, arg1, arg2) + Rails.logger.info "Hi #{batch.id}, #{arg1}, #{arg2}!" + end + end + class NiceJob < ApplicationJob retry_on Exception, wait: 1.second @@ -18,14 +24,14 @@ def perform(arg) test "batch will be completed on success" do batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) {} - assert_equal "success", batch.completion_type - assert_equal BatchCompletionJob.name, batch.job_class + assert_not_nil batch.on_finish_active_job + assert_equal BatchCompletionJob.name, batch.on_finish_active_job["job_class"] end test "batch will be completed on finish" do batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) {} - assert_equal "success", batch.completion_type - assert_equal BatchCompletionJob.name, batch.job_class + assert_not_nil batch.on_success_active_job + assert_equal BatchCompletionJob.name, batch.on_success_active_job["job_class"] end test "sets the batch_id on jobs created inside of the enqueue block" do @@ -45,4 +51,16 @@ def perform(arg) end assert_nil SolidQueue::JobBatch.current_batch_id end + + test "allow arguments and options for callbacks" do + SolidQueue::JobBatch.enqueue( + on_finish: BatchWithArgumentsJob.new(1, 2).set(queue: :batch), + 
) do + NiceJob.perform_later("world") + end + + assert_not_nil SolidQueue::JobBatch.last.on_finish_active_job["arguments"] + assert_equal SolidQueue::JobBatch.last.on_finish_active_job["arguments"], [1, 2] + assert_equal SolidQueue::JobBatch.last.on_finish_active_job["queue_name"], "batch" + end end From 3870f29b8ebdee5690ee809fb41e0f5ce6d4dafb Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Fri, 22 Mar 2024 20:22:25 -0400 Subject: [PATCH 06/23] Use text so the jobs store properly on mysql --- db/migrate/20240131013203_create_solid_queue_batch_table.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb index 26540b9c..8e9e79af 100644 --- a/db/migrate/20240131013203_create_solid_queue_batch_table.rb +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -2,8 +2,8 @@ class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] def change create_table :solid_queue_job_batches do |t| t.references :job, index: { unique: true } - t.string :on_finish_active_job - t.string :on_success_active_job + t.text :on_finish_active_job + t.text :on_success_active_job t.datetime :finished_at t.datetime :changed_at t.datetime :last_changed_at From 359fd65df329b21db8fbdff44ed04de3eb3b9661 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:08:11 -0400 Subject: [PATCH 07/23] Rubocop changes * Spacing, double quotes * Support Ruby < 3.2 by removing the implicit key/variable syntax --- app/models/solid_queue/job_batch.rb | 6 +++--- lib/active_job/job_batch_id.rb | 4 ++-- test/models/solid_queue/job_batch_test.rb | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index a5099731..3d0de10d 100644 --- a/app/models/solid_queue/job_batch.rb +++ 
b/app/models/solid_queue/job_batch.rb @@ -35,7 +35,7 @@ def enqueue(attributes = {}) def dispatch_finished_batches incomplete.order(:id).pluck(:id).each do |id| transaction do - where(id:).non_blocking_lock.each(&:finish) + where(id: id).non_blocking_lock.each(&:finish) end end end @@ -82,8 +82,8 @@ def finish if on_finish_active_job.present? active_job = ActiveJob::Base.deserialize(on_finish_active_job) active_job.send(:deserialize_arguments_if_needed) - active_job.arguments = [self] + Array.wrap(active_job.arguments) - ActiveJob.perform_all_later([active_job]) + active_job.arguments = [ self ] + Array.wrap(active_job.arguments) + ActiveJob.perform_all_later([ active_job ]) attrs[:job] = Job.find_by(active_job_id: active_job.job_id) end diff --git a/lib/active_job/job_batch_id.rb b/lib/active_job/job_batch_id.rb index fc6978a6..494e197f 100644 --- a/lib/active_job/job_batch_id.rb +++ b/lib/active_job/job_batch_id.rb @@ -11,12 +11,12 @@ module JobBatchId end def serialize - super.merge('batch_id' => batch_id) + super.merge("batch_id" => batch_id) end def deserialize(job_data) super - self.batch_id = job_data['batch_id'] + self.batch_id = job_data["batch_id"] end def batch diff --git a/test/models/solid_queue/job_batch_test.rb b/test/models/solid_queue/job_batch_test.rb index 30684caf..e49f59c2 100644 --- a/test/models/solid_queue/job_batch_test.rb +++ b/test/models/solid_queue/job_batch_test.rb @@ -23,13 +23,13 @@ def perform(arg) end test "batch will be completed on success" do - batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) {} + batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) { } assert_not_nil batch.on_finish_active_job assert_equal BatchCompletionJob.name, batch.on_finish_active_job["job_class"] end test "batch will be completed on finish" do - batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) {} + batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) { } assert_not_nil 
batch.on_success_active_job assert_equal BatchCompletionJob.name, batch.on_success_active_job["job_class"] end @@ -41,7 +41,7 @@ def perform(arg) end assert_equal 2, SolidQueue::Job.count - assert_equal [batch.id] * 2, SolidQueue::Job.last(2).map(&:batch_id) + assert_equal [ batch.id ] * 2, SolidQueue::Job.last(2).map(&:batch_id) end test "batch id is present inside the block" do @@ -60,7 +60,7 @@ def perform(arg) end assert_not_nil SolidQueue::JobBatch.last.on_finish_active_job["arguments"] - assert_equal SolidQueue::JobBatch.last.on_finish_active_job["arguments"], [1, 2] + assert_equal SolidQueue::JobBatch.last.on_finish_active_job["arguments"], [ 1, 2 ] assert_equal SolidQueue::JobBatch.last.on_finish_active_job["queue_name"], "batch" end end From 0e2d85edb51fdd6ca0471eda5119d0371a43f6c2 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Mon, 23 Sep 2024 22:50:29 -0400 Subject: [PATCH 08/23] Handle on_failure and on_success * on_failure fires the first time any of the jobs fail, even once * on_success only fires if all jobs work (after retries) * remove unused job_id --- app/models/solid_queue/job_batch.rb | 53 ++++++++++++++----- ...31013203_create_solid_queue_batch_table.rb | 3 +- test/test_helpers/jobs_test_helper.rb | 8 +++ 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 3d0de10d..1bac9139 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -7,10 +7,12 @@ class JobBatch < Record serialize :on_finish_active_job, coder: JSON serialize :on_success_active_job, coder: JSON + serialize :on_failure_active_job, coder: JSON scope :incomplete, -> { where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago) } + scope :finished, -> { where.not(finished_at: nil) } class << self def current_batch_id @@ -45,6 +47,7 @@ def dispatch_finished_batches def 
batch_attributes(attributes) on_finish_klass = attributes.delete(:on_finish) on_success_klass = attributes.delete(:on_success) + on_failure_klass = attributes.delete(:on_failure) if on_finish_klass.present? attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize @@ -54,6 +57,10 @@ def batch_attributes(attributes) attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize end + if on_failure_klass.present? + attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize + end + attributes end @@ -69,22 +76,29 @@ def finished? def finish return if finished? reset_changed_at - jobs.find_each do |next_job| - # FIXME: If it's failed but is going to retry, how do we know? - # Because we need to know if we will determine what the failed execution means - # FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine - # how to analyze each job - return unless next_job.finished? - end + all_jobs_succeeded = true attrs = {} + jobs.find_each do |next_job| + # SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished, + # and there is no record of the failure. + # GoodJob would report a discard as an error. It's possible we should do that in the future? + if fire_failure_job?(next_job) + perform_completion_job(:on_failure_active_job, attrs) + update!(attrs) + end + + status = next_job.status + all_jobs_succeeded = all_jobs_succeeded && status != :failed + return unless status.in?([ :finished, :failed ]) + end if on_finish_active_job.present? - active_job = ActiveJob::Base.deserialize(on_finish_active_job) - active_job.send(:deserialize_arguments_if_needed) - active_job.arguments = [ self ] + Array.wrap(active_job.arguments) - ActiveJob.perform_all_later([ active_job ]) - attrs[:job] = Job.find_by(active_job_id: active_job.job_id) + perform_completion_job(:on_finish_active_job, attrs) + end + + if on_success_active_job.present? 
&& all_jobs_succeeded + perform_completion_job(:on_success_active_job, attrs) end update!({ finished_at: Time.zone.now }.merge(attrs)) @@ -92,6 +106,21 @@ def finish private + def fire_failure_job?(job) + return false if on_failure_active_job.blank? || job.failed_execution.blank? + job = ActiveJob::Base.deserialize(on_failure_active_job) + job.provider_job_id.blank? + end + + def perform_completion_job(job_field, attrs) + active_job = ActiveJob::Base.deserialize(send(job_field)) + active_job.send(:deserialize_arguments_if_needed) + active_job.arguments = [ self ] + Array.wrap(active_job.arguments) + ActiveJob.perform_all_later([ active_job ]) + active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id + attrs[job_field] = active_job.serialize + end + def reset_changed_at if changed_at.blank? && last_changed_at.present? update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb index 8e9e79af..f97faee5 100644 --- a/db/migrate/20240131013203_create_solid_queue_batch_table.rb +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -1,9 +1,9 @@ class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] def change create_table :solid_queue_job_batches do |t| - t.references :job, index: { unique: true } t.text :on_finish_active_job t.text :on_success_active_job + t.text :on_failure_active_job t.datetime :finished_at t.datetime :changed_at t.datetime :last_changed_at @@ -16,6 +16,5 @@ def change add_reference :solid_queue_jobs, :batch, index: true add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade - add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id end end diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index 8b71e7f6..b000a65d 100644 --- 
a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -17,6 +17,14 @@ def wait_for_jobs_to_be_released_for(timeout = 1.second) end end + def wait_for_job_batches_to_finish_for(timeout = 1.second) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::JobBatch.where(finished_at: nil).any? + end + end + end + def assert_unfinished_jobs(*jobs) skip_active_record_query_cache do assert_equal jobs.map(&:job_id).sort, SolidQueue::Job.where(finished_at: nil).map(&:active_job_id).sort From 85d6560f7f437f2ca6bc7482da5105537c39a37a Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Mon, 23 Sep 2024 22:56:52 -0400 Subject: [PATCH 09/23] Allow enqueueing into a batch instance * Allows enqueueing a job within a job, as part of the batch --- app/models/solid_queue/job_batch.rb | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 1bac9139..1173c01f 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -69,6 +69,20 @@ def as_active_job(active_job_klass) end end + # Instance-level enqueue + def enqueue(attributes = {}) + previous_batch_id = self.class.current_batch_id.presence || nil + + transaction do + ActiveSupport::IsolatedExecutionState[:current_batch_id] = id + yield self + end + + self + ensure + ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id + end + def finished? finished_at.present? 
end From fd8e614c6b8d40283f4ccab81f620d525b8f3b60 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Mon, 23 Sep 2024 22:57:56 -0400 Subject: [PATCH 10/23] Block enqueueing if the batch is finished --- app/models/solid_queue/job_batch.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 1173c01f..1a94da03 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -71,6 +71,8 @@ def as_active_job(active_job_klass) # Instance-level enqueue def enqueue(attributes = {}) + raise "You cannot enqueue a batch that is already finished" if finished? + previous_batch_id = self.class.current_batch_id.presence || nil transaction do From d01929461d01705a99aaf367757e575b25db9405 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Mon, 23 Sep 2024 23:03:47 -0400 Subject: [PATCH 11/23] Migration to allow nesting batches --- app/models/solid_queue/job_batch.rb | 1 + db/migrate/20240131013203_create_solid_queue_batch_table.rb | 1 + 2 files changed, 2 insertions(+) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 1a94da03..61780f38 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -3,6 +3,7 @@ module SolidQueue class JobBatch < Record belongs_to :job, foreign_key: :job_id, optional: true + belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true has_many :jobs, foreign_key: :batch_id serialize :on_finish_active_job, coder: JSON diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb index f97faee5..91b76ee8 100644 --- a/db/migrate/20240131013203_create_solid_queue_batch_table.rb +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -1,6 +1,7 @@ class 
CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] def change create_table :solid_queue_job_batches do |t| + t.references :parent_job_batch, index: true # FIXME: foreign key t.text :on_finish_active_job t.text :on_success_active_job t.text :on_failure_active_job From 1a62a2ba4bfb124a9fbb1851d56fa9b3369cc01a Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 25 Sep 2024 03:04:38 -0400 Subject: [PATCH 12/23] Support nested batches * Parent batches will not complete until all child batches have been completed --- app/models/solid_queue/job_batch.rb | 44 +++++++++++++++++++---------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 61780f38..eb41f7de 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -5,6 +5,7 @@ class JobBatch < Record belongs_to :job, foreign_key: :job_id, optional: true belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true has_many :jobs, foreign_key: :batch_id + has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch" serialize :on_finish_active_job, coder: JSON serialize :on_success_active_job, coder: JSON @@ -21,28 +22,33 @@ def current_batch_id end def enqueue(attributes = {}) - previous_batch_id = current_batch_id.presence || nil - job_batch = nil transaction do job_batch = create!(batch_attributes(attributes)) - ActiveSupport::IsolatedExecutionState[:current_batch_id] = job_batch.id - yield job_batch + wrap_in_batch_context(job_batch.id) do + yield job_batch + end end job_batch - ensure - ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id end def dispatch_finished_batches incomplete.order(:id).pluck(:id).each do |id| transaction do - where(id: id).non_blocking_lock.each(&:finish) + where(id: id).includes(:children, 
:jobs).non_blocking_lock.each(&:finish) end end end + def wrap_in_batch_context(batch_id) + previous_batch_id = current_batch_id.presence || nil + ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id + yield + ensure + ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id + end + private def batch_attributes(attributes) @@ -62,6 +68,8 @@ def batch_attributes(attributes) attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize end + attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present? + attributes end @@ -74,16 +82,13 @@ def as_active_job(active_job_klass) def enqueue(attributes = {}) raise "You cannot enqueue a batch that is already finished" if finished? - previous_batch_id = self.class.current_batch_id.presence || nil - transaction do - ActiveSupport::IsolatedExecutionState[:current_batch_id] = id - yield self + self.class.wrap_in_batch_context(id) do + yield self + end end self - ensure - ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id end def finished? @@ -110,6 +115,10 @@ def finish return unless status.in?([ :finished, :failed ]) end + children.find_each do |child| + return unless child.finished? + end + if on_finish_active_job.present? perform_completion_job(:on_finish_active_job, attrs) end @@ -118,7 +127,10 @@ def finish perform_completion_job(:on_success_active_job, attrs) end - update!({ finished_at: Time.zone.now }.merge(attrs)) + transaction do + parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present? 
+ update!({ finished_at: Time.zone.now }.merge(attrs)) + end end private @@ -133,7 +145,9 @@ def perform_completion_job(job_field, attrs) active_job = ActiveJob::Base.deserialize(send(job_field)) active_job.send(:deserialize_arguments_if_needed) active_job.arguments = [ self ] + Array.wrap(active_job.arguments) - ActiveJob.perform_all_later([ active_job ]) + self.class.wrap_in_batch_context(id) do + ActiveJob.perform_all_later([ active_job ]) + end active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id attrs[job_field] = active_job.serialize end From b65b9465f7ec0437131f66ab4bb0d1bbbd40e30f Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 25 Sep 2024 21:51:35 -0400 Subject: [PATCH 13/23] Expanded batch readme --- README.md | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 76fd7b9a..22da916d 100644 --- a/README.md +++ b/README.md @@ -588,12 +588,62 @@ class ApplicationMailer < ActionMailer::Base ## Batch jobs +SolidQueue offers support for batching jobs. This allows you to track progress of a set of jobs, +and optionally trigger callbacks based on their status. It supports the following: + +- Relating jobs to a batch, to track their status +- Three available callbacks to fire: + - `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed. + - `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on` + - `on_failure`: Fired the _first_ time a job fails, after all retries are exhausted. +- If a job is part of a batch, it can enqueue more jobs for that batch using `batch#enqueue` +- Batches can be nested within other batches, creating a hierarchy. Outer batches will not finish until all nested batches have finished. 
+ ```rb -SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do - 5.times.map { |i| SleepyJob.perform_later(i) } +class SleepyJob < ApplicationJob + def perform(seconds_to_sleep) + Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..." + sleep seconds_to_sleep + end +end + +class MultiStepJob < ApplicationJob + def perform + batch.enqueue do + SleepyJob.perform_later(5) + # Because of this nested batch, the top-level batch won't finish until the inner, + # 10 second job finishes + # Both jobs will still run simultaneously + SolidQueue::JobBatch.enqueue do + SleepyJob.perform_later(10) + end + end + end +end + +class BatchFinishJob < ApplicationJob + def perform(batch) # batch is always the default first argument + Rails.logger.info "Good job finishing all jobs" + end +end + +class BatchSuccessJob < ApplicationJob + def perform(batch) # batch is always the default first argument + Rails.logger.info "Good job finishing all jobs, and all of them worked!" + end +end + +class BatchFailureJob < ApplicationJob + def perform(batch) # batch is always the default first argument + Rails.logger.info "At least one job failed, sorry!" 
+ end end -SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) do +SolidQueue::JobBatch.enqueue( + on_finish: BatchFinishJob, + on_success: BatchSuccessJob, + on_failure: BatchFailureJob +) do 5.times.map { |i| SleepyJob.perform_later(i) } end ``` From 8d585fa7c2358131e8e2c4cf1cc988075ca44b4f Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 25 Sep 2024 22:05:28 -0400 Subject: [PATCH 14/23] Force an initial batch check --- app/models/solid_queue/job_batch.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index eb41f7de..bd01475d 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -69,6 +69,9 @@ def batch_attributes(attributes) end attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present? + # Set it initially, so we check the batch even if there are no jobs + attributes[:changed_at] = Time.zone.now + attributes[:last_changed_at] = Time.zone.now attributes end From b27c528c929297803e5c0eca2e0cc36d313bae6b Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Wed, 25 Sep 2024 22:27:27 -0400 Subject: [PATCH 15/23] Initial batch lifecycle tests * Attach success jobs to the parent batch, not to the current batch (which has already finished at this point) --- app/models/solid_queue/job_batch.rb | 2 +- test/integration/batch_lifecycle_test.rb | 83 ++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 test/integration/batch_lifecycle_test.rb diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index bd01475d..40e183b5 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -148,7 +148,7 @@ def perform_completion_job(job_field, attrs) active_job = ActiveJob::Base.deserialize(send(job_field)) 
active_job.send(:deserialize_arguments_if_needed) active_job.arguments = [ self ] + Array.wrap(active_job.arguments) - self.class.wrap_in_batch_context(id) do + self.class.wrap_in_batch_context(parent_job_batch_id || self.class.current_batch_id) do ActiveJob.perform_all_later([ active_job ]) end active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id diff --git a/test/integration/batch_lifecycle_test.rb b/test/integration/batch_lifecycle_test.rb new file mode 100644 index 00000000..22714315 --- /dev/null +++ b/test/integration/batch_lifecycle_test.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require "test_helper" + +class BatchLifecycleTest < ActiveSupport::TestCase + setup do + @worker = SolidQueue::Worker.new(queues: "background", threads: 3) + @dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2) + end + + teardown do + @worker.stop + @dispatcher.stop + + JobBuffer.clear + + SolidQueue::Job.destroy_all + SolidQueue::JobBatch.destroy_all + end + + class BatchOnSuccessJob < ApplicationJob + queue_as :background + + def perform(batch, custom_message = "") + JobBuffer.add "#{custom_message}: #{batch.jobs.size} jobs succeeded!" 
+ end + end + + class AddsMoreJobsJob < ApplicationJob + queue_as :background + + def perform + batch.enqueue do + AddToBufferJob.perform_later "added from inside 1" + AddToBufferJob.perform_later "added from inside 2" + SolidQueue::JobBatch.enqueue do + AddToBufferJob.perform_later "added from inside 3" + end + end + end + end + + test "nested batches finish from the inside out" do + batch2 = batch3 = batch4 = nil + batch1 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("3")) do + batch2 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("2")) do + batch3 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("1")) { } + batch4 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) { } + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal [ "1: 0 jobs succeeded!", "1.1: 0 jobs succeeded!", "2: 2 jobs succeeded!", "3: 1 jobs succeeded!" 
], JobBuffer.values + assert_equal 4, SolidQueue::JobBatch.finished.count + assert_equal batch1.reload.finished_at > batch2.reload.finished_at, true + assert_equal batch2.finished_at > batch3.reload.finished_at, true + assert_equal batch2.finished_at > batch4.reload.finished_at, true + end + + test "all jobs are run, including jobs enqueued inside of other jobs" do + SolidQueue::JobBatch.enqueue do + AddToBufferJob.perform_later "hey" + SolidQueue::JobBatch.enqueue do + AddToBufferJob.perform_later "ho" + AddsMoreJobsJob.perform_later + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal [ "added from inside 1", "added from inside 2", "added from inside 3", "hey", "ho" ], JobBuffer.values.sort + assert_equal 3, SolidQueue::JobBatch.finished.count + end +end From 94992100888944e483d12e3db81da29402746c37 Mon Sep 17 00:00:00 2001 From: JP Camara <48120+jpcamara@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:51:49 -0500 Subject: [PATCH 16/23] Add job batches to queue_schema.rb as well --- test/dummy/db/queue_schema.rb | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 697c2e92..64de0e82 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -38,6 +38,22 @@ t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true end + create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "parent_job_batch_id" + t.text "on_finish_active_job" + t.text "on_success_active_job" + t.text "on_failure_active_job" + t.datetime "finished_at" + t.datetime "changed_at" + t.datetime "last_changed_at" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at" + t.index 
["finished_at"], name: "index_solid_queue_job_batches_on_finished_at" + t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at" + t.index ["parent_job_batch_id"], name: "index_solid_queue_job_batches_on_parent_job_batch_id" + end + create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name", null: false t.string "class_name", null: false @@ -49,7 +65,9 @@ t.string "concurrency_key" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.bigint "batch_id" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" @@ -135,6 +153,7 @@ add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade From 0bf3e6572d2bd0513c791a3bdb167e7552a63b81 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 2025 01:35:47 -0400 Subject: [PATCH 17/23] Refactor internals and api namespace of batches * Thanks to Mikael Henriksson for his work in 
https://github.com/rails/solid_queue/pull/590. His work decentralizes management of batch status by moving it to the BatchUpdateJob, and tracking status using counts rather than querying specific job statuses after the fact. This is a much simpler approach to tracking the jobs, and allows us to avoid a constantly polling set of queries in the dispatcher. This also adds arbitrary metadata to allow tracking data from start to end of execution. This also means enqueueing a BatchUpdateJob based on callbacks in two different kinds of Batchable, which are included when a job is updated and finished, or when a FailedExecution is created (since failed jobs never "finish"). * This batch feature already took some inspiration from the GoodJob batch implementation (https://github.com/bensheldon/good_job). We now build on that further by adopting some of the buffering and abstractions in a form similar to GoodJob's. To discourage heavy reliance on the JobBatch model, it has been renamed to BatchRecord, and a separate Batch interface is how you interact with batches, with some delegation to the core model. * A new Buffer class (also modeled after GoodJob) was added specifically for batches. This was primarily added to support enqueue_after_transaction_commit. We now override the ActiveJob #enqueue method so we can keep track of which jobs are attempting to enqueue. When enqueue_after_transaction_commit is on, those jobs do not enqueue until all transactions commit. By tracking them at the high level enqueue and keeping a buffer of jobs, we can ensure that the jobs get tracked even when their creation is deferred until the transaction is committed. The side benefit is that we get to enqueue all the jobs together, probably offering some performance advantage. This buffer also keeps track of child batches for the same reason. * To support triggering a callback/BatchUpdateJob when a job finishes, the update to finished_at needed to become an update!
call * As a simplification, on_failure is now only fired after all jobs finish, rather than at the first time a job fails * The adapter logic itself also needed to be updated to support the buffer and enqueue_after_transaction_commit. If a job is coming from a batch enqueue, we ignore it here and allow the batching process to enqueue_all at the end of the enqueue block. If the job is originally from a batch, but is retrying, we make sure the job counts in the batch stay updated. I don't love this addition, since it adds a lot of complication to the adapter code, all solely oriented around batches * Batches benefit from keeping jobs until the batch has finished. As such, we ignore the preserve jobs setting, but if it is set to false, we enqueue a cleanup job once the batch has finished and clear out finished jobs Co-authored-by: Mikael Henriksson --- README.md | 12 +- app/jobs/solid_queue/batch_update_job.rb | 25 ++ app/models/solid_queue/batch_record.rb | 162 ++++++++++++ app/models/solid_queue/claimed_execution.rb | 2 - app/models/solid_queue/execution/batchable.rb | 20 ++ app/models/solid_queue/failed_execution.rb | 2 +- app/models/solid_queue/job.rb | 5 +- app/models/solid_queue/job/batchable.rb | 25 ++ app/models/solid_queue/job/executable.rb | 4 +- app/models/solid_queue/job_batch.rb | 166 ------------ app/models/solid_queue/job_batch/buffer.rb | 47 ++++ ...31013203_create_solid_queue_batch_table.rb | 21 -- lib/active_job/batch_record_id.rb | 47 ++++ lib/active_job/job_batch_id.rb | 26 -- .../queue_adapters/solid_queue_adapter.rb | 28 ++- .../install/templates/db/queue_schema.rb | 22 ++ lib/solid_queue.rb | 2 +- lib/solid_queue/batch.rb | 142 +++++++++++ lib/solid_queue/batch/cleanup_job.rb | 17 ++ lib/solid_queue/batch/empty_job.rb | 14 ++ lib/solid_queue/dispatcher.rb | 1 - lib/solid_queue/engine.rb | 2 +- test/dummy/db/queue_schema.rb | 24 +- test/integration/batch_lifecycle_test.rb | 238 ++++++++++++++++-- test/models/solid_queue/batch_record_test.rb | 81
++++++ test/models/solid_queue/job_batch_test.rb | 66 ----- test/test_helpers/jobs_test_helper.rb | 2 +- 27 files changed, 878 insertions(+), 325 deletions(-) create mode 100644 app/jobs/solid_queue/batch_update_job.rb create mode 100644 app/models/solid_queue/batch_record.rb create mode 100644 app/models/solid_queue/execution/batchable.rb create mode 100644 app/models/solid_queue/job/batchable.rb delete mode 100644 app/models/solid_queue/job_batch.rb create mode 100644 app/models/solid_queue/job_batch/buffer.rb delete mode 100644 db/migrate/20240131013203_create_solid_queue_batch_table.rb create mode 100644 lib/active_job/batch_record_id.rb delete mode 100644 lib/active_job/job_batch_id.rb create mode 100644 lib/solid_queue/batch.rb create mode 100644 lib/solid_queue/batch/cleanup_job.rb create mode 100644 lib/solid_queue/batch/empty_job.rb create mode 100644 test/models/solid_queue/batch_record_test.rb delete mode 100644 test/models/solid_queue/job_batch_test.rb diff --git a/README.md b/README.md index 22da916d..5a2af368 100644 --- a/README.md +++ b/README.md @@ -595,9 +595,10 @@ and optionally trigger callbacks based on their status. It supports the followin - Three available callbacks to fire: - `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed. - `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on` - - `on_failure`: Fired the _first_ time a job fails, after all retries are exhausted. + - `on_failure`: Fired when all jobs have finished, including retries. Will only fire if one or more jobs have failed. - If a job is part of a batch, it can enqueue more jobs for that batch using `batch#enqueue` -- Batches can be nested within other batches, creating a hierarchy. Outer batches will not finish until all nested batches have finished. 
+- Batches can be nested within other batches, creating a hierarchy. Outer batches will not fire callbacks until all nested jobs have finished. +- Attaching arbitrary metadata to a batch ```rb class SleepyJob < ApplicationJob @@ -614,7 +615,7 @@ class MultiStepJob < ApplicationJob # Because of this nested batch, the top-level batch won't finish until the inner, # 10 second job finishes # Both jobs will still run simultaneously - SolidQueue::JobBatch.enqueue do + SolidQueue::Batch.enqueue do SleepyJob.perform_later(10) end end @@ -639,10 +640,11 @@ class BatchFailureJob < ApplicationJob end end -SolidQueue::JobBatch.enqueue( +SolidQueue::Batch.enqueue( on_finish: BatchFinishJob, on_success: BatchSuccessJob, - on_failure: BatchFailureJob + on_failure: BatchFailureJob, + metadata: { user_id: 123 } ) do 5.times.map { |i| SleepyJob.perform_later(i) } end diff --git a/app/jobs/solid_queue/batch_update_job.rb b/app/jobs/solid_queue/batch_update_job.rb new file mode 100644 index 00000000..a8c41bb4 --- /dev/null +++ b/app/jobs/solid_queue/batch_update_job.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module SolidQueue + class BatchUpdateJob < ActiveJob::Base + class UpdateFailure < RuntimeError; end + + queue_as :background + + discard_on ActiveRecord::RecordNotFound + + def perform(batch_id, job) + batch = SolidQueue::BatchRecord.find_by!(batch_id: batch_id) + + return if job.batch_id != batch_id + + status = job.status + return unless status.in?([ :finished, :failed ]) + + batch.job_finished!(job) + rescue => e + Rails.logger.error "[SolidQueue] BatchUpdateJob failed for batch #{batch_id}, job #{job.id}: #{e.message}" + raise + end + end +end diff --git a/app/models/solid_queue/batch_record.rb b/app/models/solid_queue/batch_record.rb new file mode 100644 index 00000000..f95bd18a --- /dev/null +++ b/app/models/solid_queue/batch_record.rb @@ -0,0 +1,162 @@ +# frozen_string_literal: true + +module SolidQueue + class BatchRecord < Record + self.table_name = 
"solid_queue_job_batches" + + STATUSES = %w[pending processing completed failed] + + belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::BatchRecord", optional: true + has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id + has_many :children, foreign_key: :parent_job_batch_id, primary_key: :batch_id, class_name: "SolidQueue::BatchRecord" + + serialize :on_finish, coder: JSON + serialize :on_success, coder: JSON + serialize :on_failure, coder: JSON + serialize :metadata, coder: JSON + + validates :status, inclusion: { in: STATUSES } + + scope :pending, -> { where(status: "pending") } + scope :processing, -> { where(status: "processing") } + scope :completed, -> { where(status: "completed") } + scope :failed, -> { where(status: "failed") } + scope :finished, -> { where(status: %w[completed failed]) } + scope :unfinished, -> { where(status: %w[pending processing]) } + + after_initialize :set_batch_id + before_create :set_parent_job_batch_id + + def on_success=(value) + super(serialize_callback(value)) + end + + def on_failure=(value) + super(serialize_callback(value)) + end + + def on_finish=(value) + super(serialize_callback(value)) + end + + def job_finished!(job) + return if finished? + + transaction do + if job.failed_execution.present? + self.class.where(id: id).update_all( + "failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1" + ) + else + self.class.where(id: id).update_all( + "completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1" + ) + end + + reload + check_completion! + end + end + + def check_completion! + return if finished? + + actual_children = children.count + return if actual_children < expected_children + + children.find_each do |child| + return unless child.finished? + end + + if pending_jobs <= 0 + if failed_jobs > 0 + mark_as_failed! + else + mark_as_completed! 
+ end + clear_unpreserved_jobs + elsif status == "pending" + update!(status: "processing") + end + end + + def finished? + status.in?(%w[completed failed]) + end + + def processing? + status == "processing" + end + + def pending? + status == "pending" + end + + def progress_percentage + return 0 if total_jobs == 0 + ((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2) + end + + private + + def set_parent_job_batch_id + self.parent_job_batch_id ||= Batch.current_batch_id if Batch.current_batch_id.present? + end + + def set_batch_id + self.batch_id ||= SecureRandom.uuid + end + + def as_active_job(active_job_klass) + active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new + end + + def serialize_callback(value) + return value if value.blank? + as_active_job(value).serialize + end + + def perform_completion_job(job_field, attrs) + active_job = ActiveJob::Base.deserialize(send(job_field)) + active_job.send(:deserialize_arguments_if_needed) + active_job.arguments = [ Batch.new(_batch_record: self) ] + Array.wrap(active_job.arguments) + ActiveJob.perform_all_later([ active_job ]) + + active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id + attrs[job_field] = active_job.serialize + end + + def mark_as_completed! + # SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished, + # and there is no record of the failure. + # GoodJob would report a discard as an error. It's possible we should do that in the future? + update!(status: "completed", finished_at: Time.current) + + perform_completion_job(:on_success, {}) if on_success.present? + perform_completion_job(:on_finish, {}) if on_finish.present? + + if parent_job_batch_id.present? + parent = BatchRecord.find_by(batch_id: parent_job_batch_id) + parent&.reload&.check_completion! + end + end + + def mark_as_failed! 
+ update!(status: "failed", finished_at: Time.current) + perform_completion_job(:on_failure, {}) if on_failure.present? + perform_completion_job(:on_finish, {}) if on_finish.present? + + # Check if parent batch can now complete + if parent_job_batch_id.present? + parent = BatchRecord.find_by(batch_id: parent_job_batch_id) + parent&.check_completion! + end + end + + def clear_unpreserved_jobs + SolidQueue::Batch::CleanupJob.perform_later(self) unless SolidQueue.preserve_finished_jobs? + end + end +end + +require_relative "job_batch/buffer" diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c9726b5f..8840505b 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -69,8 +69,6 @@ def perform failed_with(result.error) raise result.error end - - job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present? ensure unblock_next_job end diff --git a/app/models/solid_queue/execution/batchable.rb b/app/models/solid_queue/execution/batchable.rb new file mode 100644 index 00000000..bc1cd7a2 --- /dev/null +++ b/app/models/solid_queue/execution/batchable.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module SolidQueue + class Execution + module Batchable + extend ActiveSupport::Concern + + included do + after_create :update_batch_progress, if: -> { job.batch_id? 
} + end + + private + def update_batch_progress + BatchUpdateJob.perform_later(job.batch_id, job) + rescue => e + Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}" + end + end + end +end diff --git a/app/models/solid_queue/failed_execution.rb b/app/models/solid_queue/failed_execution.rb index 0b7fffe0..27a2a963 100644 --- a/app/models/solid_queue/failed_execution.rb +++ b/app/models/solid_queue/failed_execution.rb @@ -2,7 +2,7 @@ module SolidQueue class FailedExecution < Execution - include Dispatching + include Dispatching, Batchable serialize :error, coder: JSON diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8a5927ec..e90d8e22 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -4,12 +4,10 @@ module SolidQueue class Job < Record class EnqueueError < StandardError; end - include Executable, Clearable, Recurrable + include Executable, Clearable, Recurrable, Batchable serialize :arguments, coder: JSON - belongs_to :job_batch, foreign_key: :batch_id, optional: true - class << self def enqueue_all(active_jobs) active_jobs_by_job_id = active_jobs.index_by(&:job_id) @@ -56,7 +54,6 @@ def create_all_from_active_jobs(active_jobs) end def attributes_from_active_job(active_job) - active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id { queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME, active_job_id: active_job.job_id, diff --git a/app/models/solid_queue/job/batchable.rb b/app/models/solid_queue/job/batchable.rb new file mode 100644 index 00000000..7d33bcee --- /dev/null +++ b/app/models/solid_queue/job/batchable.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module SolidQueue + class Job + module Batchable + extend ActiveSupport::Concern + + included do + belongs_to :job_batch, foreign_key: :batch_id, optional: true + + after_update :update_batch_progress, if: :batch_id? 
+ end + + private + def update_batch_progress + return unless saved_change_to_finished_at? && finished_at.present? + return unless batch_id.present? + + BatchUpdateJob.perform_later(batch_id, self) + rescue => e + Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}" + end + end + end +end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 08f07bb0..31d21a00 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -76,8 +76,8 @@ def dispatch_bypassing_concurrency_limits end def finished! - if SolidQueue.preserve_finished_jobs? || batch_id.present? - touch(:finished_at) + if SolidQueue.preserve_finished_jobs? || batch_id.present? # We clear jobs after the batch finishes + update!(finished_at: Time.current) else destroy! end diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb deleted file mode 100644 index 40e183b5..00000000 --- a/app/models/solid_queue/job_batch.rb +++ /dev/null @@ -1,166 +0,0 @@ -# frozen_string_literal: true - -module SolidQueue - class JobBatch < Record - belongs_to :job, foreign_key: :job_id, optional: true - belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true - has_many :jobs, foreign_key: :batch_id - has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch" - - serialize :on_finish_active_job, coder: JSON - serialize :on_success_active_job, coder: JSON - serialize :on_failure_active_job, coder: JSON - - scope :incomplete, -> { - where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago) - } - scope :finished, -> { where.not(finished_at: nil) } - - class << self - def current_batch_id - ActiveSupport::IsolatedExecutionState[:current_batch_id] - end - - def enqueue(attributes = {}) - job_batch = nil - transaction do - job_batch = 
create!(batch_attributes(attributes)) - wrap_in_batch_context(job_batch.id) do - yield job_batch - end - end - - job_batch - end - - def dispatch_finished_batches - incomplete.order(:id).pluck(:id).each do |id| - transaction do - where(id: id).includes(:children, :jobs).non_blocking_lock.each(&:finish) - end - end - end - - def wrap_in_batch_context(batch_id) - previous_batch_id = current_batch_id.presence || nil - ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id - yield - ensure - ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id - end - - private - - def batch_attributes(attributes) - on_finish_klass = attributes.delete(:on_finish) - on_success_klass = attributes.delete(:on_success) - on_failure_klass = attributes.delete(:on_failure) - - if on_finish_klass.present? - attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize - end - - if on_success_klass.present? - attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize - end - - if on_failure_klass.present? - attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize - end - - attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present? - # Set it initially, so we check the batch even if there are no jobs - attributes[:changed_at] = Time.zone.now - attributes[:last_changed_at] = Time.zone.now - - attributes - end - - def as_active_job(active_job_klass) - active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new - end - end - - # Instance-level enqueue - def enqueue(attributes = {}) - raise "You cannot enqueue a batch that is already finished" if finished? - - transaction do - self.class.wrap_in_batch_context(id) do - yield self - end - end - - self - end - - def finished? - finished_at.present? - end - - def finish - return if finished? 
- reset_changed_at - - all_jobs_succeeded = true - attrs = {} - jobs.find_each do |next_job| - # SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished, - # and there is no record of the failure. - # GoodJob would report a discard as an error. It's possible we should do that in the future? - if fire_failure_job?(next_job) - perform_completion_job(:on_failure_active_job, attrs) - update!(attrs) - end - - status = next_job.status - all_jobs_succeeded = all_jobs_succeeded && status != :failed - return unless status.in?([ :finished, :failed ]) - end - - children.find_each do |child| - return unless child.finished? - end - - if on_finish_active_job.present? - perform_completion_job(:on_finish_active_job, attrs) - end - - if on_success_active_job.present? && all_jobs_succeeded - perform_completion_job(:on_success_active_job, attrs) - end - - transaction do - parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present? - update!({ finished_at: Time.zone.now }.merge(attrs)) - end - end - - private - - def fire_failure_job?(job) - return false if on_failure_active_job.blank? || job.failed_execution.blank? - job = ActiveJob::Base.deserialize(on_failure_active_job) - job.provider_job_id.blank? - end - - def perform_completion_job(job_field, attrs) - active_job = ActiveJob::Base.deserialize(send(job_field)) - active_job.send(:deserialize_arguments_if_needed) - active_job.arguments = [ self ] + Array.wrap(active_job.arguments) - self.class.wrap_in_batch_context(parent_job_batch_id || self.class.current_batch_id) do - ActiveJob.perform_all_later([ active_job ]) - end - active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id - attrs[job_field] = active_job.serialize - end - - def reset_changed_at - if changed_at.blank? && last_changed_at.present? 
- update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again - else - update_columns(changed_at: nil) # clear out changed_at so we ignore this until the next job finishes - end - end - end -end diff --git a/app/models/solid_queue/job_batch/buffer.rb b/app/models/solid_queue/job_batch/buffer.rb new file mode 100644 index 00000000..982593be --- /dev/null +++ b/app/models/solid_queue/job_batch/buffer.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +module SolidQueue + class BatchRecord + class Buffer + attr_reader :jobs, :child_batches + + def initialize + @jobs = {} + @child_batches = [] + end + + def add(job) + @jobs[job.job_id] = job + job + end + + def add_child_batch(batch) + @child_batches << batch + batch + end + + def capture + previous_buffer = ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer] + ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer] = self + + yield + + @jobs + ensure + ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer] = previous_buffer + end + + def self.current + ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer] + end + + def self.capture_job(job) + current&.add(job) + end + + def self.capture_child_batch(batch) + current&.add_child_batch(batch) + end + end + end +end diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb deleted file mode 100644 index 91b76ee8..00000000 --- a/db/migrate/20240131013203_create_solid_queue_batch_table.rb +++ /dev/null @@ -1,21 +0,0 @@ -class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] - def change - create_table :solid_queue_job_batches do |t| - t.references :parent_job_batch, index: true # FIXME: foreign key - t.text :on_finish_active_job - t.text :on_success_active_job - t.text :on_failure_active_job - t.datetime :finished_at - t.datetime :changed_at - t.datetime :last_changed_at - t.timestamps - - t.index [ :finished_at ] - 
t.index [ :changed_at ] - t.index [ :last_changed_at ] - end - - add_reference :solid_queue_jobs, :batch, index: true - add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade - end -end diff --git a/lib/active_job/batch_record_id.rb b/lib/active_job/batch_record_id.rb new file mode 100644 index 00000000..7f8491af --- /dev/null +++ b/lib/active_job/batch_record_id.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +# Inspired by active_job/core.rb docs +# https://github.com/rails/rails/blob/1c2529b9a6ba5a1eff58be0d0373d7d9d401015b/activejob/lib/active_job/core.rb#L136 +module ActiveJob + module BatchRecordId + extend ActiveSupport::Concern + + included do + attr_accessor :batch_id + end + + def initialize(*arguments, **kwargs) + super + self.batch_id = SolidQueue::Batch.current_batch_id if solid_queue_job? + end + + def enqueue(options = {}) + super.tap do |job| + if solid_queue_job? + SolidQueue::BatchRecord::Buffer.capture_job(self) + end + end + end + + def serialize + super.merge("batch_id" => batch_id) + end + + def deserialize(job_data) + super + self.batch_id = job_data["batch_id"] + end + + def batch + @batch ||= SolidQueue::Batch.new( + _batch_record: SolidQueue::BatchRecord.find_by(batch_id: batch_id) + ) + end + + private + + def solid_queue_job? 
+ self.class.queue_adapter_name == "solid_queue" + end + end +end diff --git a/lib/active_job/job_batch_id.rb b/lib/active_job/job_batch_id.rb deleted file mode 100644 index 494e197f..00000000 --- a/lib/active_job/job_batch_id.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -# Inspired by active_job/core.rb docs -# https://github.com/rails/rails/blob/1c2529b9a6ba5a1eff58be0d0373d7d9d401015b/activejob/lib/active_job/core.rb#L136 -module ActiveJob - module JobBatchId - extend ActiveSupport::Concern - - included do - attr_accessor :batch_id - end - - def serialize - super.merge("batch_id" => batch_id) - end - - def deserialize(job_data) - super - self.batch_id = job_data["batch_id"] - end - - def batch - @batch ||= SolidQueue::JobBatch.find_by(id: batch_id) - end - end -end diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index fe556042..04de658d 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -16,16 +16,40 @@ def enqueue_after_transaction_commit? end def enqueue(active_job) # :nodoc: - SolidQueue::Job.enqueue(active_job) + return if in_batch?(active_job) + + SolidQueue::Job.enqueue(active_job).tap do |enqueued_job| + increment_job_count(active_job, enqueued_job) + end end def enqueue_at(active_job, timestamp) # :nodoc: - SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp)) + return if in_batch?(active_job) + + SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp)).tap do |enqueued_job| + increment_job_count(active_job, enqueued_job) + end end def enqueue_all(active_jobs) # :nodoc: SolidQueue::Job.enqueue_all(active_jobs) end + + private + + def in_batch?(active_job) + active_job.batch_id.present? && active_job.executions <= 0 + end + + def in_batch_retry?(active_job) + active_job.batch_id.present? 
&& active_job.executions > 0 + end + + def increment_job_count(active_job, enqueued_job) + if enqueued_job.persisted? && in_batch_retry?(active_job) + SolidQueue::Batch.update_job_count(active_job.batch_id, 1) + end + end end end end diff --git a/lib/generators/solid_queue/install/templates/db/queue_schema.rb b/lib/generators/solid_queue/install/templates/db/queue_schema.rb index 85194b6a..07bad1dc 100644 --- a/lib/generators/solid_queue/install/templates/db/queue_schema.rb +++ b/lib/generators/solid_queue/install/templates/db/queue_schema.rb @@ -26,6 +26,26 @@ t.index [ "job_id" ], name: "index_solid_queue_failed_executions_on_job_id", unique: true end + create_table "solid_queue_job_batches", force: :cascade do |t| + t.string "batch_id", null: false + t.string "parent_job_batch_id" + t.text "on_finish" + t.text "on_success" + t.text "on_failure" + t.text "metadata" + t.integer "total_jobs", default: 0, null: false + t.integer "pending_jobs", default: 0, null: false + t.integer "completed_jobs", default: 0, null: false + t.integer "failed_jobs", default: 0, null: false + t.integer "expected_children", default: 0, null: false + t.string "status", default: "pending", null: false + t.datetime "finished_at" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index [ "finished_at" ], name: "index_solid_queue_job_batches_on_finished_at" + t.index [ "parent_job_batch_id" ], name: "index_solid_queue_job_batches_on_parent_job_batch_id" + end + create_table "solid_queue_jobs", force: :cascade do |t| t.string "queue_name", null: false t.string "class_name", null: false @@ -37,7 +57,9 @@ t.string "concurrency_key" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.string "batch_id" t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id" + t.index [ "batch_id" ], name: "index_solid_queue_jobs_on_batch_id" t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name" t.index [ "finished_at" 
], name: "index_solid_queue_jobs_on_finished_at" t.index [ "queue_name", "finished_at" ], name: "index_solid_queue_jobs_for_filtering" diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 1277ea67..f4d3a92c 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -5,7 +5,7 @@ require "active_job" require "active_job/queue_adapters" -require "active_job/job_batch_id" +require "active_job/batch_record_id" require "active_support" require "active_support/core_ext/numeric/time" diff --git a/lib/solid_queue/batch.rb b/lib/solid_queue/batch.rb new file mode 100644 index 00000000..7cb6b02b --- /dev/null +++ b/lib/solid_queue/batch.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +require_relative "batch/empty_job" +require_relative "batch/cleanup_job" + +module SolidQueue + class Batch + include GlobalID::Identification + + delegate :completed_jobs, :failed_jobs, :pending_jobs, :total_jobs, :progress_percentage, + :finished?, :processing?, :pending?, :batch_id, + :metadata, :metadata=, + :on_success, :on_success=, + :on_failure, :on_failure=, + :on_finish, :on_finish=, + :reload, + to: :batch_record + + def initialize(_batch_record: nil) + @batch_record = _batch_record || BatchRecord.new + end + + def batch_record + @batch_record + end + + def id + batch_id + end + + def enqueue(&block) + raise "You cannot enqueue a batch that is already finished" if finished? + + SolidQueue::BatchRecord::Buffer.capture_child_batch(self) if batch_record.new_record? + + buffer = SolidQueue::BatchRecord::Buffer.new + buffer.capture do + Batch.wrap_in_batch_context(batch_id) do + block.call(self) + end + end + + if enqueue_after_transaction_commit? + ActiveRecord.after_all_transactions_commit do + enqueue_batch(buffer) + end + else + enqueue_batch(buffer) + end + end + + private + + def enqueue_after_transaction_commit? 
+ return false unless defined?(ApplicationJob.enqueue_after_transaction_commit) + + case ApplicationJob.enqueue_after_transaction_commit + when :always, true + true + when :never, false + false + when :default + true + end + end + + def enqueue_batch(buffer) + if batch_record.new_record? + enqueue_new_batch(buffer) + else + jobs = buffer.jobs.values + enqueue_existing_batch(jobs) + end + end + + def enqueue_new_batch(buffer) + SolidQueue::BatchRecord.transaction do + batch_record.save! + + # If batch has no jobs, enqueue an EmptyJob + # This ensures callbacks always execute, even for empty batches + jobs = buffer.jobs.values + if jobs.empty? + empty_job = SolidQueue::Batch::EmptyJob.new + empty_job.batch_id = batch_record.batch_id + jobs = [ empty_job ] + end + + batch_record.update!( + total_jobs: jobs.size, + pending_jobs: SolidQueue::Job.enqueue_all(jobs), + expected_children: buffer.child_batches.size + ) + end + end + + def enqueue_existing_batch(active_jobs) + jobs = Array.wrap(active_jobs) + enqueued_count = SolidQueue::Job.enqueue_all(jobs) + + Batch.update_job_count(batch_id, enqueued_count) + end + + class << self + def enqueue(on_success: nil, on_failure: nil, on_finish: nil, metadata: nil, &block) + new.tap do |batch| + batch.batch_record.assign_attributes( + on_success: on_success, + on_failure: on_failure, + on_finish: on_finish, + metadata: metadata, + parent_job_batch_id: current_batch_id + ) + + batch.enqueue(&block) + end + end + + def find(batch_id) + new(_batch_record: BatchRecord.find_by!(batch_id: batch_id)) + end + + def update_job_count(batch_id, count) + BatchRecord.where(batch_id: batch_id).update_all( + "total_jobs = total_jobs + #{count}, pending_jobs = pending_jobs + #{count}" + ) + end + + def current_batch_id + ActiveSupport::IsolatedExecutionState[:current_batch_id] + end + + def wrap_in_batch_context(batch_id) + previous_batch_id = current_batch_id.presence || nil + ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id 
+ yield + ensure + ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id + end + end + end +end diff --git a/lib/solid_queue/batch/cleanup_job.rb b/lib/solid_queue/batch/cleanup_job.rb new file mode 100644 index 00000000..eb381908 --- /dev/null +++ b/lib/solid_queue/batch/cleanup_job.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module SolidQueue + class Batch + class CleanupJob < ApplicationJob + queue_as :background + + discard_on ActiveRecord::RecordNotFound + + def perform(job_batch) + return if SolidQueue.preserve_finished_jobs? + + job_batch.jobs.finished.destroy_all + end + end + end +end diff --git a/lib/solid_queue/batch/empty_job.rb b/lib/solid_queue/batch/empty_job.rb new file mode 100644 index 00000000..f457eabe --- /dev/null +++ b/lib/solid_queue/batch/empty_job.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module SolidQueue + class Batch + class EmptyJob < ApplicationJob + queue_as :background + + def perform + # This job does nothing - it just exists to trigger batch completion + # The batch completion will be handled by the normal job_finished! 
flow + end + end + end +end diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 5bcbe0e8..1583e1dd 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -37,7 +37,6 @@ def poll def dispatch_next_batch with_polling_volume do ScheduledExecution.dispatch_next_batch(batch_size) - SolidQueue::JobBatch.dispatch_finished_batches end end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index 452ae445..f7e059cb 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -35,7 +35,7 @@ class Engine < ::Rails::Engine initializer "solid_queue.active_job.extensions" do ActiveSupport.on_load :active_job do include ActiveJob::ConcurrencyControls - include ActiveJob::JobBatchId + include ActiveJob::BatchRecordId end end end diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 64de0e82..8f386184 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -39,18 +39,23 @@ end create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "parent_job_batch_id" - t.text "on_finish_active_job" - t.text "on_success_active_job" - t.text "on_failure_active_job" + t.string "batch_id" + t.string "parent_job_batch_id" + t.text "on_finish" + t.text "on_success" + t.text "on_failure" + t.text "metadata" + t.integer "total_jobs", default: 0, null: false + t.integer "pending_jobs", default: 0, null: false + t.integer "completed_jobs", default: 0, null: false + t.integer "failed_jobs", default: 0, null: false + t.integer "expected_children", default: 0, null: false + t.string "status", default: "pending", null: false t.datetime "finished_at" - t.datetime "changed_at" - t.datetime "last_changed_at" t.datetime "created_at", null: false t.datetime "updated_at", null: false - t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at" + t.index ["batch_id"], name: 
"index_solid_queue_job_batches_on_batch_id", unique: true t.index ["finished_at"], name: "index_solid_queue_job_batches_on_finished_at" - t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at" t.index ["parent_job_batch_id"], name: "index_solid_queue_job_batches_on_parent_job_batch_id" end @@ -65,7 +70,7 @@ t.string "concurrency_key" t.datetime "created_at", null: false t.datetime "updated_at", null: false - t.bigint "batch_id" + t.string "batch_id" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" @@ -153,7 +158,6 @@ add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade diff --git a/test/integration/batch_lifecycle_test.rb b/test/integration/batch_lifecycle_test.rb index 22714315..35641593 100644 --- a/test/integration/batch_lifecycle_test.rb +++ b/test/integration/batch_lifecycle_test.rb @@ -3,26 +3,69 @@ require "test_helper" class BatchLifecycleTest < ActiveSupport::TestCase + FailingJobError = Class.new(RuntimeError) + + def assert_finished_in_order(*batches) + job_batches = batches.map { |batch| SolidQueue::BatchRecord.find_by(batch_id: batch.batch_id) } + + job_batches.each_cons(2) do 
|batch1, batch2| + assert_equal batch1.reload.finished_at < batch2.reload.finished_at, true + end + end + setup do + @_on_thread_error = SolidQueue.on_thread_error + SolidQueue.on_thread_error = silent_on_thread_error_for([ FailingJobError ], @_on_thread_error) @worker = SolidQueue::Worker.new(queues: "background", threads: 3) @dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2) end teardown do + SolidQueue.on_thread_error = @_on_thread_error @worker.stop @dispatcher.stop JobBuffer.clear SolidQueue::Job.destroy_all - SolidQueue::JobBatch.destroy_all + SolidQueue::BatchRecord.destroy_all + + ApplicationJob.enqueue_after_transaction_commit = false if defined?(ApplicationJob.enqueue_after_transaction_commit) end class BatchOnSuccessJob < ApplicationJob queue_as :background def perform(batch, custom_message = "") - JobBuffer.add "#{custom_message}: #{batch.jobs.size} jobs succeeded!" + JobBuffer.add "#{custom_message}: #{batch.completed_jobs} jobs succeeded!" + end + end + + class BatchOnFailureJob < ApplicationJob + queue_as :background + + def perform(batch, custom_message = "") + JobBuffer.add "#{custom_message}: #{batch.failed_jobs} jobs failed!" 
+ end + end + + class FailingJob < ApplicationJob + queue_as :background + + retry_on FailingJobError, attempts: 3, wait: 0.1.seconds + + def perform + raise FailingJobError, "Failing job" + end + end + + class DiscardingJob < ApplicationJob + queue_as :background + + discard_on FailingJobError + + def perform + raise FailingJobError, "Failing job" end end @@ -33,7 +76,7 @@ def perform batch.enqueue do AddToBufferJob.perform_later "added from inside 1" AddToBufferJob.perform_later "added from inside 2" - SolidQueue::JobBatch.enqueue do + SolidQueue::Batch.enqueue do AddToBufferJob.perform_later "added from inside 3" end end @@ -42,10 +85,10 @@ def perform test "nested batches finish from the inside out" do batch2 = batch3 = batch4 = nil - batch1 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("3")) do - batch2 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("2")) do - batch3 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("1")) { } - batch4 = SolidQueue::JobBatch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) { } + batch1 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("3")) do + batch2 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("2")) do + batch3 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1")) { } + batch4 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) { } end end @@ -55,17 +98,18 @@ def perform wait_for_job_batches_to_finish_for(2.seconds) wait_for_jobs_to_finish_for(2.seconds) - assert_equal [ "1: 0 jobs succeeded!", "1.1: 0 jobs succeeded!", "2: 2 jobs succeeded!", "3: 1 jobs succeeded!" 
], JobBuffer.values - assert_equal 4, SolidQueue::JobBatch.finished.count - assert_equal batch1.reload.finished_at > batch2.reload.finished_at, true - assert_equal batch2.finished_at > batch3.reload.finished_at, true - assert_equal batch2.finished_at > batch4.reload.finished_at, true + expected_values = [ "1: 1 jobs succeeded!", "1.1: 1 jobs succeeded!", "2: 1 jobs succeeded!", "3: 1 jobs succeeded!" ] + assert_equal expected_values.sort, JobBuffer.values.sort + assert_equal 4, SolidQueue::BatchRecord.finished.count + assert_finished_in_order(batch4, batch2, batch1) + assert_finished_in_order(batch3, batch2, batch1) end test "all jobs are run, including jobs enqueued inside of other jobs" do - SolidQueue::JobBatch.enqueue do + batch2 = nil + batch1 = SolidQueue::Batch.enqueue do AddToBufferJob.perform_later "hey" - SolidQueue::JobBatch.enqueue do + batch2 = SolidQueue::Batch.enqueue do AddToBufferJob.perform_later "ho" AddsMoreJobsJob.perform_later end @@ -75,9 +119,171 @@ def perform @worker.start wait_for_job_batches_to_finish_for(2.seconds) - wait_for_jobs_to_finish_for(2.seconds) assert_equal [ "added from inside 1", "added from inside 2", "added from inside 3", "hey", "ho" ], JobBuffer.values.sort - assert_equal 3, SolidQueue::JobBatch.finished.count + assert_equal 3, SolidQueue::BatchRecord.finished.count + assert_finished_in_order(batch2, batch1) + end + + test "when self.enqueue_after_transaction_commit = true" do + skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 1 + + ApplicationJob.enqueue_after_transaction_commit = true + batch1 = batch2 = batch3 = nil + JobResult.transaction do + JobResult.create!(queue_name: "default", status: "") + + batch1 = SolidQueue::Batch.enqueue do + AddToBufferJob.perform_later "hey" + JobResult.transaction(requires_new: true) do + JobResult.create!(queue_name: "default", status: "") + batch2 = SolidQueue::Batch.enqueue do + AddToBufferJob.perform_later "ho" + batch3 = SolidQueue::Batch.enqueue do + 
AddToBufferJob.perform_later "let's go" + end + end + end + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal 3, SolidQueue::BatchRecord.finished.count + assert_equal 6, SolidQueue::Job.finished.count + assert_equal 6, SolidQueue::Job.count + assert_finished_in_order(batch3, batch2, batch1) + end + + test "failed jobs fire properly" do + batch2 = nil + batch1 = SolidQueue::Batch.enqueue(on_failure: BatchOnFailureJob.new("0")) do + FailingJob.perform_later + batch2 = SolidQueue::Batch.enqueue(on_failure: BatchOnFailureJob.new("1")) do + FailingJob.perform_later + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(3.seconds) + + job_batch1 = SolidQueue::BatchRecord.find_by(batch_id: batch1.batch_id) + job_batch2 = SolidQueue::BatchRecord.find_by(batch_id: batch2.batch_id) + + assert_equal 2, SolidQueue::BatchRecord.count + assert_equal 2, SolidQueue::BatchRecord.finished.count + + assert_equal 3, job_batch1.total_jobs + assert_equal 1, job_batch1.failed_jobs + assert_equal 2, job_batch1.completed_jobs + assert_equal 0, job_batch1.pending_jobs + + assert_equal 3, job_batch2.total_jobs + assert_equal 1, job_batch2.failed_jobs + assert_equal 2, job_batch2.completed_jobs + assert_equal 0, job_batch2.pending_jobs + + assert_equal [ "failed", "failed" ].sort, SolidQueue::BatchRecord.all.pluck(:status) + assert_equal [ "0: 1 jobs failed!", "1: 1 jobs failed!" 
], JobBuffer.values.sort + assert_finished_in_order(batch2, batch1) + end + + test "discarded jobs fire properly" do + batch2 = nil + batch1 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("0")) do + DiscardingJob.perform_later + batch2 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1")) do + DiscardingJob.perform_later + end + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(3.seconds) + + job_batch1 = SolidQueue::BatchRecord.find_by(batch_id: batch1.batch_id) + job_batch2 = SolidQueue::BatchRecord.find_by(batch_id: batch2.batch_id) + + assert_equal 2, SolidQueue::BatchRecord.count + assert_equal 2, SolidQueue::BatchRecord.finished.count + + assert_equal 1, job_batch1.total_jobs + assert_equal 0, job_batch1.failed_jobs + assert_equal 1, job_batch1.completed_jobs + assert_equal 0, job_batch1.pending_jobs + + assert_equal 1, job_batch2.total_jobs + assert_equal 0, job_batch2.failed_jobs + assert_equal 1, job_batch2.completed_jobs + assert_equal 0, job_batch2.pending_jobs + + assert_equal [ "completed", "completed" ].sort, SolidQueue::BatchRecord.all.pluck(:status) + assert_equal [ "0: 1 jobs succeeded!", "1: 1 jobs succeeded!" 
], JobBuffer.values.sort + assert_finished_in_order(batch2, batch1) + end + + # TODO: spec for each callback type firing properly + + test "preserve_finished_jobs = false" do + # SolidQueue.preserve_finished_jobs = false + # batch1 = SolidQueue::Batch.enqueue do + # AddToBufferJob.perform_later "hey" + # end + end + + test "batch interface" do + batch = SolidQueue::Batch.enqueue( + metadata: { source: "test", priority: "high", user_id: 123 }, + on_finish: OnFinishJob, + on_success: OnSuccessJob, + on_failure: OnFailureJob + ) do + AddToBufferJob.perform_later "hey" + end + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal [ "Hi finish #{batch.batch_id}!", "Hi success #{batch.batch_id}!", "hey" ].sort, JobBuffer.values.sort + assert_equal 1, batch.reload.completed_jobs + assert_equal 0, batch.failed_jobs + assert_equal 0, batch.pending_jobs + assert_equal 1, batch.total_jobs + end + + class OnFinishJob < ApplicationJob + queue_as :background + + def perform(batch) + JobBuffer.add "Hi finish #{batch.batch_id}!" + end + end + + class OnSuccessJob < ApplicationJob + queue_as :background + + def perform(batch) + JobBuffer.add "Hi success #{batch.batch_id}!" + end + end + + class OnFailureJob < ApplicationJob + queue_as :background + + def perform(batch) + JobBuffer.add "Hi failure #{batch.batch_id}!" 
+ end end end diff --git a/test/models/solid_queue/batch_record_test.rb b/test/models/solid_queue/batch_record_test.rb new file mode 100644 index 00000000..d0ffbf6f --- /dev/null +++ b/test/models/solid_queue/batch_record_test.rb @@ -0,0 +1,81 @@ +require "test_helper" + +class SolidQueue::BatchRecordTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + teardown do + SolidQueue::Job.destroy_all + SolidQueue::BatchRecord.destroy_all + end + + class BatchWithArgumentsJob < ApplicationJob + def perform(batch, arg1, arg2) + Rails.logger.info "Hi #{batch.batch_id}, #{arg1}, #{arg2}!" + end + end + + class NiceJob < ApplicationJob + retry_on Exception, wait: 1.second + + def perform(arg) + Rails.logger.info "Hi #{arg}!" + end + end + + test "batch will be completed on success" do + batch = SolidQueue::Batch.enqueue(on_finish: BatchCompletionJob) { } + job_batch = SolidQueue::BatchRecord.find_by(batch_id: batch.batch_id) + assert_not_nil job_batch.on_finish + assert_equal BatchCompletionJob.name, job_batch.on_finish["job_class"] + end + + test "batch will be completed on finish" do + batch = SolidQueue::Batch.enqueue(on_success: BatchCompletionJob) { } + job_batch = SolidQueue::BatchRecord.find_by(batch_id: batch.batch_id) + assert_not_nil job_batch.on_success + assert_equal BatchCompletionJob.name, job_batch.on_success["job_class"] + end + + test "sets the batch_id on jobs created inside of the enqueue block" do + batch = SolidQueue::Batch.enqueue(on_finish: BatchCompletionJob) do + NiceJob.perform_later("world") + NiceJob.perform_later("people") + end + + assert_equal 2, SolidQueue::Job.count + assert_equal [ batch.batch_id ] * 2, SolidQueue::Job.last(2).map(&:batch_id) + end + + test "batch id is present inside the block" do + assert_nil SolidQueue::Batch.current_batch_id + SolidQueue::Batch.enqueue(on_finish: BatchCompletionJob) do + assert_not_nil SolidQueue::Batch.current_batch_id + end + assert_nil SolidQueue::Batch.current_batch_id + end + + 
test "allow arguments and options for callbacks" do + SolidQueue::Batch.enqueue( + on_finish: BatchWithArgumentsJob.new(1, 2).set(queue: :batch), + ) do + NiceJob.perform_later("world") + end + + assert_not_nil SolidQueue::BatchRecord.last.on_finish["arguments"] + assert_equal SolidQueue::BatchRecord.last.on_finish["arguments"], [ 1, 2 ] + assert_equal SolidQueue::BatchRecord.last.on_finish["queue_name"], "batch" + end + + test "creates batch with metadata" do + SolidQueue::Batch.enqueue( + metadata: { source: "test", priority: "high", user_id: 123 } + ) do + NiceJob.perform_later("world") + end + + assert_not_nil SolidQueue::BatchRecord.last.metadata + assert_equal SolidQueue::BatchRecord.last.metadata["source"], "test" + assert_equal SolidQueue::BatchRecord.last.metadata["priority"], "high" + assert_equal SolidQueue::BatchRecord.last.metadata["user_id"], 123 + end +end diff --git a/test/models/solid_queue/job_batch_test.rb b/test/models/solid_queue/job_batch_test.rb deleted file mode 100644 index e49f59c2..00000000 --- a/test/models/solid_queue/job_batch_test.rb +++ /dev/null @@ -1,66 +0,0 @@ -require "test_helper" - -class SolidQueue::JobBatchTest < ActiveSupport::TestCase - self.use_transactional_tests = false - - teardown do - SolidQueue::Job.destroy_all - SolidQueue::JobBatch.destroy_all - end - - class BatchWithArgumentsJob < ApplicationJob - def perform(batch, arg1, arg2) - Rails.logger.info "Hi #{batch.id}, #{arg1}, #{arg2}!" - end - end - - class NiceJob < ApplicationJob - retry_on Exception, wait: 1.second - - def perform(arg) - Rails.logger.info "Hi #{arg}!" 
- end - end - - test "batch will be completed on success" do - batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) { } - assert_not_nil batch.on_finish_active_job - assert_equal BatchCompletionJob.name, batch.on_finish_active_job["job_class"] - end - - test "batch will be completed on finish" do - batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) { } - assert_not_nil batch.on_success_active_job - assert_equal BatchCompletionJob.name, batch.on_success_active_job["job_class"] - end - - test "sets the batch_id on jobs created inside of the enqueue block" do - batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do - NiceJob.perform_later("world") - NiceJob.perform_later("people") - end - - assert_equal 2, SolidQueue::Job.count - assert_equal [ batch.id ] * 2, SolidQueue::Job.last(2).map(&:batch_id) - end - - test "batch id is present inside the block" do - assert_nil SolidQueue::JobBatch.current_batch_id - SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do - assert_not_nil SolidQueue::JobBatch.current_batch_id - end - assert_nil SolidQueue::JobBatch.current_batch_id - end - - test "allow arguments and options for callbacks" do - SolidQueue::JobBatch.enqueue( - on_finish: BatchWithArgumentsJob.new(1, 2).set(queue: :batch), - ) do - NiceJob.perform_later("world") - end - - assert_not_nil SolidQueue::JobBatch.last.on_finish_active_job["arguments"] - assert_equal SolidQueue::JobBatch.last.on_finish_active_job["arguments"], [ 1, 2 ] - assert_equal SolidQueue::JobBatch.last.on_finish_active_job["queue_name"], "batch" - end -end diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index b000a65d..1a8a205c 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -20,7 +20,7 @@ def wait_for_jobs_to_be_released_for(timeout = 1.second) def wait_for_job_batches_to_finish_for(timeout = 1.second) wait_while_with_timeout(timeout) do 
skip_active_record_query_cache do - SolidQueue::JobBatch.where(finished_at: nil).any? + SolidQueue::BatchRecord.where(finished_at: nil).any? end end end From d3315127b504b54e6a434e6e36100cddf5e96893 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 2025 01:42:43 -0400 Subject: [PATCH 18/23] Buffer was under the wrong namespace --- app/models/solid_queue/{job_batch => batch_record}/buffer.rb | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename app/models/solid_queue/{job_batch => batch_record}/buffer.rb (100%) diff --git a/app/models/solid_queue/job_batch/buffer.rb b/app/models/solid_queue/batch_record/buffer.rb similarity index 100% rename from app/models/solid_queue/job_batch/buffer.rb rename to app/models/solid_queue/batch_record/buffer.rb From 373c8704d43bf12e45563f89bdf27860d3470845 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 2025 01:44:34 -0400 Subject: [PATCH 19/23] Wrong require --- app/models/solid_queue/batch_record.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/solid_queue/batch_record.rb b/app/models/solid_queue/batch_record.rb index f95bd18a..8f50b664 100644 --- a/app/models/solid_queue/batch_record.rb +++ b/app/models/solid_queue/batch_record.rb @@ -159,4 +159,4 @@ def clear_unpreserved_jobs end end -require_relative "job_batch/buffer" +require_relative "batch_record/buffer" From e059d229ad9cdd3349ca6b02a5dafd0454bbbaa4 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 2025 02:00:29 -0400 Subject: [PATCH 20/23] Implement preserved jobs test and remove todo --- test/integration/batch_lifecycle_test.rb | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/test/integration/batch_lifecycle_test.rb b/test/integration/batch_lifecycle_test.rb index 35641593..3ad455f8 100644 --- a/test/integration/batch_lifecycle_test.rb +++ b/test/integration/batch_lifecycle_test.rb @@ -31,6 +31,7 @@ def assert_finished_in_order(*batches) 
SolidQueue::BatchRecord.destroy_all ApplicationJob.enqueue_after_transaction_commit = false if defined?(ApplicationJob.enqueue_after_transaction_commit) + SolidQueue.preserve_finished_jobs = true end class BatchOnSuccessJob < ApplicationJob @@ -231,13 +232,24 @@ def perform assert_finished_in_order(batch2, batch1) end - # TODO: spec for each callback type firing properly - test "preserve_finished_jobs = false" do - # SolidQueue.preserve_finished_jobs = false - # batch1 = SolidQueue::Batch.enqueue do - # AddToBufferJob.perform_later "hey" - # end + SolidQueue.preserve_finished_jobs = false + batch1 = SolidQueue::Batch.enqueue do + AddToBufferJob.perform_later "hey" + end + + assert_equal false, batch1.reload.finished? + assert_equal 1, SolidQueue::Job.count + assert_equal 0, SolidQueue::Job.finished.count + + @dispatcher.start + @worker.start + + wait_for_job_batches_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal true, batch1.reload.finished? + assert_equal 0, SolidQueue::Job.count end test "batch interface" do From 5343bfd49e46e2f3bf0a7826454ecbfd4edbae36 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 2025 18:04:04 -0400 Subject: [PATCH 21/23] Idempotent updates with pessimistic locks --- app/models/solid_queue/batch_record.rb | 49 +++++++++++-------- .../install/templates/db/queue_schema.rb | 1 + lib/solid_queue/batch.rb | 2 +- test/dummy/db/queue_schema.rb | 1 + 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/app/models/solid_queue/batch_record.rb b/app/models/solid_queue/batch_record.rb index 8f50b664..3d6ed0d2 100644 --- a/app/models/solid_queue/batch_record.rb +++ b/app/models/solid_queue/batch_record.rb @@ -41,21 +41,26 @@ def on_finish=(value) def job_finished!(job) return if finished? - - transaction do - if job.failed_execution.present? 
- self.class.where(id: id).update_all( - "failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1" - ) - else - self.class.where(id: id).update_all( - "completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1" - ) + return if job.batch_processed_at? + + job.with_lock do + if job.batch_processed_at.blank? + job.update!(batch_processed_at: Time.current) + + if job.failed_execution.present? + self.class.where(id: id).update_all( + "failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1" + ) + else + self.class.where(id: id).update_all( + "completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1" + ) + end end - - reload - check_completion! end + + reload + check_completion! end def check_completion! @@ -68,15 +73,17 @@ def check_completion! return unless child.finished? end - if pending_jobs <= 0 - if failed_jobs > 0 - mark_as_failed! - else - mark_as_completed! + with_lock do + if pending_jobs <= 0 + if failed_jobs > 0 + mark_as_failed! + else + mark_as_completed! 
+ end + clear_unpreserved_jobs + elsif status == "pending" + update!(status: "processing") end - clear_unpreserved_jobs - elsif status == "pending" - update!(status: "processing") end end diff --git a/lib/generators/solid_queue/install/templates/db/queue_schema.rb b/lib/generators/solid_queue/install/templates/db/queue_schema.rb index 07bad1dc..f9fd033f 100644 --- a/lib/generators/solid_queue/install/templates/db/queue_schema.rb +++ b/lib/generators/solid_queue/install/templates/db/queue_schema.rb @@ -58,6 +58,7 @@ t.datetime "created_at", null: false t.datetime "updated_at", null: false t.string "batch_id" + t.datetime "batch_processed_at" t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id" t.index [ "batch_id" ], name: "index_solid_queue_jobs_on_batch_id" t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name" diff --git a/lib/solid_queue/batch.rb b/lib/solid_queue/batch.rb index 7cb6b02b..0904df0a 100644 --- a/lib/solid_queue/batch.rb +++ b/lib/solid_queue/batch.rb @@ -8,7 +8,7 @@ class Batch include GlobalID::Identification delegate :completed_jobs, :failed_jobs, :pending_jobs, :total_jobs, :progress_percentage, - :finished?, :processing?, :pending?, :batch_id, + :finished?, :processing?, :pending?, :status, :batch_id, :metadata, :metadata=, :on_success, :on_success=, :on_failure, :on_failure=, diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 8f386184..1201a59f 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -71,6 +71,7 @@ t.datetime "created_at", null: false t.datetime "updated_at", null: false t.string "batch_id" + t.datetime "batch_processed_at" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" From 70149cf272fd25cc304492cec4d3c65f3c48a4a7 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 
2025 20:13:52 -0400 Subject: [PATCH 22/23] Check if it finished before we acquired the lock --- app/models/solid_queue/batch_record.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/models/solid_queue/batch_record.rb b/app/models/solid_queue/batch_record.rb index 3d6ed0d2..16e3c2e9 100644 --- a/app/models/solid_queue/batch_record.rb +++ b/app/models/solid_queue/batch_record.rb @@ -74,7 +74,9 @@ def check_completion! end with_lock do - if pending_jobs <= 0 + if finished? + # do nothing + elsif pending_jobs <= 0 if failed_jobs > 0 mark_as_failed! else From 9bbf863cb87f2c857856686f322b3c2b0180db39 Mon Sep 17 00:00:00 2001 From: JP Camara Date: Fri, 29 Aug 2025 20:35:39 -0400 Subject: [PATCH 23/23] Make sure callbacks are not considered part of a batch * Use enqueue_all directly rather than passing through activejob for completion jobs --- app/models/solid_queue/batch_record.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/app/models/solid_queue/batch_record.rb b/app/models/solid_queue/batch_record.rb index 16e3c2e9..32893e86 100644 --- a/app/models/solid_queue/batch_record.rb +++ b/app/models/solid_queue/batch_record.rb @@ -122,14 +122,17 @@ def as_active_job(active_job_klass) def serialize_callback(value) return value if value.blank? - as_active_job(value).serialize + active_job = as_active_job(value) + # We can pick up batch ids from context, but callbacks should never be considered a part of the batch + active_job.batch_id = nil + active_job.serialize end def perform_completion_job(job_field, attrs) active_job = ActiveJob::Base.deserialize(send(job_field)) active_job.send(:deserialize_arguments_if_needed) active_job.arguments = [ Batch.new(_batch_record: self) ] + Array.wrap(active_job.arguments) - ActiveJob.perform_all_later([ active_job ]) + SolidQueue::Job.enqueue_all([ active_job ]) active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id attrs[job_field] = active_job.serialize