diff --git a/spec/shovel_spec.cr b/spec/shovel_spec.cr index 3697d5b2fc..5454973444 100644 --- a/spec/shovel_spec.cr +++ b/spec/shovel_spec.cr @@ -12,6 +12,34 @@ module ShovelSpecHelpers end end +class FailingDestination < LavinMQ::Shovel::Destination + getter push_attempts : Int32 = 0 + + def initialize(@wrapped : LavinMQ::Shovel::Destination, @fail_count : Int32) + end + + def start + @wrapped.start + end + + def stop + @wrapped.stop + end + + def started? : Bool + @wrapped.started? + end + + def push(msg, source) : Bool + @push_attempts += 1 + if @push_attempts <= @fail_count + false + else + @wrapped.push(msg, source) + end + end +end + describe LavinMQ::Shovel do describe "AMQP" do describe "Source" do @@ -800,4 +828,84 @@ describe LavinMQ::Shovel do end end end + + describe "push retry logic" do + describe "reject" do + it "should requeue message to source queue" do + with_amqp_server do |s| + source = LavinMQ::Shovel::AMQPSource.new( + "spec", + [URI.parse(s.amqp_url)], + "reject_q1", + prefetch: 1_u16, + direct_user: s.users.direct_user + ) + + with_channel(s) do |ch| + ch.queue("reject_q1") + x = ch.exchange("", "direct", passive: true) + x.publish_confirm "test message", "reject_q1" + + s.vhosts["/"].queues["reject_q1"].message_count.should eq 1 + + source.start + delivery_tag : UInt64 = 0 + wg = WaitGroup.new(1) + spawn do + source.each do |msg| + delivery_tag = msg.delivery_tag + wg.done + end + rescue + # Source was stopped + end + wg.wait + + # Message should be unacked (not visible in message_count) + s.vhosts["/"].queues["reject_q1"].message_count.should eq 0 + + # Reject the message - it should be requeued + source.reject(delivery_tag) + source.stop + + # Message should be back in the queue + wait_for { s.vhosts["/"].queues["reject_q1"].message_count == 1 } + s.vhosts["/"].queues["reject_q1"].message_count.should eq 1 + end + end + end + end + + it "should retry push and succeed after transient failures" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + source = LavinMQ::Shovel::AMQPSource.new( + "spec", + [URI.parse(s.amqp_url)], + "retry_q1", + delete_after: LavinMQ::Shovel::DeleteAfter::QueueLength, + direct_user: s.users.direct_user + ) + real_dest = LavinMQ::Shovel::AMQPDestination.new( + "spec", + URI.parse(s.amqp_url), + "retry_q2", + direct_user: s.users.direct_user + ) + # Fail 3 times, then succeed + failing_dest = FailingDestination.new(real_dest, 3) + shovel = LavinMQ::Shovel::Runner.new(source, failing_dest, "retry_shovel", vhost) + + with_channel(s) do |ch| + x, q2 = ShovelSpecHelpers.setup_qs ch, "retry_" + x.publish_confirm "shovel me", "retry_q1" + shovel.run + # Message should arrive at destination after retries + q2.get(no_ack: true).try(&.body_io.to_s).should eq "shovel me" + # Should have tried 4 times (3 failures + 1 success) + failing_dest.push_attempts.should eq 4 + end + end + end + end end diff --git a/src/lavinmq/shovel/shovel.cr b/src/lavinmq/shovel/shovel.cr index c5dba00e6d..1628d3a3c3 100644 --- a/src/lavinmq/shovel/shovel.cr +++ b/src/lavinmq/shovel/shovel.cr @@ -93,6 +93,13 @@ module LavinMQ end end + def reject(delivery_tag) + if ch = @ch + return if ch.closed? + ch.basic_reject(delivery_tag, requeue: true) + end + end + def started? : Bool !@q.nil? && !@conn.try &.closed? end @@ -183,7 +190,7 @@ module LavinMQ abstract def stop - abstract def push(msg, source) + abstract def push(msg, source) : Bool abstract def started? : Bool end @@ -207,8 +214,12 @@ module LavinMQ @current_dest = nil end - def push(msg, source) - @current_dest.try &.push(msg, source) + def push(msg, source) : Bool + if dest = @current_dest + dest.push(msg, source) + else + false + end end def started? : Bool @@ -272,21 +283,26 @@ module LavinMQ !@ch.nil? && !@conn.try &.closed? end - def push(msg, source) + def push(msg, source) : Bool raise "Not started" unless started? ch = @ch.not_nil! ex = @exchange || msg.exchange rk = @exchange_key || msg.routing_key case @ack_mode in AckMode::OnConfirm - ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) do + if ch.basic_publish_confirm(msg.body_io, ex, rk, props: msg.properties) source.ack(msg.delivery_tag) + true + else + false end in AckMode::OnPublish ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) source.ack(msg.delivery_tag) + true in AckMode::NoAck ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) + true end end end @@ -314,7 +330,7 @@ module LavinMQ !@client.nil? end - def push(msg, source) + def push(msg, source) : Bool raise "Not started" unless started? c = @client.not_nil! headers = ::HTTP::Headers{"User-Agent" => "LavinMQ"} @@ -341,6 +357,7 @@ module LavinMQ source.ack(msg.delivery_tag) in AckMode::NoAck end + true end end @@ -368,7 +385,7 @@ module LavinMQ @state end - def run + def run # ameba:disable Metrics/CyclomaticComplexity Log.context.set(name: @name, vhost: @vhost.name) loop do break if should_stop_loop? @@ -382,7 +399,15 @@ module LavinMQ @retries = 0 @source.each do |msg| @message_count += 1 - @destination.push(msg, @source) + push_retries = 0 + until @destination.push(msg, @source) + if push_retries >= 5 + @source.reject(msg.delivery_tag) + break + end + push_retries += 1 + sleep 500.milliseconds + end end break if should_stop_loop? # Don't delete shovel if paused/terminated @vhost.delete_parameter("shovel", @name) if @source.delete_after.queue_length?