diff --git a/CHANGELOG.md b/CHANGELOG.md index ecef783d59..414f5cb316 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Fixed +- A delayed exchange didn't deliver messages on time [#1600](https://github.com/cloudamqp/lavinmq/pull/1600) + ## [2.7.0-alpha.1] - 2025-12-07 This release introduces the ability to restart closed queues, enhanced TLS capabilities including SNI and mTLS support, improved stream performance, improved clustering and federation management, and various performance optimizations. diff --git a/spec/delayed_message_exchange_spec.cr b/spec/delayed_message_exchange_spec.cr index 5bf512a75d..444cc3a552 100644 --- a/spec/delayed_message_exchange_spec.cr +++ b/spec/delayed_message_exchange_spec.cr @@ -104,6 +104,44 @@ describe "Delayed Message Exchange" do end end + it "should deliver based on when message is to expire", tags: "slow" do + with_amqp_server do |s| + with_channel(s) do |ch| + x = ch.exchange(x_name, "topic", args: x_args) + q = ch.queue(q_name) + q.bind(x.name, "#") + # Publish three messages with delay 9000ms, 6000ms, 3000ms + 3.downto(1) do |i| + delay = i * 3000 + hdrs = AMQP::Client::Arguments.new({"x-delay" => delay}) + x.publish_confirm delay.to_s, "rk", props: AMQP::Client::Properties.new(headers: hdrs) + Fiber.yield + end + # by sleeping 5 seconds the message with delay 3000ms should be published + sleep 5.seconds + # publish another message, with a delay low enough for it to be + # published before at least the one with 9000ms + hdrs = AMQP::Client::Arguments.new({"x-delay" => 1500}) + x.publish_confirm "1500", "rk", props: AMQP::Client::Properties.new(headers: hdrs) + Fiber.yield + # by sleeping another 2 seconds we've slept for 7s in total, meaning 
that + # the message published with 6000ms should be published. Also, the new message + # with 1500ms should be published + sleep 2.seconds + queue = s.vhosts["/"].queues[q_name] + queue.message_count.should eq 3 + sleep 3.seconds # total 10, the 9000ms message should have been published + queue.message_count.should eq 4 + expected = %w[3000 6000 1500 9000] + expected.each do |expected_delay| + queue.basic_get(no_ack: true) do |env| + String.new(env.message.body).should eq expected_delay + end + end + end + end + end + it "should support x-delayed-message as exchange type" do with_amqp_server do |s| with_channel(s) do |ch| diff --git a/spec/delayed_requeued_store_spec.cr b/spec/delayed_requeued_store_spec.cr new file mode 100644 index 0000000000..f20c12a79e --- /dev/null +++ b/spec/delayed_requeued_store_spec.cr @@ -0,0 +1,110 @@ +require "./spec_helper" + +describe LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore do + describe "#shift?" do + it "should return nil when empty" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + store.shift?.should be_nil + end + + it "should return and remove first element" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + timestamp = 1000i64 + store.insert(sp1, timestamp) + store.insert(sp2, timestamp) + store.shift?.should eq sp1 + store.shift?.should eq sp2 + store.shift?.should be_nil + end + end + + describe "#first?" 
do + it "should return nil when empty" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + store.first?.should be_nil + end + + it "should return first element without removing" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + timestamp = 1000i64 + store.insert(sp1, timestamp) + store.insert(sp2, timestamp) + store.first?.should eq sp1 + store.first?.should eq sp1 + end + end + + describe "#insert" do + it "should raise error for basic insert" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + expect_raises(Exception, /BUG:/) do + store.insert(sp) + end + end + + it "should maintain sorted order by expiration time" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32, delay: 100u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32, delay: 50u32) + sp3 = LavinMQ::SegmentPosition.new(1u32, 3u32, 10u32, delay: 200u32) + timestamp = 1000i64 + store.insert(sp1, timestamp) + store.insert(sp2, timestamp) + store.insert(sp3, timestamp) + store.shift?.should eq sp2 + store.shift?.should eq sp1 + store.shift?.should eq sp3 + end + + it "should order by segment position when expiration times are equal" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp3 = LavinMQ::SegmentPosition.new(1u32, 3u32, 10u32, delay: 100u32) + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32, delay: 100u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32, delay: 100u32) + timestamp = 1000i64 + store.insert(sp3, timestamp) + store.insert(sp1, timestamp) + store.insert(sp2, timestamp) + store.shift?.should eq sp1 + store.shift?.should eq 
sp2 + store.shift?.should eq sp3 + end + end + + describe "#time_to_next_expiration?" do + it "should return nil when empty" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + store.time_to_next_expiration?.should be_nil + end + + it "should return time span to next expiration" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32, delay: 1000u32) + timestamp = RoughTime.unix_ms + store.insert(sp, timestamp) + time_to_expiration = store.time_to_next_expiration? + time_to_expiration.should_not be_nil + time_to_expiration.not_nil!.should be_close(1000.milliseconds, 100.milliseconds) + end + end + + describe "#clear" do + it "should empty the store" do + store = LavinMQ::AMQP::DelayedExchangeQueue::DelayedMessageStore::DelayedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + timestamp = 1000i64 + store.insert(sp1, timestamp) + store.insert(sp2, timestamp) + store.clear + store.first?.should be_nil + store.shift?.should be_nil + store.time_to_next_expiration?.should be_nil + end + end +end diff --git a/spec/requeued_store_spec.cr b/spec/requeued_store_spec.cr new file mode 100644 index 0000000000..54e2c1ccdc --- /dev/null +++ b/spec/requeued_store_spec.cr @@ -0,0 +1,80 @@ +require "spec" +require "../src/lavinmq/message_store/requeued_store" + +describe LavinMQ::MessageStore::PublishOrderedRequeuedStore do + describe "#shift?" 
do + it "should return nil when empty" do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + store.shift?.should be_nil + end + + it "should return and remove first element" do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + store.insert(sp1) + store.insert(sp2) + store.shift?.should eq sp1 + store.shift?.should eq sp2 + store.shift?.should be_nil + end + end + + describe "#first?" do + it "should return nil when empty" do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + store.first?.should be_nil + end + + it "should return first element without removing" do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + store.insert(sp1) + store.insert(sp2) + store.first?.should eq sp1 + store.first?.should eq sp1 + end + end + + describe "#insert" do + it "should maintain sorted order by segment position" do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + sp3 = LavinMQ::SegmentPosition.new(1u32, 3u32, 10u32) + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + store.insert(sp3) + store.insert(sp1) + store.insert(sp2) + store.shift?.should eq sp1 + store.shift?.should eq sp2 + store.shift?.should eq sp3 + end + + it "should handle multiple segments" do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + sp2_1 = LavinMQ::SegmentPosition.new(2u32, 1u32, 10u32) + sp1_2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + sp1_1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + store.insert(sp2_1) + store.insert(sp1_2) + store.insert(sp1_1) + store.shift?.should eq sp1_1 + store.shift?.should eq sp1_2 + store.shift?.should eq sp2_1 + end + end + + describe "#clear" do + it "should empty the store" 
do + store = LavinMQ::MessageStore::PublishOrderedRequeuedStore.new + sp1 = LavinMQ::SegmentPosition.new(1u32, 1u32, 10u32) + sp2 = LavinMQ::SegmentPosition.new(1u32, 2u32, 10u32) + store.insert(sp1) + store.insert(sp2) + store.clear + store.first?.should be_nil + store.shift?.should be_nil + end + end +end diff --git a/src/lavinmq/amqp/queue/delayed_exchange_queue.cr b/src/lavinmq/amqp/queue/delayed_exchange_queue.cr index b7459a2c31..d4a1f70564 100644 --- a/src/lavinmq/amqp/queue/delayed_exchange_queue.cr +++ b/src/lavinmq/amqp/queue/delayed_exchange_queue.cr @@ -1,6 +1,9 @@ require "./queue" +require "./delayed_exchange_queue/delayed_message_store" module LavinMQ::AMQP + # This class is only used by delayed exchanges. It can't niehter should be + # consumed from or published to by clients. class DelayedExchangeQueue < Queue MAX_NAME_LENGTH = 256 @@ -38,16 +41,13 @@ module LavinMQ::AMQP @exchange_name = arguments["x-dead-letter-exchange"]?.try(&.to_s) || raise "Missing x-dead-letter-exchange" end - def publish(message : Message) : Bool - false - end - def delay(msg : Message) : Bool return false if @deleted || @state.closed? @msg_store_lock.synchronize do @msg_store.push(msg) end @publish_count.add(1, :relaxed) + @message_ttl_change.send nil true rescue ex : MessageStore::Error @log.error(ex) { "Queue closed due to error" } @@ -55,33 +55,61 @@ module LavinMQ::AMQP raise ex end + # Overload to use our own store private def init_msg_store(data_dir) replicator = durable? ? @vhost.@replicator : nil DelayedMessageStore.new(data_dir, replicator, durable?, metadata: @metadata) end - private def expire_at(msg : BytesMessage) : Int64? 
- msg.timestamp + (msg.delay || 0u32) - end - - # internal queues can't expire so make this noop - private def queue_expire_loop - end - # simplify the message expire loop, as this queue can't have consumers or message-ttl private def message_expire_loop loop do if ttl = time_to_message_expiration + if ttl <= Time::Span::ZERO + expire_messages + next + end select - when @msg_store.empty.when_true.receive # there's a new "first" message + when @msg_store.empty.when_true.receive # purge? + when @message_ttl_change.receive when timeout ttl expire_messages end else - @msg_store.empty.when_false.receive + select + when @message_ttl_change.receive + when @msg_store.empty.when_false.receive + end end end rescue ::Channel::ClosedError + ensure + @log.debug { "message_expire_loop stopped" } + end + + def expire_messages + @msg_store_lock.synchronize do + loop do + env = @msg_store.first? || break + if has_expired?(env) + env = @msg_store.shift? || break + expire_msg(env, :expired) + else + break + end + end + end + end + + private def has_expired?(env : Envelope) : Bool + delay = env.segment_position.delay + timestamp = env.message.timestamp + expire_at = timestamp + delay + expire_at <= RoughTime.unix_ms + end + + private def time_to_message_expiration : Time::Span? + @msg_store.as(DelayedMessageStore).time_to_next_expiration? end # Overload to not ruin DLX header @@ -98,63 +126,33 @@ module LavinMQ::AMQP delete_message sp end - class DelayedMessageStore < MessageStore - def initialize(*args, **kwargs) - super - order_messages - end + # Disable a lot of inherited functionality (ugly) - def order_messages - sps = Array(SegmentPosition).new(@size) - while env = shift? 
- sps << env.segment_position - end - sps.each { |sp| requeue sp } - end + # We don't support any policies + private def apply_policy_argument(key : String, value : JSON::Any) : Bool + false + end - def push(msg) : SegmentPosition - sp = super - # make sure that we don't read from disk, only from requeued - @rfile_id = @wfile_id - @rfile = @wfile - @rfile.seek(0, IO::Seek::End) - # order messages by priority in the requeue dequeue - idx = @requeued.bsearch_index do |rsp| - if rsp.delay == sp.delay - rsp > sp - else - rsp.delay > sp.delay - end - end - if idx - @requeued.insert(idx, sp) - if idx.zero? - @empty.set false - end - else - @requeued.push(sp) - end - sp - end + # internal queues can't expire so make this noop + private def queue_expire_loop + end - def requeue(sp : SegmentPosition) - idx = @requeued.bsearch_index do |rsp| - if rsp.delay == sp.delay - rsp > sp - else - rsp.delay > sp.delay - end - end - if idx - @requeued.insert(idx, sp) - else - @requeued.push(sp) - end - was_empty = @size.zero? 
- @bytesize += sp.bytesize - @size += 1 - @empty.set false if was_empty - end + def publish(message : Message) : Bool + # This queue should never be published to + false + end + + def basic_get(no_ack, force = false, & : Envelope -> Nil) : Bool + # noop, not supported + false + end + + def ack(sp : SegmentPosition) : Nil + # noop, not supported + end + + def reject(sp : SegmentPosition) : Nil + # noop, not supported end end diff --git a/src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_message_store.cr b/src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_message_store.cr new file mode 100644 index 0000000000..fb7bfe8ef9 --- /dev/null +++ b/src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_message_store.cr @@ -0,0 +1,86 @@ +require "../../../message_store" +require "./delayed_requeued_store" + +module LavinMQ::AMQP + class DelayedExchangeQueue < Queue + # A delayed exchange queue must have its messages ordered by "expire at" + # so the message expire loop will look at the right message. To achieve this + # messages are always added to a custom requeued store. This requeued store + # acts as an in-memory index where messages are ordered based on when they + # should be published. + # The reason why the requeued store is used, is that #shift and #first? will + # look for any requeued messages first, then read the next from disk. For a + # delayed exchange queue we never want to read messages in the order they + # arrived (were written to disk). + class DelayedMessageStore < MessageStore + # Redefine @requeued (defined in MessageStore) + @requeued : RequeuedStore = DelayedRequeuedStore.new + + private def requeued : DelayedRequeuedStore + @requeued.as(DelayedRequeuedStore) + end + + # Customization used by DelayedExchangeQueue + def time_to_next_expiration? : Time::Span? + requeued.time_to_next_expiration? 
+ end + + def initialize(*args, **kwargs) + super + build_index + end + + def build_index + # Unfortunately we have to read all messages and build an "index". NOTE(review): #shift? is overridden in this class to read only from @requeued, which looks empty at this point, so this loop may never run; the override also decrements @size/@bytesize, which #insert does not restore -- confirm that messages already on disk are indexed and counted after a restart. + while env = shift? + requeued.insert(env.segment_position, env.message.timestamp) + end + # We don't have to reset any pointer when we've read through all messages + # since we're always using the requeued index. + end + + def first? : Envelope? + raise ClosedError.new if @closed + sp = @requeued.first? || return + seg = @segments[sp.segment] + begin + msg = BytesMessage.from_bytes(seg.to_slice + sp.position) + Envelope.new(sp, msg, redelivered: true) + rescue ex + raise MessageStore::Error.new(seg, cause: ex) + end + end + + def shift?(consumer = nil) : Envelope? + raise ClosedError.new if @closed + sp = @requeued.shift? || return + segment = @segments[sp.segment] + begin + msg = BytesMessage.from_bytes(segment.to_slice + sp.position) + @bytesize -= sp.bytesize + @size -= 1 + @empty.set true if @size.zero? + Envelope.new(sp, msg, redelivered: true) + rescue ex + raise MessageStore::Error.new(segment, cause: ex) + end + end + + # Overload to add the segment position to our "index" + def push(msg) : SegmentPosition + raise ClosedError.new if @closed + was_empty = @size.zero? 
+ sp = write_to_disk(msg) + requeued.insert(sp, msg.timestamp) + @bytesize += sp.bytesize + @size += 1 + @empty.set false if was_empty + sp + end + + def requeue(sp : SegmentPosition) + raise "BUG: messages should never be requeued to DelayedMessageStore" + end + end + end +end diff --git a/src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_requeued_store.cr b/src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_requeued_store.cr new file mode 100644 index 0000000000..50ed898fd0 --- /dev/null +++ b/src/lavinmq/amqp/queue/delayed_exchange_queue/delayed_requeued_store.cr @@ -0,0 +1,54 @@ +require "../../../message_store/requeued_store" + +module LavinMQ::AMQP + class DelayedExchangeQueue < Queue + class DelayedMessageStore < MessageStore + class DelayedRequeuedStore < MessageStore::RequeuedStore + record DelayedSegmentPosition, + sp : SegmentPosition, + expire_at : Int64 + + @segment_positions = Deque(DelayedSegmentPosition).new + + def shift? : SegmentPosition? + @segment_positions.shift?.try &.sp + end + + def first? : SegmentPosition? + @segment_positions.first?.try &.sp + end + + def time_to_next_expiration? : Time::Span? + sp = @segment_positions.first? + return if sp.nil? 
+ (sp.expire_at - RoughTime.unix_ms).milliseconds + end + + def insert(sp : SegmentPosition) : Nil + raise "BUG: this insert overload should not be called" + end + + def insert(sp : SegmentPosition, timestamp : Int64) : Nil + sp = DelayedSegmentPosition.new(sp, timestamp + sp.delay) + idx = @segment_positions.bsearch_index do |rsp| + if rsp.expire_at == sp.expire_at + rsp.sp > sp.sp + else + rsp.expire_at > sp.expire_at + end + end + + if idx + @segment_positions.insert(idx, sp) + else + @segment_positions.push(sp) + end + end + + def clear : Nil + @segment_positions = Deque(DelayedSegmentPosition).new + end + end + end + end +end diff --git a/src/lavinmq/message_store.cr b/src/lavinmq/message_store.cr index 928facc424..fefa8438d2 100644 --- a/src/lavinmq/message_store.cr +++ b/src/lavinmq/message_store.cr @@ -4,6 +4,7 @@ require "log" require "file_utils" require "./clustering/server" require "./bool_channel" +require "./message_store/requeued_store" module LavinMQ # Message store @@ -20,7 +21,7 @@ module LavinMQ @segments = Hash(UInt32, MFile).new @deleted = Hash(UInt32, Array(UInt32)).new @segment_msg_count = Hash(UInt32, UInt32).new(0u32) - @requeued = Deque(SegmentPosition).new + @requeued : RequeuedStore = PublishOrderedRequeuedStore.new @closed = false getter closed getter bytesize = 0u64 @@ -55,12 +56,8 @@ module LavinMQ def requeue(sp : SegmentPosition) raise ClosedError.new if @closed - if idx = @requeued.bsearch_index { |rsp| rsp > sp } - @requeued.insert(idx, sp) - else - @requeued.push(sp) - end was_empty = @size.zero? 
+ @requeued.insert(sp) @bytesize += sp.bytesize @size += 1 @empty.set false if was_empty @@ -223,7 +220,7 @@ module LavinMQ delete(env.segment_position) end - @requeued = Deque(SegmentPosition).new + @requeued.clear @bytesize = 0_u64 end diff --git a/src/lavinmq/message_store/requeued_store.cr b/src/lavinmq/message_store/requeued_store.cr new file mode 100644 index 0000000000..2b5cc49f01 --- /dev/null +++ b/src/lavinmq/message_store/requeued_store.cr @@ -0,0 +1,37 @@ +require "deque" +require "../segment_position" + +module LavinMQ + class MessageStore + abstract class RequeuedStore + abstract def shift? : SegmentPosition? + abstract def first? : SegmentPosition? + abstract def insert(sp : SegmentPosition) : Nil + abstract def clear : Nil + end + + class PublishOrderedRequeuedStore < RequeuedStore + @segment_positions = Deque(SegmentPosition).new + + def shift? : SegmentPosition? + @segment_positions.shift? + end + + def first? : SegmentPosition? + @segment_positions.first? + end + + def insert(sp : SegmentPosition) : Nil + if idx = @segment_positions.bsearch_index { |rsp| rsp > sp } + @segment_positions.insert(idx, sp) + else + @segment_positions.push(sp) + end + end + + def clear : Nil + @segment_positions = Deque(SegmentPosition).new + end + end + end +end