Member

@snichme snichme commented Dec 10, 2025

WHAT is this pull request doing?

Fixes #1357

Checks the return value of publish_confirm and, if the message was not published, retries 5 times before rejecting the message.

HOW can this pull request be tested?

A script is attached to the issue; run it before and after applying this fix.
Before the fix, q1 is simply emptied and the messages disappear. After the fix, the messages are kept in the queue.

@snichme snichme requested a review from a team as a code owner December 10, 2025 12:17

claude bot commented Dec 10, 2025

Claude finished @snichme's task


Code Review Complete

Todo List:

  • Read and analyze changed files
  • Review retry logic implementation
  • Check for bugs and anti-patterns
  • Report findings

Problems Found:

1. Race Condition Risk in MultiDestinationHandler
File: src/lavinmq/shovel/shovel.cr:217-223

The push method has a potential race condition. If @current_dest becomes nil between the check and the method call, it could cause issues:

def push(msg, source) : Bool
  if dest = @current_dest
    dest.push(msg, source)
  else
    false
  end
end
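
For context, a small self-contained sketch of what the `if dest = @current_dest` form does in Crystal: the assignment binds a local variable, so the call that follows goes through that local even if the instance variable is reset afterwards. The `Handler` class, the `String` stand-in for a destination, and the `clear` method are hypothetical and only exist to illustrate the point.

```crystal
# Hypothetical stand-in for a handler; a String plays the role of a destination.
class Handler
  @current_dest : String? = "dest-a"

  def clear
    @current_dest = nil
  end

  def push : Bool
    if dest = @current_dest
      clear          # simulate another fiber resetting the instance variable
      dest.size > 0  # dest is still the non-nil String captured above
    else
      false
    end
  end
end
```

Under this reading, the remaining concern is a push to a destination that is no longer current, not a call on nil.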

2. Inconsistent Error Handling in HTTPDestination
File: src/lavinmq/shovel/shovel.cr:333-362

The push method doesn't check the HTTP response status in all cases. It can raise FailedDeliveryError for OnConfirm/OnPublish modes but always returns true at the end, which could mask failures:

case @ack_mode
in AckMode::OnConfirm, AckMode::OnPublish
  raise FailedDeliveryError.new unless response.success?
  source.ack(msg.delivery_tag)
in AckMode::NoAck
end
true  # This always returns true even if response.success? failed above

3. Missing Error Handling for Reject Operation
File: src/lavinmq/shovel/shovel.cr:404-407

The retry loop doesn't handle potential exceptions when calling @source.reject(). If the reject operation fails, it could leave messages in an inconsistent state.

4. Hard-coded Retry Parameters
File: src/lavinmq/shovel/shovel.cr:404-407

The retry count (5) and sleep duration (500ms) are hard-coded, making them difficult to configure for different use cases or environments.
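
A minimal sketch of how the two values could be lifted out of the loop, assuming a hypothetical `RetryPolicy` record and `with_retries` helper. Neither name exists in the PR, and how such a policy would be wired into the shovel configuration is left open.

```crystal
# Hypothetical value type; defaults mirror the values mentioned above.
record RetryPolicy, max_retries : Int32 = 5, delay : Time::Span = 500.milliseconds

# Calls the block until it returns true or the retry budget is spent.
# Returns true if an attempt succeeded, false otherwise.
def with_retries(policy : RetryPolicy, & : -> Bool) : Bool
  attempts = 0
  loop do
    return true if yield
    return false if attempts >= policy.max_retries
    attempts += 1
    sleep policy.delay
  end
end
```

With this shape, the first attempt is not counted as a retry, so `max_retries: 5` allows up to six calls of the block; whether that matches the intended semantics is a design choice for the PR.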


Member

@carlhoerberg carlhoerberg left a comment

Have you considered a delivery-limit policy? Or at least looking at the x-delivery-count header?

  case @ack_mode
  in AckMode::OnConfirm
-   ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) do
+   if ch.basic_publish_confirm(msg.body_io, ex, rk, props: msg.properties)
Member

FYI, the block passed to basic_publish also takes a boolean: the result of the confirm (ack/nack).

Suggested change
- if ch.basic_publish_confirm(msg.body_io, ex, rk, props: msg.properties)
+ ch.basic_publish(msg.body_io, ex, rk, props: msg.properties) do |ok|

The upside/downside of this approach is that this fiber is blocked until an ack/nack is received from the server, which hampers throughput but also limits the number of unconfirmed messages (to 1).

Member

You could reject in the block instead: with requeue if the x-delivery-count is less than 5, and without requeue once it reaches 5. Or even use the delivery-limit queue argument/policy somehow, instead of this hardcoded one.
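
A minimal sketch of that decision, assuming a hypothetical `requeue?` helper. A plain `Hash` stands in for the message's header table so the example is self-contained, and treating a count equal to the limit as exhausted is an assumption.

```crystal
# Hypothetical helper: decides whether a nacked message should be requeued.
# `headers` stands in for the message's header table; a missing table or a
# missing x-delivery-count header is treated as a count of zero.
def requeue?(headers : Hash(String, Int64)?, limit : Int64 = 5_i64) : Bool
  count = headers.try(&.["x-delivery-count"]?) || 0_i64
  count < limit
end
```

The result would be passed as the requeue flag of the reject call made inside the confirm block.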

Member Author

I like the idea of using a delivery limit. But then again, the message will be removed from the queue and "disappear" if there is no DLX configured.

What I wanted to achieve with this PR is that the shovel basically just waits until the destination is available again, as a default behavior.

push_retries = 0
until @destination.push(msg, @source)
  if push_retries >= 5
    @source.reject(msg.delivery_tag)
Member

Wasn't the message rejected with requeue? Doesn't that mean the message will be retried again soon?

Member Author

The idea is to try 5 times and then put the message back in the queue so it can be removed or consumed by another consumer.
I don't want to reject without requeue, since the message will disappear if no DLX is configured, and the issue behind this PR (#1357) was precisely about disappearing messages.

@snichme snichme requested a review from carlhoerberg January 8, 2026 09:00

Development

Successfully merging this pull request may close these issues.

Overflow mode reject-publish not respected by shovel