11# frozen_string_literal: true
22
3+ require_relative "./redis_dispatcher"
4+
35module Sidekiq
46 module Grouping
57 class Redis
8+ include RedisDispatcher
9+
610 PLUCK_SCRIPT = <<-SCRIPT
711 local pluck_values = redis.call('lpop', KEYS[1], ARGV[1]) or {}
812 if #pluck_values > 0 then
@@ -15,61 +19,75 @@ def push_msg(name, msg, remember_unique: false)
1519 redis do |conn |
1620 conn . multi do |pipeline |
1721 sadd = pipeline . respond_to? ( :sadd? ) ? :sadd? : :sadd
18- pipeline . public_send ( sadd , ns ( "batches" ) , name )
19- pipeline . rpush ( ns ( name ) , msg )
22+ redis_connection_call ( pipeline , sadd , ns ( "batches" ) , name )
23+ redis_connection_call ( pipeline , :rpush , ns ( name ) , msg )
24+
2025 if remember_unique
21- pipeline . public_send (
22- sadd ,
23- unique_messages_key ( name ) ,
24- msg
26+ redis_connection_call (
27+ pipeline , sadd , unique_messages_key ( name ) , msg
2528 )
2629 end
2730 end
2831 end
2932 end
3033
3134 def enqueued? ( name , msg )
32- redis do |conn |
33- conn . sismember ( unique_messages_key ( name ) , msg )
34- end
35+ member = redis_call ( :sismember , unique_messages_key ( name ) , msg )
36+ return member if member . is_a? ( TrueClass ) || member . is_a? ( FalseClass )
37+
38+ member != 0
3539 end
3640
3741 def batch_size ( name )
38- redis { | conn | conn . llen ( ns ( name ) ) }
42+ redis_call ( :llen , ns ( name ) )
3943 end
4044
4145 def batches
42- redis { | conn | conn . smembers ( ns ( "batches" ) ) }
46+ redis_call ( :smembers , ns ( "batches" ) )
4347 end
4448
4549 def pluck ( name , limit )
46- keys = [ ns ( name ) , unique_messages_key ( name ) ]
47- args = [ limit ]
48- redis { |conn | conn . eval PLUCK_SCRIPT , keys , args }
50+ if new_redis_client?
51+ redis_call (
52+ :eval ,
53+ PLUCK_SCRIPT ,
54+ 2 ,
55+ ns ( name ) ,
56+ unique_messages_key ( name ) ,
57+ limit
58+ )
59+ else
60+ keys = [ ns ( name ) , unique_messages_key ( name ) ]
61+ args = [ limit ]
62+ redis_call ( :eval , PLUCK_SCRIPT , keys , args )
63+ end
4964 end
5065
5166 def get_last_execution_time ( name )
52- redis { | conn | conn . get ( ns ( "last_execution_time:#{ name } " ) ) }
67+ redis_call ( :get , ns ( "last_execution_time:#{ name } " ) )
5368 end
5469
5570 def set_last_execution_time ( name , time )
56- redis do | conn |
57- conn . set ( ns ( "last_execution_time:#{ name } " ) , time . to_json )
58- end
71+ redis_call (
72+ : set, ns ( "last_execution_time:#{ name } " ) , time . to_json
73+ )
5974 end
6075
6176 def lock ( name )
62- redis do |conn |
63- id = ns ( "lock:#{ name } " )
64- conn . set ( id , true , nx : true , ex : Sidekiq ::Grouping ::Config . lock_ttl )
65- end
77+ redis_call (
78+ :set ,
79+ ns ( "lock:#{ name } " ) ,
80+ "true" ,
81+ nx : true ,
82+ ex : Sidekiq ::Grouping ::Config . lock_ttl
83+ )
6684 end
6785
6886 def delete ( name )
6987 redis do |conn |
70- conn . del ( ns ( "last_execution_time:#{ name } " ) )
71- conn . del ( ns ( name ) )
72- conn . srem ( ns ( "batches" ) , name )
88+ redis_connection_call ( conn , : del, ns ( "last_execution_time:#{ name } " ) )
89+ redis_connection_call ( conn , : del, ns ( name ) )
90+ redis_connection_call ( conn , : srem, ns ( "batches" ) , name )
7391 end
7492 end
7593
@@ -82,10 +100,6 @@ def unique_messages_key(name)
82100 def ns ( key = nil )
83101 "batching:#{ key } "
84102 end
85-
86- def redis ( &block )
87- Sidekiq . redis ( &block )
88- end
89103 end
90104 end
91105end
0 commit comments