diff --git a/src/lavinmq/amqp/channel.cr b/src/lavinmq/amqp/channel.cr index f6390874ce..a36557463f 100644 --- a/src/lavinmq/amqp/channel.cr +++ b/src/lavinmq/amqp/channel.cr @@ -821,6 +821,23 @@ module LavinMQ @client.flush end + def compact_collections + # Deque uses dup + @unacked = @unacked.dup if @unacked.capacity > @unacked.size * 2 + + # Array uses trim_to_size + @tx_publishes.trim_to_size + @tx_acks.trim_to_size + @consumers.trim_to_size + + # IO::Memory - reset if empty + @next_msg_body_tmp = IO::Memory.new if @next_msg_body_tmp.size == 0 && @next_msg_body_tmp.capacity > 0 + + # Clear temporary sets (they should be empty between publishes) + @visited.clear if @visited.size > 0 + @found_queues.clear if @found_queues.size > 0 + end + class ClosedError < Error; end end end diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr index 85987cf2d8..fbc0edb9c8 100644 --- a/src/lavinmq/amqp/client.cr +++ b/src/lavinmq/amqp/client.cr @@ -896,6 +896,12 @@ module LavinMQ @socket.flush end end + + def compact_collections + # Hash uses dup + @channels = @channels.dup if @channels.capacity > @channels.size * 2 + @channels.each_value(&.compact_collections) + end end end end diff --git a/src/lavinmq/amqp/exchange/consistent_hash.cr b/src/lavinmq/amqp/exchange/consistent_hash.cr index ef5210b15e..89ded428f2 100644 --- a/src/lavinmq/amqp/exchange/consistent_hash.cr +++ b/src/lavinmq/amqp/exchange/consistent_hash.cr @@ -67,6 +67,11 @@ module LavinMQ else raise LavinMQ::Error::PreconditionFailed.new("Routing header must be string") end end + + def compact_collections + # Set uses dup + @bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2 + end end end end diff --git a/src/lavinmq/amqp/exchange/direct.cr b/src/lavinmq/amqp/exchange/direct.cr index b1c5fc9b32..79fa6f2b31 100644 --- a/src/lavinmq/amqp/exchange/direct.cr +++ b/src/lavinmq/amqp/exchange/direct.cr @@ -46,6 +46,11 @@ module LavinMQ yield destination end end + + def compact_collections + # Hash uses dup + @bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2 + end end end end diff --git a/src/lavinmq/amqp/exchange/fanout.cr b/src/lavinmq/amqp/exchange/fanout.cr index d5adac27cf..ea8619d09f 100644 --- a/src/lavinmq/amqp/exchange/fanout.cr +++ b/src/lavinmq/amqp/exchange/fanout.cr @@ -38,6 +38,11 @@ module LavinMQ yield destination end end + + def compact_collections + # Set uses dup + @bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2 + end end end end diff --git a/src/lavinmq/amqp/exchange/headers.cr b/src/lavinmq/amqp/exchange/headers.cr index 73be2225cc..8ad60a98b9 100644 --- a/src/lavinmq/amqp/exchange/headers.cr +++ b/src/lavinmq/amqp/exchange/headers.cr @@ -84,6 +84,11 @@ module LavinMQ end end end + + def compact_collections + # Hash uses dup + @bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2 + end end end end diff --git a/src/lavinmq/amqp/exchange/topic.cr b/src/lavinmq/amqp/exchange/topic.cr index 157b49168f..a897b8b565 100644 --- a/src/lavinmq/amqp/exchange/topic.cr +++ b/src/lavinmq/amqp/exchange/topic.cr @@ -172,6 +172,11 @@ module LavinMQ end end end + + def compact_collections + # Hash uses dup + @bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2 + end end end end diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index ff7ce52cc8..c58b576743 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -1027,6 +1027,19 @@ module LavinMQ::AMQP false end + def compact_collections + # Hash uses dup + @deliveries = @deliveries.dup if @deliveries.capacity > @deliveries.size * 2 + + # Array uses trim_to_size + @consumers.trim_to_size + + # Deque uses dup + @basic_get_unacked = @basic_get_unacked.dup if @basic_get_unacked.capacity > @basic_get_unacked.size * 2 + + @msg_store.compact_collections + end + class Error < Exception; end class ReadError < Exception; end diff --git a/src/lavinmq/auth/user_store.cr b/src/lavinmq/auth/user_store.cr index f05f3231e3..270f415823 100644 --- a/src/lavinmq/auth/user_store.cr +++ b/src/lavinmq/auth/user_store.cr @@ -149,6 +149,11 @@ module LavinMQ File.rename tmpfile, path @replicator.try &.replace_file path end + + def compact_collections + # Hash uses dup + @users = @users.dup if @users.capacity > @users.size * 2 + end end end end diff --git a/src/lavinmq/federation/upstream_store.cr b/src/lavinmq/federation/upstream_store.cr index 31c3eb9725..fb65726160 100644 --- a/src/lavinmq/federation/upstream_store.cr +++ b/src/lavinmq/federation/upstream_store.cr @@ -119,6 +119,12 @@ module LavinMQ @upstreams.each_value &.close @upstream_sets.values.flatten.each &.close end + + def compact_collections + # Hash uses dup + @upstreams = @upstreams.dup if @upstreams.capacity > @upstreams.size * 2 + @upstream_sets = @upstream_sets.dup if @upstream_sets.capacity > @upstream_sets.size * 2 + end end end end diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 96e53920ad..895287207f 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -59,7 +59,16 @@ module LavinMQ @tls_context = create_tls_context if @config.tls_configured? reload_tls_context setup_signal_traps - SystemD::MemoryPressure.monitor { GC.collect } + SystemD::MemoryPressure.monitor do + Log.info { "Memory pressure detected, running compaction" } + started_at = Time.monotonic + + @amqp_server.try(&.compact_collections) + GC.collect + + elapsed = Time.monotonic - started_at + Log.info { "Memory pressure handled in #{elapsed.total_milliseconds}ms" } + end end private def start : self diff --git a/src/lavinmq/message_store.cr b/src/lavinmq/message_store.cr index dda0ed4af6..3f5f10a031 100644 --- a/src/lavinmq/message_store.cr +++ b/src/lavinmq/message_store.cr @@ -585,6 +585,19 @@ module LavinMQ end end + def compact_collections + # Hash uses dup + @deleted = @deleted.dup if @deleted.capacity > @deleted.size * 2 + @segments = @segments.dup if @segments.capacity > @segments.size * 2 + + # Deque uses dup + @requeued = @requeued.dup if @requeued.capacity > @requeued.size * 2 + + # Tell kernel it can release physical pages backing mmapped files + @segments.each_value(&.dontneed) + @acks.try &.each_value(&.dontneed) + end + class ClosedError < ::Channel::ClosedError; end class MetadataError < Exception; end diff --git a/src/lavinmq/mqtt/broker.cr b/src/lavinmq/mqtt/broker.cr index 96ad576db7..c0b3699851 100644 --- a/src/lavinmq/mqtt/broker.cr +++ b/src/lavinmq/mqtt/broker.cr @@ -96,6 +96,11 @@ module LavinMQ end end + def compact_collections + # Hash uses dup + @clients = @clients.dup if @clients.capacity > @clients.size * 2 + end + def close @retain_store.close end diff --git a/src/lavinmq/mqtt/brokers.cr b/src/lavinmq/mqtt/brokers.cr index 78d9338abc..b387ae0374 100644 --- a/src/lavinmq/mqtt/brokers.cr +++ b/src/lavinmq/mqtt/brokers.cr @@ -32,6 +32,12 @@ module LavinMQ @brokers[vhost].close end end + + def compact_collections + # Hash uses dup + @brokers = @brokers.dup if @brokers.capacity > @brokers.size * 2 + @brokers.each_value(&.compact_collections) + end end end end diff --git a/src/lavinmq/mqtt/session.cr b/src/lavinmq/mqtt/session.cr index 022be1d8fd..c71349aa3b 100644 --- a/src/lavinmq/mqtt/session.cr +++ b/src/lavinmq/mqtt/session.cr @@ -208,6 +208,11 @@ module LavinMQ next_id end + def compact_collections + # Hash uses dup + @unacked = @unacked.dup if @unacked.capacity > @unacked.size * 2 + end + private def handle_arguments super @effective_args << "x-queue-type" diff --git a/src/lavinmq/parameter_store.cr b/src/lavinmq/parameter_store.cr index b48f2627f0..8b63e84690 100644 --- a/src/lavinmq/parameter_store.cr +++ b/src/lavinmq/parameter_store.cr @@ -80,5 +80,10 @@ module LavinMQ @log.error(exception: ex) { "Failed to load #{@file_name}" } raise ex end + + def compact_collections + # Hash uses dup + @parameters = @parameters.dup if @parameters.capacity > @parameters.size * 2 + end end end diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 742b6effe5..4d8fdc3fe6 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -500,5 +500,13 @@ module LavinMQ def uptime Time.monotonic - @start end + + def compact_collections + @users.compact_collections + @parameters.compact_collections + @mqtt_brokers.compact_collections + @vhosts.compact_collections + Fiber.yield + end end end diff --git a/src/lavinmq/shovel/shovel_store.cr b/src/lavinmq/shovel/shovel_store.cr index 341723a3bf..238e7bd717 100644 --- a/src/lavinmq/shovel/shovel_store.cr +++ b/src/lavinmq/shovel/shovel_store.cr @@ -65,5 +65,10 @@ module LavinMQ end Shovel::MultiDestinationHandler.new(destinations) end + + def compact_collections + # Hash uses dup + @shovels = @shovels.dup if @shovels.capacity > @shovels.size * 2 + end end end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 90a72503b0..74b10f6427 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -698,5 +698,22 @@ module LavinMQ LibC.sync {% end %} end + + def compact_collections + # Array uses trim_to_size monkey patch + @connections.trim_to_size + + # Hash uses dup + @direct_reply_consumers = @direct_reply_consumers.dup if @direct_reply_consumers.capacity > @direct_reply_consumers.size * 2 + @exchanges = @exchanges.dup if @exchanges.capacity > @exchanges.size * 2 + @queues = @queues.dup if @queues.capacity > @queues.size * 2 + + @queues.each_value(&.compact_collections) + @exchanges.each_value(&.compact_collections) + @connections.each(&.compact_collections) # cascade to channels + @shovels.try(&.compact_collections) + @upstreams.try(&.compact_collections) + Fiber.yield + end end end diff --git a/src/lavinmq/vhost_store.cr b/src/lavinmq/vhost_store.cr index d932835374..bf689020e2 100644 --- a/src/lavinmq/vhost_store.cr +++ b/src/lavinmq/vhost_store.cr @@ -103,5 +103,11 @@ module LavinMQ File.rename "#{path}.tmp", path @replicator.try &.replace_file path end + + def compact_collections + # Hash uses dup + @vhosts = @vhosts.dup if @vhosts.capacity > @vhosts.size * 2 + @vhosts.each_value(&.compact_collections) + end end end diff --git a/src/stdlib/array.cr b/src/stdlib/array.cr index fcdab0385f..7d3462e89d 100644 --- a/src/stdlib/array.cr +++ b/src/stdlib/array.cr @@ -2,4 +2,28 @@ class Array(T) def capacity @capacity end + + private def rewind + root_buffer.copy_from(@buffer, @size) + shift_buffer_by -@offset_to_buffer + end + + # Ensures that the internal buffer has at least `capacity` elements. + def ensure_capacity(capacity : Int32) : self + resize_to_capacity capacity if capacity >= @size + self + end + + # Reduces the internal buffer to exactly fit the number of elements in the + # array, plus `extra` elements. Returns true if compaction occurred. + def trim_to_size(*, extra : Int32 = 0, threshold : Int32 = 2) : Bool + raise ArgumentError.new("Negative extra capacity: #{extra}") if extra < 0 + + # Only compact if capacity is significantly larger than size + return false if @capacity <= (@size + extra) * threshold + + rewind + resize_to_capacity(@size + extra) + true + end end