@lukas8219
Contributor

@lukas8219 lukas8219 commented Aug 18, 2025

WHAT is this pull request doing?

  1. Implements Retrier in HTTPDestination Shovel, configured by the parameters:
     • dest-backoff
     • dest-jitter
     • dest-timeout
     • dest-max-retries
  2. Fixes HTTPDestination not taking no_ack mode into consideration; previously all HTTPDestinations defaulted to OnConfirm/OnPublish.

HOW can this pull request be tested?

Specs:

  • Expose a mock server that receives HTTP requests, fails at least 2 times, and accepts the third attempt.
  • Tests focused on the Retrier logic

RFC

  • Should we actually implement a proper Circuit Breaker instead of just Retrier?
  • Currently using ?.try is misleading. Is there a better way to handle it?

Closes #1137

@lukas8219 lukas8219 changed the title from "wip: naive implementation of push_with_retry" to "feat(Shovels): implement retry + backoff in Shovel destinations" Aug 19, 2025
Comment on lines 363 to 375
private def push_with_retry(path, headers, body, max_retries = 3, jitter = 0.5)
  retries = 0
  while retries < max_retries
    response = @client.not_nil!.post(path, headers: headers, body: body)
    return true if response.success?
    retries += 1
    # Exponential backoff: 2^retries seconds with jitter
    base_delay = (2.0 ** retries).seconds
    jitter_delay = jitter.seconds * Random.rand(0.0..1.0)
    sleep base_delay + jitter_delay
  end
  false
end
Contributor Author

Ideally I think this could be a macro? trait? that receives the max_retries and jitter and executes a &block

Member

@carlhoerberg carlhoerberg Nov 20, 2025

yes, like this:

Suggested change
- private def push_with_retry(path, headers, body, max_retries = 3, jitter = 0.5)
-   retries = 0
-   while retries < max_retries
-     response = @client.not_nil!.post(path, headers: headers, body: body)
-     return true if response.success?
-     retries += 1
-     # Exponential backoff: 2^retries seconds with jitter
-     base_delay = (2.0 ** retries).seconds
-     jitter_delay = jitter.seconds * Random.rand(0.0..1.0)
-     sleep base_delay + jitter_delay
-   end
-   false
- end
+ private def push_with_retry(path, headers, body, max_retries = 3, jitter = 0.5.seconds, &) : Bool
+   retries = 0
+   while retries < max_retries
+     return true if yield
+     retries += 1
+     # Exponential backoff: 2^retries seconds with jitter
+     base_delay = (2.0 ** retries).seconds
+     jitter_delay = jitter * rand(1.0)
+     sleep base_delay + jitter_delay
+   end
+   false
+ end
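
For illustration, the block-based shape discussed in this thread can be tried outside the shovel class. The following is a minimal sketch, not the PR's actual Retrier: `with_retry`, its `base` parameter, and the injectable `sleeper` proc are hypothetical names added here so the backoff can be observed without really sleeping. Unlike the snippet above, it does not sleep after the final failed attempt.

```crystal
# Hypothetical standalone helper (assumption: not part of LavinMQ).
# Calls the block until it returns a truthy value or max_retries attempts are used.
def with_retry(max_retries : Int32 = 3,
               base : Time::Span = 1.second,
               jitter : Time::Span = 0.5.seconds,
               sleeper : Proc(Time::Span, Nil) = ->(d : Time::Span) { sleep d; nil },
               &) : Bool
  attempt = 0
  while attempt < max_retries
    return true if yield
    attempt += 1
    break if attempt == max_retries # no point in sleeping after the last failure
    # Exponential backoff: base * 2^attempt, plus a random share of the jitter
    delay = base * (2.0 ** attempt) + jitter * rand
    sleeper.call(delay)
  end
  false
end

# Usage: the block fails twice and succeeds on the third call.
# The sleeper records the delays instead of sleeping.
calls = 0
delays = [] of Time::Span
ok = with_retry(max_retries: 3, sleeper: ->(d : Time::Span) { delays << d; nil }) do
  calls += 1
  calls == 3
end
```

Passing the sleep function in as a parameter is one way to keep specs fast; whether the real Retrier should expose such a hook is a design question for the PR.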

@kickster97
Member

Have not looked closely at the code, but does this address issue #1357 as well?

@lukas8219
Contributor Author

@kickster97 Not related. This PR is meant only for the HTTP retries.

Member

@carlhoerberg carlhoerberg left a comment

Looks like this class needs a lot of rework. For instance, what's up with @client not being assigned in the constructor? Assigning it there would allow us to get rid of the not_nil! anti-pattern.

@carlhoerberg carlhoerberg marked this pull request as ready for review November 20, 2025 05:58
@carlhoerberg carlhoerberg requested a review from a team as a code owner November 20, 2025 05:58
"/"
end
response = c.post(path, headers: headers, body: msg.body_io)
success = push_with_retry(path, headers, msg.body_io, max_retries: 3, jitter: 0.5)
Member

push_with_retry makes sense for AckMode::OnConfirm and AckMode::OnPublish but NoAck is more like "fire-and-forget" so there we don't care about the results.

What I mean is that we should only retry on OnConfirm and OnPublish
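
A minimal sketch of that distinction, under stated assumptions: the `AckMode` enum below is a stand-in that reuses the names from this comment, and `deliver` and `max_attempts` are hypothetical, not the shovel's real API. Backoff is left out to keep the example short.

```crystal
# Stand-in enum (assumption: mirrors the ack modes named in the review).
enum AckMode
  OnConfirm
  OnPublish
  NoAck
end

# Hypothetical helper: returns true when the message can be treated as handled.
def deliver(mode : AckMode, max_attempts : Int32, &attempt : -> Bool) : Bool
  case mode
  in .no_ack?
    # Fire-and-forget: one attempt, result ignored
    attempt.call
    true
  in .on_confirm?, .on_publish?
    # The result matters here, so retry up to max_attempts times
    max_attempts.times do
      return true if attempt.call
    end
    false
  end
end

# Usage: NoAck makes a single attempt even though it fails.
no_ack_calls = 0
no_ack_result = deliver(AckMode::NoAck, 3) { no_ack_calls += 1; false }

# OnConfirm keeps trying until the third attempt succeeds.
confirm_calls = 0
confirm_result = deliver(AckMode::OnConfirm, 3) { confirm_calls += 1; confirm_calls == 3 }
```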

@lukas8219 lukas8219 marked this pull request as draft November 28, 2025 16:43
require "./spec_helper"
require "../src/lavinmq/shovel/shovel_retrier"

describe LavinMQ::Shovel::Retrier do
Contributor Author

It got me wondering whether Retrier shouldn't just be some sort of Circuit Breaker...

Member

Could you explain some more what you are thinking here? :)

Contributor Author

@snichme If we are doing HTTP retries, there's a high chance that blindly retrying is not the best approach. What if the user has N HTTP shovels and their downstreams start failing?

Blind retries could make the situation worse, and having the user delete or pause shovels one by one is not good UX. (On the other hand, for that specific scenario we could keep it simple and add a "Pause All" button.)
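
To make the circuit breaker idea concrete, here is a minimal sketch. Everything in it is hypothetical (the `CircuitBreaker` class, `failure_threshold`, `cooldown`); nothing like it exists in the PR. After a number of consecutive failures the breaker opens and skips calls until a cooldown has passed, then lets a single trial call through.

```crystal
# Hypothetical minimal circuit breaker (assumption: not part of LavinMQ).
class CircuitBreaker
  enum State
    Closed
    Open
  end

  getter state : State = State::Closed

  def initialize(@failure_threshold : Int32 = 3, @cooldown : Time::Span = 30.seconds)
    @failures = 0
    @opened_at = Time.monotonic
  end

  # Runs the block unless the circuit is open and still cooling down.
  # Returns nil when the call was skipped, otherwise the block's outcome.
  def call(now : Time::Span = Time.monotonic, &) : Bool?
    if @state.open?
      return nil if now - @opened_at < @cooldown
      # Cooldown elapsed: fall through and allow one trial call
    end
    if yield
      @failures = 0
      @state = State::Closed
      true
    else
      @failures += 1
      if @state.open? || @failures >= @failure_threshold
        @state = State::Open
        @opened_at = now
      end
      false
    end
  end
end

# Usage with explicit timestamps so no real waiting is needed.
breaker = CircuitBreaker.new(failure_threshold: 2, cooldown: 10.seconds)
t0 = 100.seconds
breaker.call(t0) { false }
breaker.call(t0) { false }                              # second failure opens the circuit
skipped = breaker.call(t0 + 1.second) { true }          # still cooling down, block not run
recovered = breaker.call(t0 + 11.seconds) { true }      # trial call succeeds, circuit closes
```

A breaker shared by several shovels pointing at the same downstream would address the "N shovels" scenario above, at the cost of more state to manage than a per-message retry loop.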

Member

Aha, agreed.
I like it. I would prefer to have a checkbox next to each shovel, so you can select some of them and run batch operations on the selected ones, like Pause, Delete, etc.
Like we have with Queues and Streams.

But this should be another PR IMO, so this one won't run away in size or time.

name,
uri,
Shovel::HTTPDestinationParameters.from_parameters(config),
ack_mode
Contributor Author

@lukas8219 lukas8219 Dec 6, 2025

⚠️ 🔥 This might be a breaking change for some customers. I need to check deployed LMQ definitions and add a note about this to the release notes. Ack mode was not being considered in HTTPDest.

Comment on lines 331 to 334
jitter: parameters["dest-jitter"]?.try &.as_f? || 1.0,
backoff: parameters["dest-backoff"]?.try &.as_f? || 2.0,
timeout: parameters["dest-timeout"]?.try &.as_f? || 10.0,
max_retries: parameters["dest-max-retries"]?.try &.as_i? || 1
Contributor Author

💡 ⚠️ ?.try may fail to convert user input such as 1.0 or 1 between Float and Integer. Not sure if ?.try is the best idea here.

Contributor Author

Meaning, it could be misleading for the user if the Shovel parameter accepts their input but, behind the scenes, uses the default values. This shows up in the spec.

Member

This is a recurring question: how and when to validate input data, and how to handle it when it's invalid.
What you have here is a common pattern: we try to parse the input and, if that fails, we use a default value. I'm not saying it's the best approach, since the shovel might end up misconfigured, but finding a common solution for all input is also difficult. @spuun did some refactoring on validating input for a queue here: https://github.com/cloudamqp/lavinmq/blob/main/src/lavinmq/amqp/queue/queue.cr#L27 which might give some inspiration.
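
One possible alternative to the silent fallback, sketched under assumptions: `float_param` and `int_param` are hypothetical helpers, not LavinMQ code, and the parameters are assumed to arrive as a `Hash(String, JSON::Any)`. A missing key still yields the default, an integer or whole-number float is converted, and anything else raises so the user learns the value was rejected.

```crystal
require "json"

# Hypothetical helpers (assumption: not LavinMQ's actual parsing code).
def float_param(params : Hash(String, JSON::Any), key : String, default : Float64) : Float64
  value = params[key]?
  return default if value.nil?
  case raw = value.raw
  when Float64
    raw
  when Int64
    raw.to_f # accept 1 where 1.0 was expected
  else
    raise ArgumentError.new("#{key} must be a number, got #{value.to_json}")
  end
end

def int_param(params : Hash(String, JSON::Any), key : String, default : Int32) : Int32
  value = params[key]?
  return default if value.nil?
  case raw = value.raw
  when Int64
    raw.to_i32
  when Float64
    # accept 3.0 where 3 was expected, but reject 3.5
    raise ArgumentError.new("#{key} must be a whole number") unless raw == raw.trunc
    raw.to_i32
  else
    raise ArgumentError.new("#{key} must be a number, got #{value.to_json}")
  end
end

# Usage: an integer jitter and a float retry count are both accepted.
params = JSON.parse(%({"dest-jitter": 1, "dest-max-retries": 3.0})).as_h
jitter = float_param(params, "dest-jitter", 1.0)
max_retries = int_param(params, "dest-max-retries", 1)
backoff = float_param(params, "dest-backoff", 2.0) # key missing, default used
```

Raising instead of falling back changes behavior for existing definitions with invalid values, so it would need the same release-note care as the ack mode change.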

@lukas8219 lukas8219 marked this pull request as ready for review December 6, 2025 20:41
@lukas8219 lukas8219 requested a review from snichme December 6, 2025 20:42
@snichme
Member

snichme commented Jan 8, 2026

Draft PR, but still something that will impact this PR if merged first: #1591. I can help rebase if needed.

@lukas8219
Contributor Author

@snichme Ok. I am still waiting on that PR to be merged so I can push this further.

4 participants