Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ jobs:
with:
bundler: 2
ruby-version: ${{ matrix.ruby_version }}

# Appraisal doesn't support vendored install
# See: https://github.com/thoughtbot/appraisal/issues/173
# https://github.com/thoughtbot/appraisal/pull/174
Expand Down
28 changes: 16 additions & 12 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
source "https://rubygems.org"
source 'https://rubygems.org'

gem "appraisal"
gem "bundler", "> 1.5"
gem "pry"
gem "rake"
gem "rspec"
gem "rspec-sidekiq"
gem "rubocop"
gem "rubocop-rspec"
gem "simplecov"
gem "timecop"
gem "activesupport"
# Specify your gem's dependencies in sidekiq-grouping.gemspec
gemspec

group :development do
gem "appraisal"
gem "bundler", "> 1.5"
gem "pry"
gem "rake"
gem "rspec"
gem "rspec-sidekiq"
gem "rubocop"
gem "rubocop-rspec"
gem "simplecov"
gem "timecop"
end
19 changes: 10 additions & 9 deletions lib/sidekiq/grouping/flusher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ def flush_batches(batches)
def flush_concrete(batches)
return if batches.empty?

names = batches.map do |batch|
"#{batch.worker_class} in #{batch.queue}"
end
unless Sidekiq::Grouping::Config.tests_env
Sidekiq::Grouping.logger.info(
"[Sidekiq::Grouping] Trying to flush batched queues: " \
"#{names.join(',')}"
)
end
# Comment out logging code since it can be noisy in production
# names = batches.map do |batch|
# "#{batch.worker_class} in #{batch.queue}"
# end
# unless Sidekiq::Grouping::Config.tests_env
# Sidekiq::Grouping.logger.info(
# "[Sidekiq::Grouping] Trying to flush batched queues: " \
# "#{names.join(',')}"
# )
# end
batches.each(&:flush)
end
end
Expand Down
42 changes: 39 additions & 3 deletions lib/sidekiq/grouping/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,25 @@ module Grouping
class Redis
include RedisDispatcher

PLUCK_SCRIPT = <<-SCRIPT
BREAK_VERSION = "6.2.0"

PLUCK_SCRIPT_GTE_6_2_0 = <<-SCRIPT
local pluck_values = redis.call('lpop', KEYS[1], ARGV[1]) or {}
if #pluck_values > 0 then
redis.call('srem', KEYS[2], unpack(pluck_values))
end
return pluck_values
SCRIPT

PLUCK_SCRIPT_LT_6_2_0 = <<-SCRIPT
local pluck_values = redis.call('lrange', KEYS[1], 0, ARGV[1] - 1)
redis.call('ltrim', KEYS[1], ARGV[1], -1)
for k, v in pairs(pluck_values) do
redis.call('srem', KEYS[2], v)
end
return pluck_values
SCRIPT

def push_msg(name, msg, remember_unique: false)
redis do |conn|
conn.multi do |pipeline|
Expand Down Expand Up @@ -50,7 +61,7 @@ def pluck(name, limit)
if new_redis_client?
redis_call(
:eval,
PLUCK_SCRIPT,
pluck_script,
2,
ns(name),
unique_messages_key(name),
Expand All @@ -59,7 +70,7 @@ def pluck(name, limit)
else
keys = [ns(name), unique_messages_key(name)]
args = [limit]
redis_call(:eval, PLUCK_SCRIPT, keys, args)
redis_call(:eval, pluck_script, keys, args)
end
end

Expand Down Expand Up @@ -100,6 +111,31 @@ def unique_messages_key(name)
def ns(key = nil)
"batching:#{key}"
end

#
# Get Redis server version
#
# @return [String] Redis server version
#
def server_version
Sidekiq.redis do |conn|
conn.info["redis_version"]
end
end

#
# The optimized LUA SCRIPT works from Redis greater than or equal to 6.2.
# Check Redis version in use and return the suitable PLUCK_SCRIPT
#
# @return [String] Lua Script
#
def pluck_script
if Gem::Version.new(server_version) >= Gem::Version.new(BREAK_VERSION)
PLUCK_SCRIPT_GTE_6_2_0
else
PLUCK_SCRIPT_LT_6_2_0
end
end
end
end
end
22 changes: 22 additions & 0 deletions spec/modules/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,26 @@
expect(redis_call(:smembers, unique_key)).to eq []
end
end

describe "#pluck_script" do
context "when Redis server version is" do
it ">= 6.2.0, selects the corresponding pluck script" do
allow_any_instance_of(described_class)
.to receive(:server_version)
.and_return("6.2.0")
expect(redis_service.send(:pluck_script)).to eq(
described_class::PLUCK_SCRIPT_GTE_6_2_0
)
end

it "< 6.2.0, selects the corresponding pluck script" do
allow_any_instance_of(described_class)
.to receive(:server_version)
.and_return("6.0.0")
expect(redis_service.send(:pluck_script)).to eq(
described_class::PLUCK_SCRIPT_LT_6_2_0
)
end
end
end
end