diff --git a/README.md b/README.md
index c77ed953..24cb2b42 100644
--- a/README.md
+++ b/README.md
@@ -618,6 +618,74 @@ my_periodic_resque_job:
 
 and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
 
+## Multisharding
+
+If your application reaches a point where the pressure on the database used for jobs is such that you need to spread the load over multiple databases, then this section is for you.
+
+You can extend the Solid Queue database configuration to use different shards:
+
+  ```ruby
+  config.solid_queue.connects_to = {
+    shards: {
+      queue_shard_one: { writing: :queue_shard_one },
+      queue_shard_two: { writing: :queue_shard_two }
+    }
+  }
+  ```
+
+The queue database shards must be defined in `config/database.yml`, as shown in the installation section. All shards must share the same schema and, going forward, the same migration configuration:
+
+  ```yaml
+  production:
+    primary:
+      <<: *default
+      database: storage/production.sqlite3
+    queue_shard_one:
+      <<: *default
+      database: storage/production_queue_shard_one.sqlite3
+      migrations_paths: db/queue_migrate
+    queue_shard_two:
+      <<: *default
+      database: storage/production_queue_shard_two.sqlite3
+      migrations_paths: db/queue_migrate
+  ```
+
+Converting a single-database configuration such as `config.solid_queue.connects_to = { database: { writing: :queue } }` to `config.solid_queue.connects_to = { shards: { queue: { writing: :queue } } }` does not change the behavior of Solid Queue.
+
+### Configuration
+
+In `config/environments/production.rb`, or in the file for whichever environment you want to enable Solid Queue in, you can define the following options:
+
+  ```ruby
+  config.solid_queue.primary_shard = :queue_shard_one # optional
+  config.solid_queue.active_shard = ENV["SOLID_QUEUE_ACTIVE_SHARD"]&.to_sym
+  config.solid_queue.shard_selection_lambda = ->(active_job:, active_jobs:) { nil }
+  ```
+
+- `config.solid_queue.primary_shard` is the shard that will be used to enqueue or schedule jobs without any specific adapter configuration. It defaults to the first shard found in `config.solid_queue.connects_to` (the Active Record default).
+- `config.solid_queue.active_shard` is the shard that will be used by workers, dispatchers and schedulers to manage and process jobs. It defaults to the `primary_shard`.
+  With a basic Solid Queue configuration and the option described above, you can start a worker and dispatcher working on the `queue_shard_two` shard by running `SOLID_QUEUE_ACTIVE_SHARD=queue_shard_two bin/jobs start`.
+- `config.solid_queue.shard_selection_lambda` lets you define a custom strategy to determine in which shard a job should be enqueued. It is called with the keyword arguments `active_job` (set when a single job is enqueued or scheduled) and `active_jobs` (set when jobs are enqueued in bulk). If the lambda is defined but returns `nil`, Solid Queue falls back to the shard set on the job's adapter, and then to `primary_shard`.
+
+### Enqueueing jobs in different shards
+
+Individual jobs can be assigned to shards by leveraging their `queue_adapter` property:
+
+  ```ruby
+  class SomeJob < ApplicationJob
+    self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two)
+  end
+  ```
+
+This job will be enqueued in the shard named `queue_shard_two`.
+
+Alternatively, you can define a lambda to implement a custom strategy for defining which shard a job will be enqueued to:
+
+  ```ruby
+  config.solid_queue.shard_selection_lambda = ->(active_job:, active_jobs:) { SolidQueue.connects_to[:shards].keys.sample } # pick a shard randomly
+  ```
+
+
 ## Inspiration
 
 Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.
diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb
index d73e41b2..53e5b47c 100644
--- a/app/models/solid_queue/record.rb
+++ b/app/models/solid_queue/record.rb
@@ -4,7 +4,16 @@ module SolidQueue
   class Record < ActiveRecord::Base
     self.abstract_class = true
 
-    connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to
+    def self.connects_to_and_set_active_shard
+      connects_to(**SolidQueue.connects_to)
+
+      if SolidQueue.connects_to.key?(:shards) &&
+          SolidQueue.connects_to[:shards].key?(SolidQueue.active_shard)
+        self.default_shard = SolidQueue.active_shard
+      end
+    end
+
+    connects_to_and_set_active_shard if SolidQueue.connects_to
 
     def self.non_blocking_lock
       if SolidQueue.use_skip_locked
diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb
index d3042194..9ae293fd 100644
--- a/lib/active_job/queue_adapters/solid_queue_adapter.rb
+++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb
@@ -8,20 +8,41 @@ module QueueAdapters
     #
     #   Rails.application.config.active_job.queue_adapter = :solid_queue
     class SolidQueueAdapter
+      def initialize(db_shard: nil)
+        @db_shard = db_shard
+      end
+
       def enqueue_after_transaction_commit?
         true
       end
 
       def enqueue(active_job) # :nodoc:
-        SolidQueue::Job.enqueue(active_job)
+        select_shard(active_job:) { SolidQueue::Job.enqueue(active_job) }
       end
 
       def enqueue_at(active_job, timestamp) # :nodoc:
-        SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
+        select_shard(active_job:) do
+          SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
+        end
       end
 
       def enqueue_all(active_jobs) # :nodoc:
-        SolidQueue::Job.enqueue_all(active_jobs)
+        select_shard(active_jobs:) { SolidQueue::Job.enqueue_all(active_jobs) }
+      end
+
+      private
+
+      def select_shard(active_job: nil, active_jobs: nil, &block)
+        shard =
+          SolidQueue.shard_selection_lambda&.call(active_job:, active_jobs:) ||
+          @db_shard ||
+          SolidQueue.primary_shard
+
+        if shard
+          ActiveRecord::Base.connected_to(shard: shard) { block.call }
+        else
+          block.call
+        end
       end
     end
   end
diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb
index 1e1961e6..68bfd079 100644
--- a/lib/solid_queue.rb
+++ b/lib/solid_queue.rb
@@ -41,6 +41,8 @@ module SolidQueue
   mattr_accessor :clear_finished_jobs_after, default: 1.day
   mattr_accessor :default_concurrency_control_period, default: 3.minutes
 
+  mattr_accessor :primary_shard, :active_shard, :shard_selection_lambda
+
   delegate :on_start, :on_stop, to: Supervisor
 
   def on_worker_start(...)
diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb
index 99e14150..8fc86061 100644
--- a/lib/solid_queue/engine.rb
+++ b/lib/solid_queue/engine.rb
@@ -45,5 +45,15 @@ class Engine < ::Rails::Engine
         SolidQueue::Processes::Base.include SolidQueue::Processes::OgInterruptible
       end
     end
+
+    initializer "solid_queue.shard_configuration" do
+      ActiveSupport.on_load(:solid_queue) do
+        # Record the name of the primary shard, which should be used for
+        # adapter-less jobs
+        if SolidQueue.connects_to&.key?(:shards) && SolidQueue.primary_shard.nil?
+          SolidQueue.primary_shard = SolidQueue.connects_to[:shards].keys.first
+        end
+      end
+    end
   end
 end
diff --git a/test/dummy/app/jobs/shard_two_job.rb b/test/dummy/app/jobs/shard_two_job.rb
new file mode 100644
index 00000000..674f1df2
--- /dev/null
+++ b/test/dummy/app/jobs/shard_two_job.rb
@@ -0,0 +1,8 @@
+class ShardTwoJob < ApplicationJob
+  self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two)
+  queue_as :background
+
+  def perform(arg)
+    JobBuffer.add(arg)
+  end
+end
diff --git a/test/dummy/config/database.yml b/test/dummy/config/database.yml
index fdb186a5..7302c422 100644
--- a/test/dummy/config/database.yml
+++ b/test/dummy/config/database.yml
@@ -48,6 +48,10 @@ development:
     <<: *default
     database: <%= database_name_from("development_queue") %>
     migrations_paths: db/queue_migrate
+  queue_shard_two:
+    <<: *default
+    database: <%= database_name_from("development_queue_shard_two") %>
+    migrations_paths: db/queue_migrate
 
 test:
   primary:
@@ -65,3 +69,7 @@ test:
     <<: *default
     database: <%= database_name_from("test_queue") %>
     migrations_paths: db/queue_migrate
+  queue_shard_two:
+    <<: *default
+    database: <%= database_name_from("test_queue_shard_two") %>
+    migrations_paths: db/queue_migrate
diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb
index a5a99232..57fd3057 100644
--- a/test/dummy/config/environments/test.rb
+++ b/test/dummy/config/environments/test.rb
@@ -49,7 +49,12 @@
 
   # Replace the default in-process and non-durable queuing backend for Active Job.
   config.active_job.queue_adapter = :solid_queue
-  config.solid_queue.connects_to = { database: { writing: :queue } }
+  config.solid_queue.connects_to = {
+    shards: {
+      queue: { writing: :queue },
+      queue_shard_two: { writing: :queue_shard_two }
+    }
+  }
 
   # Annotate rendered view with file names.
   # config.action_view.annotate_rendered_view_with_filenames = true
diff --git a/test/dummy/db/queue_shard_two_schema.rb b/test/dummy/db/queue_shard_two_schema.rb
new file mode 100644
index 00000000..697c2e92
--- /dev/null
+++ b/test/dummy/db/queue_shard_two_schema.rb
@@ -0,0 +1,141 @@
+# This file is auto-generated from the current state of the database. Instead
+# of editing this file, please use the migrations feature of Active Record to
+# incrementally modify your database, and then regenerate this schema definition.
+#
+# This file is the source Rails uses to define your schema when running `bin/rails
+# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to
+# be faster and is potentially less error prone than running all of your
+# migrations from scratch. Old migrations may fail to apply correctly if those
+# migrations use external dependencies or application code.
+#
+# It's strongly recommended that you check this file into your version control system.
+
+ActiveRecord::Schema[7.1].define(version: 1) do
+  create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.bigint "job_id", null: false
+    t.string "queue_name", null: false
+    t.integer "priority", default: 0, null: false
+    t.string "concurrency_key", null: false
+    t.datetime "expires_at", null: false
+    t.datetime "created_at", null: false
+    t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
+    t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
+    t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
+  end
+
+  create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.bigint "job_id", null: false
+    t.bigint "process_id"
+    t.datetime "created_at", null: false
+    t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
+    t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
+  end
+
+  create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.bigint "job_id", null: false
+    t.text "error"
+    t.datetime "created_at", null: false
+    t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
+  end
+
+  create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.string "queue_name", null: false
+    t.string "class_name", null: false
+    t.text "arguments"
+    t.integer "priority", default: 0, null: false
+    t.string "active_job_id"
+    t.datetime "scheduled_at"
+    t.datetime "finished_at"
+    t.string "concurrency_key"
+    t.datetime "created_at", null: false
+    t.datetime "updated_at", null: false
+    t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
+    t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"
+    t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at"
+    t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering"
+    t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting"
+  end
+
+  create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.string "queue_name", null: false
+    t.datetime "created_at", null: false
+    t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
+  end
+
+  create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.string "kind", null: false
+    t.datetime "last_heartbeat_at", null: false
+    t.bigint "supervisor_id"
+    t.integer "pid", null: false
+    t.string "hostname"
+    t.text "metadata"
+    t.datetime "created_at", null: false
+    t.string "name", null: false
+    t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
+    t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
+    t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
+  end
+
+  create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.bigint "job_id", null: false
+    t.string "queue_name", null: false
+    t.integer "priority", default: 0, null: false
+    t.datetime "created_at", null: false
+    t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true
+    t.index ["priority", "job_id"], name: "index_solid_queue_poll_all"
+    t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue"
+  end
+
+  create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.bigint "job_id", null: false
+    t.string "task_key", null: false
+    t.datetime "run_at", null: false
+    t.datetime "created_at", null: false
+    t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
+    t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
+  end
+
+  create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.string "key", null: false
+    t.string "schedule", null: false
+    t.string "command", limit: 2048
+    t.string "class_name"
+    t.text "arguments"
+    t.string "queue_name"
+    t.integer "priority", default: 0
+    t.boolean "static", default: true, null: false
+    t.text "description"
+    t.datetime "created_at", null: false
+    t.datetime "updated_at", null: false
+    t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
+    t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
+  end
+
+  create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.bigint "job_id", null: false
+    t.string "queue_name", null: false
+    t.integer "priority", default: 0, null: false
+    t.datetime "scheduled_at", null: false
+    t.datetime "created_at", null: false
+    t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
+    t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all"
+  end
+
+  create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
+    t.string "key", null: false
+    t.integer "value", default: 1, null: false
+    t.datetime "expires_at", null: false
+    t.datetime "created_at", null: false
+    t.datetime "updated_at", null: false
+    t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
+    t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value"
+    t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
+  end
+
+  add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
+  add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
+  add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
+  add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
+  add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
+  add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
+end
diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb
index decab5b0..098a9e90 100644
--- a/test/integration/jobs_lifecycle_test.rb
+++ b/test/integration/jobs_lifecycle_test.rb
@@ -31,6 +31,22 @@ class JobsLifecycleTest < ActiveSupport::TestCase
     assert_equal 2, SolidQueue::Job.finished.count
   end
 
+  test "enqueue and run jobs from different shards" do
+    AddToBufferJob.perform_later "hey"
+    ShardTwoJob.perform_later "ho"
+
+    change_active_shard_to(:queue_shard_two) do
+      @dispatcher.start
+      @worker.start
+
+      wait_for_jobs_to_finish_for(2.seconds)
+    end
+
+    assert_equal [ "ho" ], JobBuffer.values.sort
+    assert_equal 1, SolidQueue::ReadyExecution.count
+    assert_equal 1, ActiveRecord::Base.connected_to(shard: :queue_shard_two) { SolidQueue::Job.finished.count }
+  end
+
   test "enqueue and run jobs that fail without retries" do
     RaisingJob.perform_later(ExpectedTestError, "A")
     RaisingJob.perform_later(ExpectedTestError, "B")
diff --git a/test/test_helper.rb b/test/test_helper.rb
index f54b73f2..4a12a728 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -28,7 +28,7 @@ class ExpectedTestError < RuntimeError; end
 
 
 class ActiveSupport::TestCase
-  include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper
+  include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper, MultishardingTestHelper
 
   setup do
     @_on_thread_error = SolidQueue.on_thread_error
diff --git a/test/test_helpers/multisharding_test_helper.rb b/test/test_helpers/multisharding_test_helper.rb
new file mode 100644
index 00000000..98ba3f13
--- /dev/null
+++ b/test/test_helpers/multisharding_test_helper.rb
@@ -0,0 +1,17 @@
+module MultishardingTestHelper
+  private
+
+  def connected_to_shard_two(&block)
+    ActiveRecord::Base.connected_to(shard: :queue_shard_two) { block.call }
+  end
+
+  def change_active_shard_to(new_shard_name, &block)
+    old_shard_name = SolidQueue.active_shard
+    SolidQueue.active_shard = new_shard_name
+    SolidQueue::Record.connects_to_and_set_active_shard
+    block.call
+  ensure
+    SolidQueue.active_shard = old_shard_name
+    SolidQueue::Record.connects_to_and_set_active_shard
+  end
+end
diff --git a/test/unit/multisharding_test.rb b/test/unit/multisharding_test.rb
new file mode 100644
index 00000000..5ab086ea
--- /dev/null
+++ b/test/unit/multisharding_test.rb
@@ -0,0 +1,79 @@
+require "test_helper"
+
+class MultishardingTest < ActiveSupport::TestCase
+  test "jobs are enqueued in the right shard" do
+    assert_difference -> { SolidQueue::Job.count }, 1 do
+      assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 1 do
+        AddToBufferJob.perform_later "hey!"
+        ShardTwoJob.perform_later "coucou!"
+      end
+    end
+  end
+
+  test "jobs are enqueued in the right shard no matter the primary shard" do
+    assert_difference -> { SolidQueue::Job.count }, 1 do
+      change_active_shard_to(:queue_shard_two) { AddToBufferJob.perform_later "hey!" }
+    end
+  end
+
+  test "shard_selection_lambda can override which shard is used to enqueue individual jobs" do
+    shard_selection_lambda = ->(active_job:, active_jobs:) { :queue_shard_two if active_job.arguments.first == "hey!" }
+
+    with_shard_selection_lambda(shard_selection_lambda) do
+      assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 1 do
+        AddToBufferJob.perform_later "hey!"
+      end
+    end
+  end
+
+  test "jobs are enqueued for later in the right shard" do
+    assert_difference -> { SolidQueue::ScheduledExecution.count }, 1 do
+      assert_difference -> { connected_to_shard_two { SolidQueue::ScheduledExecution.count } }, 1 do
+        AddToBufferJob.set(wait: 1).perform_later "hey!"
+        ShardTwoJob.set(wait: 1).perform_later "coucou!"
+      end
+    end
+  end
+
+  test "jobs are enqueued in bulk in the right shard" do
+    active_jobs = [
+      AddToBufferJob.new(2),
+      ShardTwoJob.new(6),
+      AddToBufferJob.new(3),
+      ShardTwoJob.new(7)
+    ]
+
+    assert_difference -> { SolidQueue::Job.count }, 2 do
+      assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 2 do
+        ActiveJob.perform_all_later(active_jobs)
+      end
+    end
+  end
+
+  test "shard_selection_lambda can override which shard is used to enqueue jobs in bulk" do
+    active_jobs = [
+      AddToBufferJob.new(2),
+      ShardTwoJob.new(6),
+      AddToBufferJob.new(3),
+      ShardTwoJob.new(7)
+    ]
+    shard_selection_lambda = ->(active_job:, active_jobs:) { :queue_shard_two if active_jobs.size == 2 }
+
+    with_shard_selection_lambda(shard_selection_lambda) do
+      assert_difference -> { SolidQueue::Job.count }, 0 do
+        assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 4 do
+          ActiveJob.perform_all_later(active_jobs)
+        end
+      end
+    end
+  end
+
+  private
+
+  def with_shard_selection_lambda(lambda, &block)
+    SolidQueue.shard_selection_lambda = lambda
+    block.call
+  ensure
+    SolidQueue.shard_selection_lambda = nil
+  end
+end
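
Reviewer note: the precedence that `select_shard` applies in the adapter (lambda result first, then the adapter's `db_shard`, then `primary_shard`) may be easier to reason about in isolation. The sketch below is a plain-Ruby illustration with no Rails dependency. `ShardResolver`, `FakeJob` and `route_hey` are hypothetical names introduced only for this example and are not part of the patch; the shard names are taken from the README section above.

```ruby
# Hypothetical stand-in for the adapter's private `select_shard` lookup.
# It only models the precedence order and never touches a database.
class ShardResolver
  def initialize(primary_shard:, selection_lambda: nil)
    @primary_shard = primary_shard
    @selection_lambda = selection_lambda
  end

  # Same `||` chain as the patch: lambda result, then the adapter-level
  # shard, then the primary shard.
  def resolve(adapter_shard: nil, active_job: nil, active_jobs: nil)
    @selection_lambda&.call(active_job: active_job, active_jobs: active_jobs) ||
      adapter_shard ||
      @primary_shard
  end
end

# Minimal job double exposing only the `arguments` reader used below.
FakeJob = Struct.new(:arguments)

# Both keywords are always passed and one of them is nil, so the lambda
# guards against a nil `active_job` (the bulk-enqueue case).
route_hey = ->(active_job:, active_jobs:) do
  :queue_shard_two if active_job && active_job.arguments.first == "hey!"
end

resolver = ShardResolver.new(primary_shard: :queue_shard_one, selection_lambda: route_hey)

resolver.resolve(active_job: FakeJob.new(["hey!"]))
# => :queue_shard_two (the lambda matched)
resolver.resolve(active_job: FakeJob.new(["other"]))
# => :queue_shard_one (the lambda returned nil, no adapter shard, so primary)
resolver.resolve(adapter_shard: :queue_shard_two, active_job: FakeJob.new(["other"]))
# => :queue_shard_two (the lambda returned nil, so the adapter shard is used)
```

One consequence worth calling out in the README: a non-nil lambda result takes priority even over a job class that sets `db_shard` on its adapter, which is what the bulk-enqueue test in `multisharding_test.rb` relies on.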