Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
- A delayed exchange didn't deliver messages on time [#1600](https://github.com/cloudamqp/lavinmq/pull/1600)

## [2.7.0-alpha.1] - 2025-12-07

This release introduces the ability to restart closed queues, enhanced TLS capabilities including SNI and mTLS support, improved stream performance, improved clustering and federation management, and various performance optimizations.
Expand Down
38 changes: 38 additions & 0 deletions spec/delayed_message_exchange_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,44 @@ describe "Delayed Message Exchange" do
end
end

it "should deliver based on when message is to expire", tags: "slow" do
with_amqp_server do |s|
with_channel(s) do |ch|
x = ch.exchange(x_name, "topic", args: x_args)
q = ch.queue(q_name)
q.bind(x.name, "#")
# Publish three message with delay 9000ms, 6000ms, 3000ms
3.downto(1) do |i|
delay = i * 3000
hdrs = AMQP::Client::Arguments.new({"x-delay" => delay})
x.publish_confirm delay.to_s, "rk", props: AMQP::Client::Properties.new(headers: hdrs)
Fiber.yield
end
# by sleeping 5 seconds the message with delay 3000ms should be published
sleep 5.seconds
# publish another message, with a delay low enough to make the message
# being published before at least the one with 9000ms
hdrs = AMQP::Client::Arguments.new({"x-delay" => 1500})
x.publish_confirm "1500", "rk", props: AMQP::Client::Properties.new(headers: hdrs)
Fiber.yield
# by sleeping another 2 seconds we've slept for 7s in total, meaning that
# the message published with 6000ms should be published. Also, the new message
# with 1500ms should be published
sleep 2.seconds
queue = s.vhosts["/"].queues[q_name]
queue.message_count.should eq 3
sleep 3.seconds # total 10, the 9000ms message should have been published
queue.message_count.should eq 4
expected = %w[3000 6000 1500 9000]
expected.each do |expected_delay|
queue.basic_get(no_ack: true) do |env|
String.new(env.message.body).should eq expected_delay
end
end
end
end
end

it "should support x-delayed-message as exchange type" do
with_amqp_server do |s|
with_channel(s) do |ch|
Expand Down
110 changes: 110 additions & 0 deletions spec/delayed_requeued_store_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
require "./spec_helper"

describe LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore do
describe "#shift?" do
it "should return nil when empty" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
store.shift?.should be_nil
end

it "should return and remove first element" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
timestamp = 1000i64
store.insert(sp1, timestamp)
store.insert(sp2, timestamp)
store.shift?.should eq sp1
store.shift?.should eq sp2
store.shift?.should be_nil
end
end

describe "#first?" do
it "should return nil when empty" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
store.first?.should be_nil
end

it "should return first element without removing" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
timestamp = 1000i64
store.insert(sp1, timestamp)
store.insert(sp2, timestamp)
store.first?.should eq sp1
store.first?.should eq sp1
end
end

describe "#insert" do
it "should raise error for basic insert" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
expect_raises(Exception, /BUG:/) do
store.insert(sp)
end
end

it "should maintain sorted order by expiration time" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32, delay: 100u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32, delay: 50u32)
sp3 = LavinMQ::SegmentPosition.new(1u32, 3u32, 10u32, delay: 200u32)
timestamp = 1000i64
store.insert(sp1, timestamp)
store.insert(sp2, timestamp)
store.insert(sp3, timestamp)
store.shift?.should eq sp2
store.shift?.should eq sp1
store.shift?.should eq sp3
end

it "should order by segment position when expiration times are equal" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp3 = LavinMQ::SegmentPosition.new(1u32, 3u32, 10u32, delay: 100u32)
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32, delay: 100u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32, delay: 100u32)
timestamp = 1000i64
store.insert(sp3, timestamp)
store.insert(sp1, timestamp)
store.insert(sp2, timestamp)
store.shift?.should eq sp1
store.shift?.should eq sp2
store.shift?.should eq sp3
end
end

describe "#time_to_next_expiration?" do
it "should return nil when empty" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
store.time_to_next_expiration?.should be_nil
end

it "should return time span to next expiration" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32, delay: 1000u32)
timestamp = RoughTime.unix_ms
store.insert(sp, timestamp)
time_to_expiration = store.time_to_next_expiration?
time_to_expiration.should_not be_nil
time_to_expiration.not_nil!.should be_close(1000.milliseconds, 100.milliseconds)
end
end

describe "#clear" do
it "should empty the store" do
store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
timestamp = 1000i64
store.insert(sp1, timestamp)
store.insert(sp2, timestamp)
store.clear
store.first?.should be_nil
store.shift?.should be_nil
store.time_to_next_expiration?.should be_nil
end
end
end
80 changes: 80 additions & 0 deletions spec/requeued_store_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require "spec"
require "../src/lavinmq/message_store/requeued_store"

describe LavinMQ::MessageStore::PublishOrderedRequeuedStore do
describe "#shift?" do
it "should return nil when empty" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
store.shift?.should be_nil
end

it "should return and remove first element" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
store.insert(sp1)
store.insert(sp2)
store.shift?.should eq sp1
store.shift?.should eq sp2
store.shift?.should be_nil
end
end

describe "#first?" do
it "should return nil when empty" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
store.first?.should be_nil
end

it "should return first element without removing" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
store.insert(sp1)
store.insert(sp2)
store.first?.should eq sp1
store.first?.should eq sp1
end
end

describe "#insert" do
it "should maintain sorted order by segment position" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
sp3 = LavinMQ::SegmentPosition.new(1u32, 3u32, 10u32)
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
store.insert(sp3)
store.insert(sp1)
store.insert(sp2)
store.shift?.should eq sp1
store.shift?.should eq sp2
store.shift?.should eq sp3
end

it "should handle multiple segments" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
sp2_1 = LavinMQ::SegmentPosition.new(2u32, 1u32, 10u32)
sp1_2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
sp1_1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
store.insert(sp2_1)
store.insert(sp1_2)
store.insert(sp1_1)
store.shift?.should eq sp1_1
store.shift?.should eq sp1_2
store.shift?.should eq sp2_1
end
end

describe "#clear" do
it "should empty the store" do
store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new
sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32)
sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32)
store.insert(sp1)
store.insert(sp2)
store.clear
store.first?.should be_nil
store.shift?.should be_nil
end
end
end
Loading
Loading