diff --git a/src/lavinmq/bool_channel.cr b/src/lavinmq/bool_channel.cr
index 7796a48fe8..dda203f698 100644
--- a/src/lavinmq/bool_channel.cr
+++ b/src/lavinmq/bool_channel.cr
@@ -1,46 +1,69 @@
 # A class which has two channels, one for which a state is true
-# and the other for when the state if false
+# and the other for when the state is false
 class BoolChannel
-  getter when_true = Channel(Nil).new
-  getter when_false = Channel(Nil).new
-  @value = Channel(Bool).new
+  class StateChannel < Channel(Nil)
+    class EndlessQueue < Deque(Nil)
+      property? empty : Bool = true
+
+      def shift(&)
+        yield if empty?
+      end
+
+      def push(value : T)
+        raise "Can't push to an EndlessQueue"
+      end
+    end
+
+    @queue = EndlessQueue.new
+
+    def activate
+      @lock.sync do
+        @queue.as(EndlessQueue).empty = false
+        while receiver_ptr = dequeue_receiver
+          receiver_ptr.value.data = nil
+          receiver_ptr.value.state = DeliveryState::Delivered
+          receiver_ptr.value.fiber.enqueue
+        end
+      end
+    end
+
+    def deactivate
+      @lock.sync do
+        @queue.as(EndlessQueue).empty = true
+      end
+    end
+  end
+
+  getter when_true = StateChannel.new
+  getter when_false = StateChannel.new
+  @value : Atomic(Bool)
 
   def initialize(value : Bool)
-    spawn(name: "BoolChannel#send_loop") do
-      send_loop(value)
+    @value = Atomic.new(value)
+    if value
+      @when_true.activate
+    else
+      @when_false.activate
     end
   end
 
-  def set(value : Bool)
-    @value.send value
-  rescue ::Channel::ClosedError
-    Log.debug { "BoolChannel closed, could not set value" }
+  def value
+    @value.get
   end
 
-  def value : Bool
-    select
-    when when_true.receive
-      true
-    when when_false.receive
-      false
+  def set(value : Bool)
+    return if @value.swap(value) == value
+    if value
+      @when_false.deactivate
+      @when_true.activate
+    else
+      @when_true.deactivate
+      @when_false.activate
     end
   end
 
   def close
     @when_true.close
     @when_false.close
-    @value.close
-  end
-
-  private def send_loop(value)
-    loop do
-      channel = value ? @when_true : @when_false
-      select
-      when value = @value.receive
-      when channel.send nil
-        Fiber.yield # Improves fairness by allowing the receiving fiber to run instead of notifying all waiting fibers
-      end
-    end
-  rescue ::Channel::ClosedError
   end
 end
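A minimal sketch of the new semantics (illustrative, not part of the patch): while a state is active, its StateChannel hands out `nil` endlessly without consuming anything (`EndlessQueue#shift` never shrinks), and blocks otherwise, so a fiber can simply `receive` to park until the state flips. The `flag` variable and the fiber below are hypothetical.

    flag = BoolChannel.new(false)

    spawn do
      flag.when_true.receive # parks until set(true); delivers instantly afterwards
      puts "state flipped to true"
    end

    flag.set(true) # wakes every fiber blocked on when_true
    Fiber.yield
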
diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr
index 48e9eaeaf2..f8ffdfacbf 100644
--- a/src/lavinmq/launcher.cr
+++ b/src/lavinmq/launcher.cr
@@ -9,6 +9,7 @@ require "./data_dir_lock"
 require "./etcd"
 require "./clustering/controller"
 require "../stdlib/openssl_sni"
+require "../stdlib/fiber_stats"
 
 module LavinMQ
   struct StandaloneRunner
@@ -46,6 +47,7 @@
         Log.warn { "The file descriptor limit is very low, consider raising it." }
         Log.warn { "You need one for each connection and two for each durable queue, and some more." }
       end
+      setup_buffer_pools
       Dir.mkdir_p @config.data_dir
       if @config.data_dir_lock?
         @data_dir_lock = DataDirLock.new(@config.data_dir)
@@ -103,6 +105,14 @@
       @runner.stop
     end
 
+    private def setup_buffer_pools
+      buffer_size = @config.socket_buffer_size
+      if buffer_size.positive?
+        IO::BufferPool.setup(buffer_size)
+        Log.info { "Socket buffer pool enabled, buffer size: #{buffer_size}" }
+      end
+    end
+
     private def print_ascii_logo
       logo = <<-LOGO
@@ -242,7 +252,40 @@
       puts "  reclaimed bytes during last GC: #{ps.bytes_reclaimed_since_gc.humanize_bytes}"
       puts "  reclaimed bytes before last GC: #{ps.reclaimed_bytes_before_gc.humanize_bytes}"
       puts "Fibers:"
-      Fiber.list { |f| puts f.inspect }
+      total_stack = 0_u64
+      read_loop_count = 0
+      read_loop_stack_total = 0_u64
+      read_loop_stack_min = UInt64::MAX
+      read_loop_stack_max = 0_u64
+      Fiber.list do |f|
+        stack = f.stack_used
+        total_stack += stack
+        if f.name.try(&.starts_with?("Client#read_loop"))
+          read_loop_count += 1
+          read_loop_stack_total += stack
+          read_loop_stack_min = stack if stack < read_loop_stack_min
+          read_loop_stack_max = stack if stack > read_loop_stack_max
+        end
+        puts "  #{f.name}: #{stack.humanize_bytes}"
+      end
+      puts "Fiber stack summary:"
+      puts "  total stack used: #{total_stack.humanize_bytes}"
+      if read_loop_count > 0
+        avg = read_loop_stack_total // read_loop_count
+        puts "  read_loop fibers: #{read_loop_count}"
+        puts "  read_loop stack min: #{read_loop_stack_min.humanize_bytes}"
+        puts "  read_loop stack max: #{read_loop_stack_max.humanize_bytes}"
+        puts "  read_loop stack avg: #{avg.humanize_bytes}"
+        puts "  read_loop stack total: #{read_loop_stack_total.humanize_bytes}"
+      end
+      if stats = IO::BufferPool.pool.try(&.stats)
+        puts "Buffer pool:"
+        puts "  buffer size: #{stats[:buffer_size].humanize_bytes}"
+        puts "  available: #{stats[:available]}"
+        puts "  allocated: #{stats[:allocated]}"
+        puts "  reused: #{stats[:reused]}"
+        puts "  released: #{stats[:released]}"
+      end
       STDOUT.flush
     end
diff --git a/src/stdlib/fiber_stats.cr b/src/stdlib/fiber_stats.cr
new file mode 100644
index 0000000000..eb8a59a088
--- /dev/null
+++ b/src/stdlib/fiber_stats.cr
@@ -0,0 +1,9 @@
+class Fiber
+  # Expose stack usage for debugging.
+  # Stacks grow downward: bottom is the highest address, top is the current position.
+  def stack_used : UInt64
+    # While a fiber is suspended, context.stack_top holds the saved stack pointer,
+    # so bottom - top = bytes used.
+    @stack.bottom.address - @context.stack_top.address
+  end
+end
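A small sketch of exercising `Fiber#stack_used` on its own, assuming the monkey patch above has been required; it mirrors the per-fiber accounting the launcher's debug report now performs:

    require "./fiber_stats" # hypothetical path; the patch requires it as ../stdlib/fiber_stats

    total = 0_u64
    count = 0
    Fiber.list do |f|
      used = f.stack_used
      total += used
      count += 1
      puts "#{f.name || "unnamed"}: #{used.humanize_bytes}"
    end
    puts "#{count} fibers, #{total.humanize_bytes} total stack used"
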
diff --git a/src/stdlib/io_buffered.cr b/src/stdlib/io_buffered.cr
new file mode 100644
index 0000000000..ce905ff90a
--- /dev/null
+++ b/src/stdlib/io_buffered.cr
@@ -0,0 +1,194 @@
+require "socket"
+
+# A thread-safe pool of reusable byte buffers for IO operations.
+# This reduces memory usage for servers with many idle connections by
+# releasing buffers back to the pool when they are not actively in use.
+class IO::BufferPool
+  @buffers = Deque(Pointer(UInt8)).new
+  @mutex = Mutex.new
+  @buffer_size : Int32
+  @max_pool_size : Int32
+  @stats_allocated = Atomic(Int64).new(0)
+  @stats_reused = Atomic(Int64).new(0)
+  @stats_released = Atomic(Int64).new(0)
+
+  getter buffer_size
+
+  def initialize(@buffer_size : Int32, @max_pool_size : Int32 = 100_000)
+  end
+
+  # Acquire a buffer from the pool, or allocate a new one if the pool is empty
+  def acquire : Pointer(UInt8)
+    buffer = @mutex.synchronize { @buffers.shift? }
+    if buffer
+      @stats_reused.add(1)
+      buffer
+    else
+      @stats_allocated.add(1)
+      GC.malloc_atomic(@buffer_size.to_u32).as(UInt8*)
+    end
+  end
+
+  # Release a buffer back to the pool for reuse.
+  # If the pool is at capacity, the buffer is discarded (the GC will collect it).
+  def release(buffer : Pointer(UInt8)) : Nil
+    return if buffer.null?
+    @stats_released.add(1)
+    @mutex.synchronize do
+      if @buffers.size < @max_pool_size
+        @buffers.push(buffer)
+      end
+    end
+  end
+
+  # Number of buffers currently available in the pool
+  def available : Int32
+    @mutex.synchronize { @buffers.size }
+  end
+
+  # Statistics about buffer usage
+  def stats
+    {
+      available:   available,
+      allocated:   @stats_allocated.get,
+      reused:      @stats_reused.get,
+      released:    @stats_released.get,
+      buffer_size: @buffer_size,
+    }
+  end
+
+  # Singleton pool instance
+  class_property pool : IO::BufferPool?
+
+  def self.setup(buffer_size : Int32, max_pool_size : Int32 = 100_000) : Nil
+    @@pool = IO::BufferPool.new(buffer_size, max_pool_size)
+  end
+end
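A standalone sketch of the pool's contract (the local `pool` below is illustrative; production code goes through the `IO::BufferPool.pool` singleton): a released buffer is handed back on the next acquire, and the counters reported by `stats` record each path.

    pool = IO::BufferPool.new(16 * 1024)

    a = pool.acquire # pool is empty: fresh GC allocation (allocated: 1)
    pool.release(a)  # returned to the deque             (released: 1)
    b = pool.acquire # same pointer handed back          (reused: 1)

    p a == b     # => true
    p pool.stats # => {available: 0, allocated: 1, reused: 1, released: 1, buffer_size: 16384}
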
+
+# Override IO::Buffered to support buffer pooling for socket connections.
+# This saves memory for idle connections by releasing buffers back to a pool
+# when operations complete, allowing them to be reused by active connections.
+#
+# The key insight is that most connections are idle most of the time, waiting
+# for data. By releasing buffers at two points:
+# - read buffer: when all buffered data has been consumed
+# - write buffer: when the data has been flushed
+#
+# we can dramatically reduce memory usage in scenarios with many connections.
+module IO::Buffered
+  def self.buffer_pool_stats
+    IO::BufferPool.pool.try &.stats
+  end
+
+  # Override fill_buffer to only acquire a buffer after data is available.
+  # For sockets, we release any existing buffer and wait for readability first,
+  # then acquire a buffer and perform the actual read.
+  # This ensures idle connections don't hold read buffers.
+  private def fill_buffer
+    pool = IO::BufferPool.pool
+    use_pool = pool && @buffer_size == pool.buffer_size
+
+    # For sockets: release the buffer and wait for data before acquiring.
+    # This keeps idle connections from holding buffers.
+    if use_pool && self.is_a?(Socket)
+      # Release the existing buffer before waiting
+      if (p = pool) && (in_buf = @in_buffer) && !in_buf.null?
+        p.release(in_buf)
+        @in_buffer = Pointer(UInt8).null
+      end
+      Crystal::EventLoop.current.wait_readable(self)
+    end
+
+    in_buf = if use_pool && (p = pool)
+               @in_buffer ||= p.acquire
+             else
+               @in_buffer ||= GC.malloc_atomic(@buffer_size.to_u32).as(UInt8*)
+             end
+    size = unbuffered_read(Slice.new(in_buf, @buffer_size)).to_i
+    @in_buffer_rem = Slice.new(in_buf, size)
+  end
+
+  # Override read to release the buffer back to the pool once all data is consumed
+  def read(slice : Bytes) : Int32
+    check_open
+
+    count = slice.size
+    return 0 if count == 0
+
+    if @in_buffer_rem.empty?
+      # If we are asked to read more than half the buffer's size,
+      # read directly into the slice, as it's not worth the extra
+      # memory copy.
+      if !read_buffering? || count >= @buffer_size // 2
+        return unbuffered_read(slice[0, count]).to_i
+      else
+        fill_buffer
+        return 0 if @in_buffer_rem.empty?
+      end
+    end
+
+    to_read = Math.min(count, @in_buffer_rem.size)
+    slice.copy_from(@in_buffer_rem.to_unsafe, to_read)
+    @in_buffer_rem += to_read
+
+    # Release the buffer back to the pool when all data has been consumed
+    if @in_buffer_rem.empty?
+      pool = IO::BufferPool.pool
+      if pool && @buffer_size == pool.buffer_size
+        if in_buf = @in_buffer
+          pool.release(in_buf)
+          @in_buffer = Pointer(UInt8).null
+        end
+      end
+    end
+
+    to_read
+  end
+
+  # Override flush to release the write buffer after flushing
+  def flush : self
+    if @out_count > 0
+      unbuffered_write(Slice.new(out_buffer, @out_count))
+    end
+    unbuffered_flush
+    @out_count = 0
+
+    # Release the write buffer back to the pool after flushing
+    pool = IO::BufferPool.pool
+    if pool && @buffer_size == pool.buffer_size
+      if out_buf = @out_buffer
+        pool.release(out_buf)
+        @out_buffer = Pointer(UInt8).null
+      end
+    end
+
+    self
+  end
+
+  # Override out_buffer to acquire from the pool
+  private def out_buffer
+    pool = IO::BufferPool.pool
+    @out_buffer ||= if pool && @buffer_size == pool.buffer_size
+                      pool.acquire
+                    else
+                      GC.malloc_atomic(@buffer_size.to_u32).as(UInt8*)
+                    end
+  end
+
+  # Override close to release buffers back to the pool
+  def close : Nil
+    # Release the read buffer before closing (the write buffer is released via flush)
+    pool = IO::BufferPool.pool
+    if pool && @buffer_size == pool.buffer_size
+      if in_buf = @in_buffer
+        pool.release(in_buf)
+        @in_buffer = Pointer(UInt8).null
+        @in_buffer_rem = Bytes.empty
+      end
+    end
+
+    flush if @out_count > 0
+  ensure
+    unbuffered_close
+  end
+end
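Putting it together, the intended flow looks roughly like this. A sketch, assuming `Config#socket_buffer_size` is positive so that `setup_buffer_pools` enables the singleton, and that each socket's `buffer_size` matches the pool's (the overrides only opt in on an exact match). The echo server below is hypothetical.

    require "socket"

    IO::BufferPool.setup(16 * 1024) # what setup_buffer_pools does at boot

    server = TCPServer.new("localhost", 0)
    spawn do
      client = server.accept
      client.buffer_size = 16 * 1024 # must equal the pool's buffer_size to opt in
      while line = client.gets       # a read buffer is acquired only when data arrives...
        client.puts line
        client.flush                 # ...and the write buffer is released after flushing
      end
    end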