Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions spec/shovel_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ module ShovelSpecHelpers
end
end

# Spec helper: a destination decorator that reports failure for the first
# `fail_count` calls to `push` and forwards every later call to the wrapped
# destination. `start`, `stop` and `started?` are always forwarded.
class FailingDestination < LavinMQ::Shovel::Destination
  # Total number of times `push` has been called, failed attempts included.
  getter push_attempts : Int32 = 0

  def initialize(@wrapped : LavinMQ::Shovel::Destination, @fail_count : Int32)
  end

  def start
    @wrapped.start
  end

  def stop
    @wrapped.stop
  end

  def started? : Bool
    @wrapped.started?
  end

  # Counts the attempt, then returns false while the attempt number is still
  # within the configured failure budget; otherwise returns whatever the
  # wrapped destination's `push` returns.
  def push(msg, source) : Bool
    @push_attempts += 1
    return false if @push_attempts <= @fail_count

    @wrapped.push(msg, source)
  end
end

describe LavinMQ::Shovel do
describe "AMQP" do
describe "Source" do
Expand Down Expand Up @@ -800,4 +828,84 @@ describe LavinMQ::Shovel do
end
end
end

describe "push retry logic" do
  describe "reject" do
    it "should requeue message to source queue" do
      with_amqp_server do |s|
        source = LavinMQ::Shovel::AMQPSource.new(
          "spec",
          [URI.parse(s.amqp_url)],
          "reject_q1",
          prefetch: 1_u16,
          direct_user: s.users.direct_user
        )

        with_channel(s) do |ch|
          # Declare the source queue and publish one message to it through
          # the default exchange.
          ch.queue("reject_q1")
          default_exchange = ch.exchange("", "direct", passive: true)
          default_exchange.publish_confirm "test message", "reject_q1"

          source_queue = s.vhosts["/"].queues["reject_q1"]
          source_queue.message_count.should eq 1

          source.start
          tag : UInt64 = 0_u64
          first_delivery = WaitGroup.new(1)
          # Consume in a separate fiber so the spec can wait for the delivery.
          spawn do
            source.each do |msg|
              tag = msg.delivery_tag
              first_delivery.done
            end
          rescue
            # Source was stopped
          end
          first_delivery.wait

          # While the delivery is unacked it is not part of message_count.
          source_queue.message_count.should eq 0

          # Rejecting should hand the message back to the queue.
          source.reject(tag)
          source.stop

          # The message should be ready in the queue again.
          wait_for { source_queue.message_count == 1 }
          source_queue.message_count.should eq 1
        end
      end
    end
  end

  it "should retry push and succeed after transient failures" do
    with_amqp_server do |s|
      vhost = s.vhosts.create("x")
      source = LavinMQ::Shovel::AMQPSource.new(
        "spec",
        [URI.parse(s.amqp_url)],
        "retry_q1",
        delete_after: LavinMQ::Shovel::DeleteAfter::QueueLength,
        direct_user: s.users.direct_user
      )
      amqp_dest = LavinMQ::Shovel::AMQPDestination.new(
        "spec",
        URI.parse(s.amqp_url),
        "retry_q2",
        direct_user: s.users.direct_user
      )
      # The first three pushes report failure; the fourth is forwarded.
      flaky_dest = FailingDestination.new(amqp_dest, 3)
      runner = LavinMQ::Shovel::Runner.new(source, flaky_dest, "retry_shovel", vhost)

      with_channel(s) do |ch|
        exchange, dest_queue = ShovelSpecHelpers.setup_qs ch, "retry_"
        exchange.publish_confirm "shovel me", "retry_q1"
        runner.run

        # The message must reach the destination queue once the retries succeed.
        delivered = dest_queue.get(no_ack: true)
        delivered.try(&.body_io.to_s).should eq "shovel me"

        # 3 failed attempts + 1 successful attempt.
        flaky_dest.push_attempts.should eq 4
      end
    end
  end
end
end
41 changes: 33 additions & 8 deletions src/lavinmq/shovel/shovel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ module LavinMQ
end
end

# Rejects the delivery identified by `delivery_tag` with `requeue: true`.
# Does nothing when there is no channel or the channel is already closed.
def reject(delivery_tag)
  return unless ch = @ch
  return if ch.closed?

  ch.basic_reject(delivery_tag, requeue: true)
end

def started? : Bool
[email protected]? && [email protected] &.closed?
end
Expand Down Expand Up @@ -183,7 +190,7 @@ module LavinMQ

abstract def stop

abstract def push(msg, source)
abstract def push(msg, source) : Bool

abstract def started? : Bool
end
Expand All @@ -207,8 +214,12 @@ module LavinMQ
@current_dest = nil
end

def push(msg, source)
@current_dest.try &.push(msg, source)
# Forwards the message to the currently selected destination and returns its
# result. Returns false when no destination is currently set.
def push(msg, source) : Bool
  dest = @current_dest
  return false if dest.nil?

  dest.push(msg, source)
end

def started? : Bool
Expand Down Expand Up @@ -272,21 +283,26 @@ module LavinMQ
[email protected]? && [email protected] &.closed?
end

def push(msg, source)
def push(msg, source) : Bool
raise "Not started" unless started?
ch = @ch.not_nil!
ex = @exchange || msg.exchange
rk = @exchange_key || msg.routing_key
case @ack_mode
in AckMode::OnConfirm
ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) do
if ch.basic_publish_confirm(msg.body_io, ex, rk, props: msg.properties)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, the block passed to basic_publish also takes a boolean: the result of the confirm (ack/nack).

Suggested change
if ch.basic_publish_confirm(msg.body_io, ex, rk, props: msg.properties)
ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) do |ok|

The upside/downside of this approach is that this fiber is blocked until an ack/nack is received from the server, which hampers throughput but also limits the number of unconfirmed messages (to 1).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could reject in the block instead: with requeue if the x-delivery-count is less than 5, and without requeue if it is greater than 5. Or even use the delivery-limit queue argument/policy somehow, instead of this hardcoded limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of using the delivery limit. But then again, the message will be removed from the queue and "disappear" if there is no DLX configured.

What I wanted to achieve with this PR is that the shovel basically just waits until the destination is available again, as a default behavior.

source.ack(msg.delivery_tag)
true
else
false
end
in AckMode::OnPublish
ch.basic_publish(msg.body_io, ex, rk, props: msg.properties)
source.ack(msg.delivery_tag)
true
in AckMode::NoAck
ch.basic_publish(msg.body_io, ex, rk, props: msg.properties)
true
end
end
end
Expand Down Expand Up @@ -314,7 +330,7 @@ module LavinMQ
[email protected]?
end

def push(msg, source)
def push(msg, source) : Bool
raise "Not started" unless started?
c = @client.not_nil!
headers = ::HTTP::Headers{"User-Agent" => "LavinMQ"}
Expand All @@ -341,6 +357,7 @@ module LavinMQ
source.ack(msg.delivery_tag)
in AckMode::NoAck
end
true
end
end

Expand Down Expand Up @@ -368,7 +385,7 @@ module LavinMQ
@state
end

def run
def run # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(name: @name, vhost: @vhost.name)
loop do
break if should_stop_loop?
Expand All @@ -382,7 +399,15 @@ module LavinMQ
@retries = 0
@source.each do |msg|
@message_count += 1
@destination.push(msg, @source)
push_retries = 0
until @destination.push(msg, @source)
if push_retries >= 5
@source.reject(msg.delivery_tag)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't the message rejected with requeue? Doesn't that mean the message will be retried again soon?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to try 5 times and then put the message back in the queue, so that it can be removed or consumed by another consumer.
I don't want to reject without requeue, since the message will disappear if no DLX is configured, and the issue related to this PR (#1357) was precisely about disappearing messages.

break
end
push_retries += 1
sleep 500.milliseconds
end
end
break if should_stop_loop? # Don't delete shovel if paused/terminated
@vhost.delete_parameter("shovel", @name) if @source.delete_after.queue_length?
Expand Down
Loading