Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ActiveJob adapter to delayed_job #1219

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ group :test do
if ENV['RAILS_VERSION'] == 'edge'
gem 'actionmailer', :github => 'rails/rails'
gem 'activerecord', :github => 'rails/rails'
gem 'activejob', :github => 'rails/rails'
elsif ENV['RAILS_VERSION']
gem 'actionmailer', "~> #{ENV['RAILS_VERSION']}"
gem 'activerecord', "~> #{ENV['RAILS_VERSION']}"

if ENV['RAILS_VERSION'] < '5.1'
gem 'loofah', '2.3.1'
gem 'nokogiri', '< 1.11.0'
Expand Down Expand Up @@ -81,6 +83,7 @@ group :test do
if ENV['RAILS_VERSION'].nil? || ENV['RAILS_VERSION'] >= '6.0.0'
gem 'zeitwerk', :require => false
end
gem 'concurrent-ruby'
end

group :rubocop do
Expand Down
61 changes: 61 additions & 0 deletions lib/active_job/queue_adapters/delayed_job_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module ActiveJob
module QueueAdapters
# Explicitly remove the implementation existing in older rails'.
remove_const(:DelayedJobAdapter) if defined?(:DelayedJobAdapter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the adapter signature has changed over time, I think this file needs to be the fallback for when it is not present in ActiveJob.


# = Delayed Job adapter for Active Job
#
# To use Delayed Job, set the queue_adapter config to +:delayed_job+.
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter < ::ActiveJob::QueueAdapters::AbstractAdapter
def initialize(enqueue_after_transaction_commit: false)
@enqueue_after_transaction_commit = enqueue_after_transaction_commit
end

def enqueue_after_transaction_commit? # :nodoc:
@enqueue_after_transaction_commit
end

def enqueue(job)
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end

def enqueue_at(job, timestamp)
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end

class JobWrapper
attr_accessor :job_data

def initialize(job_data)
@job_data = job_data
end

def display_name
base_name = "#{job_data["job_class"]} [#{job_data["job_id"]}] from DelayedJob(#{job_data["queue_name"]})"

return base_name unless log_arguments?

"#{base_name} with arguments: #{job_data["arguments"]}"
end

def perform
Base.execute(job_data)
end

private
def log_arguments?
job_data["job_class"].constantize.log_arguments?
rescue NameError
false
end
end
end
end
end

1 change: 1 addition & 0 deletions lib/delayed/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'delayed_job'
require 'rails'
require 'active_job/queue_adapters/delayed_job_adapter'

module Delayed
class Railtie < Rails::Railtie
Expand Down
121 changes: 121 additions & 0 deletions spec/active_job_adapter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
require 'helper'
require 'active_job'
require 'concurrent'

describe 'a Rails active job backend' do
module JobBuffer
@values = Concurrent::Array.new

class << self
def clear
@values.clear
end

def add(value)
@values << value
end

def values
@values.dup
end
end
end

class TestJob < ActiveJob::Base
queue_as :integration_tests

def perform(message)
JobBuffer.add(message)
end
end

let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w[integration_tests]) }

before do
JobBuffer.clear
Delayed::Job.delete_all
ActiveJob::Base.queue_adapter = :delayed_job
ActiveJob::Base.logger = nil
end

it "should supply a wrapped class name to DelayedJob" do
TestJob.perform_later
job = Delayed::Job.all.last
expect(job.name).to match(/TestJob \[[0-9a-f-]+\] from DelayedJob\(integration_tests\) with arguments: \[\]/)
end

it 'enqueus and executes the job' do
start_worker do
TestJob.perform_later('Rails')
sleep 2
expect(JobBuffer.values).to eq(['Rails'])
end
end

it "should not run jobs queued on a non-listening queue" do
start_worker do
old_queue = TestJob.queue_name

begin
TestJob.queue_as :some_other_queue
TestJob.perform_later "Rails"
sleep 2
expect(JobBuffer.values.empty?).to eq true
ensure
TestJob.queue_name = old_queue
end
end
end

it 'runs multiple queued jobs' do
start_worker do
ActiveJob.perform_all_later(TestJob.new('Rails'), TestJob.new('World'))
sleep 2
expect(JobBuffer.values).to eq(['Rails', 'World'])
end
end

it 'should not run job enqueued in the future' do
start_worker do
TestJob.set(wait: 5.seconds).perform_later('Rails')
sleep 2
expect(JobBuffer.values.empty?).to eq true
end
end

it 'should run job enqueued in the future at the specified time' do
start_worker do
TestJob.set(wait: 5.seconds).perform_later('Rails')
sleep 10
expect(JobBuffer.values).to eq(['Rails'])
end
end

it "should run job bulk enqueued in the future at the specified time" do
start_worker do
ActiveJob.perform_all_later([TestJob.new("Rails").set(wait: 5.seconds)])
sleep 10
expect(JobBuffer.values).to eq(['Rails'])
end
end

it "should run job with higher priority first" do
start_worker do
wait_until = Time.now + 3.seconds
TestJob.set(wait_until: wait_until, priority: 20).perform_later "1"
TestJob.set(wait_until: wait_until, priority: 10).perform_later "2"
sleep 10

expect(JobBuffer.values).to eq(['2', '1'])
end
end

private
def start_worker(&)
thread = Thread.new { worker.start }
yield
ensure
worker.stop
thread.join
end
end
Loading