Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/lavinmq/amqp/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,23 @@ module LavinMQ
@client.flush
end

def compact_collections
# Deque uses dup
@unacked = @unacked.dup if @unacked.capacity > @unacked.size * 2

# Array uses trim_to_size
@tx_publishes.trim_to_size
@tx_acks.trim_to_size
@consumers.trim_to_size

# IO::Memory - reset if empty
@next_msg_body_tmp = IO::Memory.new if @next_msg_body_tmp.size == 0 && @next_msg_body_tmp.capacity > 0

# Clear temporary sets (they should be empty between publishes)
@visited.clear if @visited.size > 0
@found_queues.clear if @found_queues.size > 0
end

class ClosedError < Error; end
end
end
Expand Down
6 changes: 6 additions & 0 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,12 @@ module LavinMQ
@socket.flush
end
end

def compact_collections
# Hash uses dup
@channels = @channels.dup if @channels.capacity > @channels.size * 2
@channels.each_value(&.compact_collections)
end
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/amqp/exchange/consistent_hash.cr
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ module LavinMQ
else raise LavinMQ::Error::PreconditionFailed.new("Routing header must be string")
end
end

def compact_collections
# Set uses dup
@bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2
end
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/amqp/exchange/direct.cr
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ module LavinMQ
yield destination
end
end

def compact_collections
# Hash uses dup
@bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2
end
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/amqp/exchange/fanout.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ module LavinMQ
yield destination
end
end

def compact_collections
# Set uses dup
@bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2
end
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/amqp/exchange/headers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ module LavinMQ
end
end
end

def compact_collections
# Hash uses dup
@bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2
end
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/amqp/exchange/topic.cr
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ module LavinMQ
end
end
end

def compact_collections
# Hash uses dup
@bindings = @bindings.dup if @bindings.capacity > @bindings.size * 2
end
end
end
end
13 changes: 13 additions & 0 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,19 @@ module LavinMQ::AMQP
false
end

def compact_collections
# Hash uses dup
@deliveries = @deliveries.dup if @deliveries.capacity > @deliveries.size * 2

# Array uses trim_to_size
@consumers.trim_to_size

# Deque uses dup
@basic_get_unacked = @basic_get_unacked.dup if @basic_get_unacked.capacity > @basic_get_unacked.size * 2

@msg_store.compact_collections
end

class Error < Exception; end

class ReadError < Exception; end
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmq/auth/user_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ module LavinMQ
File.rename tmpfile, path
@replicator.try &.replace_file path
end

def compact_collections
# Hash uses dup
@users = @users.dup if @users.capacity > @users.size * 2
end
end
end
end
6 changes: 6 additions & 0 deletions src/lavinmq/federation/upstream_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ module LavinMQ
@upstreams.each_value &.close
@upstream_sets.values.flatten.each &.close
end

def compact_collections
# Hash uses dup
@upstreams = @upstreams.dup if @upstreams.capacity > @upstreams.size * 2
@upstream_sets = @upstream_sets.dup if @upstream_sets.capacity > @upstream_sets.size * 2
end
end
end
end
11 changes: 10 additions & 1 deletion src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ module LavinMQ
@tls_context = create_tls_context if @config.tls_configured?
reload_tls_context
setup_signal_traps
SystemD::MemoryPressure.monitor { GC.collect }
SystemD::MemoryPressure.monitor do
Log.info { "Memory pressure detected, running compaction" }
started_at = Time.monotonic

@amqp_server.try(&.compact_collections)
GC.collect

elapsed = Time.monotonic - started_at
Log.info { "Memory pressure handled in #{elapsed.total_milliseconds}ms" }
end
end

private def start : self
Expand Down
13 changes: 13 additions & 0 deletions src/lavinmq/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,19 @@ module LavinMQ
end
end

def compact_collections
# Hash uses dup
@deleted = @deleted.dup if @deleted.capacity > @deleted.size * 2
@segments = @segments.dup if @segments.capacity > @segments.size * 2

# Deque uses dup
@requeued = @requeued.dup if @requeued.capacity > @requeued.size * 2

# Tell kernel it can release physical pages backing mmapped files
@segments.each_value(&.dontneed)
@acks.try &.each_value(&.dontneed)
end

class ClosedError < ::Channel::ClosedError; end

class MetadataError < Exception; end
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ module LavinMQ
end
end

def compact_collections
# Hash uses dup
@clients = @clients.dup if @clients.capacity > @clients.size * 2
end

def close
@retain_store.close
end
Expand Down
6 changes: 6 additions & 0 deletions src/lavinmq/mqtt/brokers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ module LavinMQ
@brokers[vhost].close
end
end

def compact_collections
# Hash uses dup
@brokers = @brokers.dup if @brokers.capacity > @brokers.size * 2
@brokers.each_value(&.compact_collections)
end
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ module LavinMQ
next_id
end

def compact_collections
# Hash uses dup
@unacked = @unacked.dup if @unacked.capacity > @unacked.size * 2
end

private def handle_arguments
super
@effective_args << "x-queue-type"
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmq/parameter_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,10 @@ module LavinMQ
@log.error(exception: ex) { "Failed to load #{@file_name}" }
raise ex
end

def compact_collections
# Hash uses dup
@parameters = @parameters.dup if @parameters.capacity > @parameters.size * 2
end
end
end
8 changes: 8 additions & 0 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -500,5 +500,13 @@ module LavinMQ
def uptime
Time.monotonic - @start
end

def compact_collections
@users.compact_collections
@parameters.compact_collections
@mqtt_brokers.compact_collections
@vhosts.compact_collections
Fiber.yield
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/shovel/shovel_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,10 @@ module LavinMQ
end
Shovel::MultiDestinationHandler.new(destinations)
end

def compact_collections
# Hash uses dup
@shovels = @shovels.dup if @shovels.capacity > @shovels.size * 2
end
end
end
17 changes: 17 additions & 0 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -698,5 +698,22 @@ module LavinMQ
LibC.sync
{% end %}
end

def compact_collections
# Array uses trim_to_size monkey patch
@connections.trim_to_size

# Hash uses dup
@direct_reply_consumers = @direct_reply_consumers.dup if @direct_reply_consumers.capacity > @direct_reply_consumers.size * 2
@exchanges = @exchanges.dup if @exchanges.capacity > @exchanges.size * 2
@queues = @queues.dup if @queues.capacity > @queues.size * 2

@queues.each_value(&.compact_collections)
@exchanges.each_value(&.compact_collections)
@connections.each(&.compact_collections) # cascade to channels
@shovels.try(&.compact_collections)
@upstreams.try(&.compact_collections)
Fiber.yield
end
end
end
6 changes: 6 additions & 0 deletions src/lavinmq/vhost_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,11 @@ module LavinMQ
File.rename "#{path}.tmp", path
@replicator.try &.replace_file path
end

def compact_collections
# Hash uses dup
@vhosts = @vhosts.dup if @vhosts.capacity > @vhosts.size * 2
@vhosts.each_value(&.compact_collections)
end
end
end
24 changes: 24 additions & 0 deletions src/stdlib/array.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,28 @@ class Array(T)
def capacity
@capacity
end

private def rewind
root_buffer.copy_from(@buffer, @size)
shift_buffer_by -@offset_to_buffer
end

# Ensures that the internal buffer has at least `capacity` elements.
def ensure_capacity(capacity : Int32) : self
resize_to_capacity capacity if capacity >= @size
self
end

# Reduces the internal buffer to exactly fit the number of elements in the
# array, plus `extra` elements. Returns true if compaction occurred.
def trim_to_size(*, extra : Int32 = 0, threshold : Int32 = 2) : Bool
raise ArgumentError.new("Negative extra capacity: #{extra}") if extra < 0

# Only compact if capacity is significantly larger than size
return false if @capacity <= (@size + extra) * threshold

rewind
resize_to_capacity(@size + extra)
true
end
end
Loading