Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions src/lavinmq/bool_channel.cr
Original file line number Diff line number Diff line change
@@ -1,46 +1,69 @@
# A class which has two channels, one for which a state is true
# and the other for when the state if false
class BoolChannel
getter when_true = Channel(Nil).new
getter when_false = Channel(Nil).new
@value = Channel(Bool).new
class StateChannel < Channel(Nil)
class EndlessQueue < Deque(Nil)
property? empty : Bool = true

def shift(&)
yield if empty?
end

def push(value : T)
raise "Can't push to an EndlessQueue"
end
end

@queue = EndlessQueue.new

def activate
@lock.sync do
@queue.as(EndlessQueue).empty = false
while receiver_ptr = dequeue_receiver
receiver_ptr.value.data = nil
receiver_ptr.value.state = DeliveryState::Delivered
receiver_ptr.value.fiber.enqueue
end
end
end

def deactivate
@lock.sync do
@queue.as(EndlessQueue).empty = true
end
end
end

getter when_true = StateChannel.new
getter when_false = StateChannel.new
@value : Atomic(Bool)

def initialize(value : Bool)
spawn(name: "BoolChannel#send_loop") do
send_loop(value)
@value = Atomic.new(value)
if value
@when_true.activate
else
@when_false.activate
end
end

def set(value : Bool)
@value.send value
rescue ::Channel::ClosedError
Log.debug { "BoolChannel closed, could not set value" }
def value
@value.get
end

def value : Bool
select
when when_true.receive
true
when when_false.receive
false
def set(value : Bool)
return if @value.swap(value) == value
if value
@when_false.deactivate
@when_true.activate
else
@when_true.deactivate
@when_false.activate
end
end

def close
@when_true.close
@when_false.close
@value.close
end

private def send_loop(value)
loop do
channel = value ? @when_true : @when_false
select
when value = @value.receive
when channel.send nil
Fiber.yield # Improves fairness by allowing the receiving fiber to run instead of notifying all waiting fibers
end
end
rescue ::Channel::ClosedError
end
end
45 changes: 44 additions & 1 deletion src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require "./data_dir_lock"
require "./etcd"
require "./clustering/controller"
require "../stdlib/openssl_sni"
require "../stdlib/fiber_stats"

module LavinMQ
struct StandaloneRunner
Expand Down Expand Up @@ -46,6 +47,7 @@ module LavinMQ
Log.warn { "The file descriptor limit is very low, consider raising it." }
Log.warn { "You need one for each connection and two for each durable queue, and some more." }
end
setup_buffer_pools
Dir.mkdir_p @config.data_dir
if @config.data_dir_lock?
@data_dir_lock = DataDirLock.new(@config.data_dir)
Expand Down Expand Up @@ -103,6 +105,14 @@ module LavinMQ
@runner.stop
end

private def setup_buffer_pools
buffer_size = @config.socket_buffer_size
if buffer_size.positive?
IO::BufferPool.setup(buffer_size)
Log.info { "Socket buffer pool enabled, buffer size: #{buffer_size}" }
end
end

private def print_ascii_logo
logo = <<-LOGO

Expand Down Expand Up @@ -242,7 +252,40 @@ module LavinMQ
puts " reclaimed bytes during last GC: #{ps.bytes_reclaimed_since_gc.humanize_bytes}"
puts " reclaimed bytes before last GC: #{ps.reclaimed_bytes_before_gc.humanize_bytes}"
puts "Fibers:"
Fiber.list { |f| puts f.inspect }
total_stack = 0_u64
read_loop_count = 0
read_loop_stack_total = 0_u64
read_loop_stack_min = UInt64::MAX
read_loop_stack_max = 0_u64
Fiber.list do |f|
stack = f.stack_used
total_stack += stack
if f.name.try(&.starts_with?("Client#read_loop"))
read_loop_count += 1
read_loop_stack_total += stack
read_loop_stack_min = stack if stack < read_loop_stack_min
read_loop_stack_max = stack if stack > read_loop_stack_max
end
puts " #{f.name}: #{stack.humanize_bytes}"
end
puts "Fiber stack summary:"
puts " total stack used: #{total_stack.humanize_bytes}"
if read_loop_count > 0
avg = read_loop_stack_total // read_loop_count
puts " read_loop fibers: #{read_loop_count}"
puts " read_loop stack min: #{read_loop_stack_min.humanize_bytes}"
puts " read_loop stack max: #{read_loop_stack_max.humanize_bytes}"
puts " read_loop stack avg: #{avg.humanize_bytes}"
puts " read_loop stack total: #{read_loop_stack_total.humanize_bytes}"
end
if stats = IO::BufferPool.pool.try(&.stats)
puts "Buffer pool"
puts " buffer size: #{stats[:buffer_size].humanize_bytes}"
puts " available: #{stats[:available]}"
puts " allocated: #{stats[:allocated]}"
puts " reused: #{stats[:reused]}"
puts " released: #{stats[:released]}"
end
STDOUT.flush
end

Expand Down
9 changes: 9 additions & 0 deletions src/stdlib/fiber_stats.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Fiber
# Expose stack usage for debugging
# Stack grows downward: bottom is highest address, top is current position
def stack_used : UInt64
# When fiber is suspended, context.stack_top has the saved stack pointer
# bottom - top = bytes used
@stack.bottom.address - @context.stack_top.address
end
end
194 changes: 194 additions & 0 deletions src/stdlib/io_buffered.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
require "socket"

# A thread-safe pool of reusable byte buffers for IO operations.
# This reduces memory usage for connections with many idle sockets by
# releasing buffers back to the pool when not actively in use.
class IO::BufferPool
@buffers = Deque(Pointer(UInt8)).new
@mutex = Mutex.new
@buffer_size : Int32
@max_pool_size : Int32
@stats_allocated = Atomic(Int64).new(0)
@stats_reused = Atomic(Int64).new(0)
@stats_released = Atomic(Int64).new(0)

getter buffer_size

def initialize(@buffer_size : Int32, @max_pool_size : Int32 = 100_000)
end

# Acquire a buffer from the pool, or allocate a new one if the pool is empty
def acquire : Pointer(UInt8)
buffer = @mutex.synchronize { @buffers.shift? }
if buffer
@stats_reused.add(1)
buffer
else
@stats_allocated.add(1)
GC.malloc_atomic(@buffer_size.to_u32).as(UInt8*)
end
end

# Release a buffer back to the pool for reuse
# If the pool is at capacity, the buffer is discarded (GC will collect it)
def release(buffer : Pointer(UInt8)) : Nil
return if buffer.null?
@stats_released.add(1)
@mutex.synchronize do
if @buffers.size < @max_pool_size
@buffers.push(buffer)
end
end
end

# Number of buffers currently available in the pool
def available : Int32
@mutex.synchronize { @buffers.size }
end

# Statistics about buffer usage
def stats
{
available: available,
allocated: @stats_allocated.get,
reused: @stats_reused.get,
released: @stats_released.get,
buffer_size: @buffer_size,
}
end

# Singleton pool instance
class_property pool : IO::BufferPool?

def self.setup(buffer_size : Int32, max_pool_size : Int32 = 100_000) : Nil
@@pool = IO::BufferPool.new(buffer_size, max_pool_size)
end
end

# Override IO::Buffered to support buffer pooling for socket connections.
# This saves memory for idle connections by releasing buffers back to a pool
# when operations complete, allowing them to be reused by active connections.
#
# The key insight is that most connections are idle most of the time, waiting
# for data. By releasing buffers when:
# - Read buffer: all buffered data has been consumed
# - Write buffer: data has been flushed
#
# We can dramatically reduce memory usage for scenarios with many connections.
module IO::Buffered
def self.buffer_pool_stats
IO::BufferPool.pool.try &.stats
end

# Override fill_buffer to only acquire buffer after data is available.
# For sockets, we release any existing buffer and wait for readability first,
# then acquire the buffer and perform the actual read.
# This ensures idle connections don't hold read buffers.
private def fill_buffer
pool = IO::BufferPool.pool
use_pool = pool && @buffer_size == pool.buffer_size

# For sockets: release buffer and wait for data before acquiring
# This keeps idle connections from holding buffers
if use_pool && self.is_a?(Socket)
# Release existing buffer before waiting
if (p = pool) && (in_buf = @in_buffer) && !in_buf.null?
p.release(in_buf)
@in_buffer = Pointer(UInt8).null
end
Crystal::EventLoop.current.wait_readable(self)
end

in_buf = if use_pool && (p = pool)
@in_buffer ||= p.acquire
else
@in_buffer ||= GC.malloc_atomic(@buffer_size.to_u32).as(UInt8*)
end
size = unbuffered_read(Slice.new(in_buf, @buffer_size)).to_i
@in_buffer_rem = Slice.new(in_buf, size)
end

# Override read to release buffer back to pool when all data consumed
def read(slice : Bytes) : Int32
check_open

count = slice.size
return 0 if count == 0

if @in_buffer_rem.empty?
# If we are asked to read more than half the buffer's size,
# read directly into the slice, as it's not worth the extra
# memory copy.
if !read_buffering? || count >= @buffer_size // 2
return unbuffered_read(slice[0, count]).to_i
else
fill_buffer
return 0 if @in_buffer_rem.empty?
end
end

to_read = Math.min(count, @in_buffer_rem.size)
slice.copy_from(@in_buffer_rem.to_unsafe, to_read)
@in_buffer_rem += to_read

# Release buffer back to pool when all data consumed
if @in_buffer_rem.empty?
pool = IO::BufferPool.pool
if pool && @buffer_size == pool.buffer_size
if in_buf = @in_buffer
pool.release(in_buf)
@in_buffer = Pointer(UInt8).null
end
end
end

to_read
end

# Override flush to release write buffer after flushing
def flush : self
if @out_count > 0
unbuffered_write(Slice.new(out_buffer, @out_count))
end
unbuffered_flush
@out_count = 0

# Release write buffer back to pool after flush
pool = IO::BufferPool.pool
if pool && @buffer_size == pool.buffer_size
if out_buf = @out_buffer
pool.release(out_buf)
@out_buffer = Pointer(UInt8).null
end
end

self
end

# Override out_buffer to acquire from pool
private def out_buffer
pool = IO::BufferPool.pool
@out_buffer ||= if pool && @buffer_size == pool.buffer_size
pool.acquire
else
GC.malloc_atomic(@buffer_size.to_u32).as(UInt8*)
end
end

# Override close to release buffers back to pool
def close : Nil
# Release read buffer before close (write buffer released via flush)
pool = IO::BufferPool.pool
if pool && @buffer_size == pool.buffer_size
if in_buf = @in_buffer
pool.release(in_buf)
@in_buffer = Pointer(UInt8).null
@in_buffer_rem = Bytes.empty
end
end

flush if @out_count > 0
ensure
unbuffered_close
end
end
Loading