From 24282e32525f5237d5a16cedc13bb66e4182b23e Mon Sep 17 00:00:00 2001 From: syeopite Date: Thu, 22 May 2025 23:20:22 -0700 Subject: [PATCH 01/21] Add properties for max idle and lifetime in pool --- src/db/pool.cr | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 7eda7c76..0c22cba5 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -16,7 +16,11 @@ module DB # maximum amount of retry attempts to reconnect to the db. See `Pool#retry` retry_attempts : Int32 = 1, # seconds to wait before a retry attempt - retry_delay : Float64 = 0.2 do + retry_delay : Float64 = 0.2, + # maximum number of seconds the resource can persist after being created. 0 to disable. + max_lifetime_per_resource : Float64 = 0.0, + # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. + max_idle_time_per_resource : Float64 = 0.0 do def self.from_http_params(params : HTTP::Params, default = Options.new) Options.new( initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i, @@ -44,6 +48,11 @@ module DB # seconds to wait before a retry attempt @retry_delay : Float64 + # maximum number of seconds the resource can persist after being created. 0 to disable. + @max_lifetime_per_resource : Float64 + # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. + @max_idle_time_per_resource : Float64 + # Pool state # total of open connections managed by this pool @@ -78,6 +87,8 @@ module DB @checkout_timeout = pool_options.checkout_timeout @retry_attempts = pool_options.retry_attempts @retry_delay = pool_options.retry_delay + @max_lifetime_per_resource = pool_options.max_lifetime_per_resource + @max_idle_time_per_resource = pool_options.max_idle_time_per_resource @availability_channel = Channel(Nil).new @inflight = 0 From c7dda5e0aa0877ebd79df597724f1cb6a5724369 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 00:18:03 -0700 Subject: [PATCH 02/21] Convert max life and idle time to Time::Span --- src/db/pool.cr | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 0c22cba5..5821f25b 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -18,9 +18,9 @@ module DB # seconds to wait before a retry attempt retry_delay : Float64 = 0.2, # maximum number of seconds the resource can persist after being created. 0 to disable. - max_lifetime_per_resource : Float64 = 0.0, + max_lifetime_per_resource : Float64 | Time::Span = 0.0, # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. - max_idle_time_per_resource : Float64 = 0.0 do + max_idle_time_per_resource : Float64 | Time::Span = 0.0 do def self.from_http_params(params : HTTP::Params, default = Options.new) Options.new( initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i, @@ -29,6 +29,8 @@ module DB checkout_timeout: params.fetch("checkout_timeout", default.checkout_timeout).to_f, retry_attempts: params.fetch("retry_attempts", default.retry_attempts).to_i, retry_delay: params.fetch("retry_delay", default.retry_delay).to_f, + max_lifetime_per_resource: params.fetch("max_lifetime_per_resource", default.max_lifetime_per_resource).to_f, + max_idle_time_per_resource: params.fetch("max_idle_time_per_resource", default.max_idle_time_per_resource).to_f, ) end end @@ -49,9 +51,9 @@ module DB @retry_delay : Float64 # maximum number of seconds the resource can persist after being created. 0 to disable. - @max_lifetime_per_resource : Float64 + @max_lifetime_per_resource : Time::Span # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. - @max_idle_time_per_resource : Float64 + @max_idle_time_per_resource : Time::Span # Pool state @@ -87,16 +89,25 @@ module DB @checkout_timeout = pool_options.checkout_timeout @retry_attempts = pool_options.retry_attempts @retry_delay = pool_options.retry_delay - @max_lifetime_per_resource = pool_options.max_lifetime_per_resource - @max_idle_time_per_resource = pool_options.max_idle_time_per_resource @availability_channel = Channel(Nil).new @inflight = 0 @mutex = Mutex.new + @max_lifetime_per_resource = ensure_time_span(pool_options.max_lifetime_per_resource).as(Time::Span) + @max_idle_time_per_resource = ensure_time_span(pool_options.max_idle_time_per_resource).as(Time::Span) + @initial_pool_size.times { build_resource } end + private macro ensure_time_span(value) + if {{value}}.is_a? Number + {{value}}.seconds + else + {{value}} + end + end + # close all resources in the pool def close : Nil @total.each &.close From 6e4503c9e42d79cb061317cca3af25dfc3707a03 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 03:54:27 -0700 Subject: [PATCH 03/21] Add expiration checks during checkout and release --- spec/pool_spec.cr | 58 +++++++++++++++++++++++++++++ src/db/error.cr | 12 ++++++ src/db/pool.cr | 93 +++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 160 insertions(+), 3 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 841fe690..1c38ad6b 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -1,4 +1,5 @@ require "./spec_helper" +require "../src/db/error.cr" class ShouldSleepingOp @is_sleeping = false @@ -224,4 +225,61 @@ describe DB::Pool do all.size.should eq 4 end + + it "should expire resources that exceed maximum lifetime on checkout" do + all = [] of Closable + pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 0.5) { Closable.new.tap { |c| all << c } } + + # After 0.5 seconds we should expect to get an expired resource + sleep 0.5.seconds + + ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do + pool.checkout + end + + # Lifetime expiration error should cause the client to be closed + all[0].closed?.should be_true + end + + it "should expire resources that exceed maximum idle-time on checkout" do + all = [] of Closable + pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_idle_time_per_resource: 0.5) { Closable.new.tap { |c| all << c } } + + # After two seconds we should expect to get an expired resource + sleep 0.5.seconds + + # Idle expiration error should cause the client to be closed + ex = expect_raises DB::PoolResourceIdleExpired(Closable) do + pool.checkout + end + + all[0].closed?.should be_true + end + + it "should only check lifetime expiration on release" do + all = [] of Closable + pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 2.0, max_idle_time_per_resource: 0.5) { Closable.new.tap { |c| all << c } } + + # Pass + pool.checkout do |client| + sleep 0.5.seconds + end + + # Not closed? + all[0].closed?.should be_false + + # We should expect to see an idle connection timeout now with the #checkout + ex = expect_raises DB::PoolResourceIdleExpired(Closable) do + pool.checkout + end + + all[0].closed?.should be_true + + # This should now create a new client that will be expired on release + ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do + pool.checkout { sleep 2.seconds } + end + + all[1].closed?.should be_true + end end diff --git a/src/db/error.cr b/src/db/error.cr index 4268bf2c..8958b7d2 100644 --- a/src/db/error.cr +++ b/src/db/error.cr @@ -39,6 +39,18 @@ module DB class PoolResourceRefused < Error end + # Raised when a checked out resource has reached expiration of any kind + class PoolResourceExpired(T) < PoolResourceLost(T) + end + + # Raised when a checked out resource has exceeded the maximum lifetime + class PoolResourceLifetimeExpired(T) < PoolResourceExpired(T) + end + + # Raised when a checked out resource has idle expired + class PoolResourceIdleExpired(T) < PoolResourceExpired(T) + end + # Raised when an established connection is lost # probably due to socket/network issues. # It is used by the connection pool retry logic. diff --git a/src/db/pool.cr b/src/db/pool.cr index 5821f25b..8a0045db 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -18,7 +18,7 @@ module DB # seconds to wait before a retry attempt retry_delay : Float64 = 0.2, # maximum number of seconds the resource can persist after being created. 0 to disable. - max_lifetime_per_resource : Float64 | Time::Span = 0.0, + max_lifetime_per_resource : Float64 | Time::Span = 0.0, # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. max_idle_time_per_resource : Float64 | Time::Span = 0.0 do def self.from_http_params(params : HTTP::Params, default = Options.new) @@ -64,6 +64,26 @@ module DB # connections waiting to be stablished (they are not in *@idle* nor in *@total*) @inflight : Int32 + # Tracks creation and last (checked out) used timestamps of a specific resource + private struct ResourceTimeEntry + # Time of creation + getter creation : Time = Time.utc + # Time the resource was last checked out + getter last_checked_out : Time + + def initialize + @last_checked_out = @creation + end + + # Sets the last checked out time to now + def got_checked_out + @last_checked_out = Time.utc + end + end + + # Maps a resource to a corresponding `ResourceTimeEntry` + @resource_lifecycle = {} of T => ResourceTimeEntry + # Sync state # communicate that a connection is available for checkout @@ -159,6 +179,9 @@ module DB end end + # A newly checked out client could potentially also have been idled for too long. + remove_expired!(resource, check_idle: true) + @idle.delete resource resource @@ -185,8 +208,14 @@ module DB sync do if resource.responds_to?(:closed?) && resource.closed? - @total.delete(resource) + delete(resource) elsif can_increase_idle_pool + # We only check lifetime expiration since this client has just been used + # and can no longer be considered a stale client even if it passed its idle + # expiration post-checkout. + expire_info = remove_expired!(resource) + expire_info.got_checked_out + @idle << resource if resource.responds_to?(:after_release) resource.after_release @@ -194,7 +223,7 @@ module DB idle_pushed = true else resource.close - @total.delete(resource) + delete(resource) end end @@ -242,6 +271,7 @@ module DB def each_resource(&) sync do @idle.each do |resource| + @resource_lifecycle[resource].got_checked_out yield resource end end @@ -256,6 +286,62 @@ module DB def delete(resource : T) @total.delete(resource) @idle.delete(resource) + @resource_lifecycle.delete(resource) + end + + # Checks if a resource has exceeded the maximum lifetime + # + # :nodoc: + def lifetime_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) + return false if @max_lifetime_per_resource.zero? + (time - time_entry.creation) >= @max_lifetime_per_resource + end + + # Checks if a resource has exceeded the maximum idle time + # + # :nodoc: + def idle_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) + return false if @max_idle_time_per_resource.zero? + (time - time_entry.last_checked_out) >= @max_idle_time_per_resource + end + + # Checks if the resource is expired, deletes if so. Otherwise returns the lifecycle information + def remove_expired?(resource : T, check_idle : Bool = false) + now = Time.utc + expire_info = @resource_lifecycle[resource] + + # For most situations only lifetime expiration needs to be checked. + # Idle timer is then shortly bumped if the resource hasn't expired yet. + if lifetime_expired?(expire_info, now) || ( + check_idle && idle_expired?(expire_info, now) + ) + resource.close + delete(resource) + end + + return expire_info + end + + # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so. Otherwise returns the lifecycle information + def remove_expired!(resource : T, check_idle : Bool = false) + now = Time.utc + expire_info = @resource_lifecycle[resource] + + expiration_type = if lifetime_expired?(expire_info, now) + PoolResourceLifetimeExpired + elsif check_idle && idle_expired?(expire_info, now) + PoolResourceIdleExpired + else + nil + end + + if expiration_type + resource.close + delete(resource) + raise expiration_type.new(resource) + end + + return expire_info end private def build_resource : T @@ -263,6 +349,7 @@ module DB sync do @total << resource @idle << resource + @resource_lifecycle[resource] = ResourceTimeEntry.new end resource end From 9f59d0620ddda7a351b3dbacce53ba0b2dd05fef Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 06:03:59 -0700 Subject: [PATCH 04/21] Add background job to clear expired resources --- spec/pool_spec.cr | 64 +++++++++++++++++++++++++++++++++- src/db/pool.cr | 87 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 132 insertions(+), 19 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 1c38ad6b..6506c538 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -258,7 +258,13 @@ describe DB::Pool do it "should only check lifetime expiration on release" do all = [] of Closable - pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 2.0, max_idle_time_per_resource: 0.5) { Closable.new.tap { |c| all << c } } + pool = create_pool( + max_pool_size: 2, + max_idle_pool_size: 1, + max_lifetime_per_resource: 2.0, + max_idle_time_per_resource: 0.5, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } # Pass pool.checkout do |client| @@ -282,4 +288,60 @@ describe DB::Pool do all[1].closed?.should be_true end + + describe "background expired resource sweeper " do + it "should clear idle resources" do + all = [] of Closable + pool = create_pool( + initial_pool_size: 0, + max_pool_size: 5, + max_idle_pool_size: 5, + max_idle_time_per_resource: 2.0, + ) { Closable.new.tap { |c| all << c } } + + wait_checkout = WaitFor.new + + 5.times { + spawn do + pool.checkout { sleep 0.5 } + wait_checkout.check + end + } + + 5.times do + wait_checkout.wait + end + + sleep 3 + + all.each &.closed?.should be_true + pool.stats.open_connections.should eq(0) + pool.stats.idle_connections.should eq(0) + end + + it "should ensure minimum of initial_pool_size fresh resources" do + all = [] of Closable + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 2.0, + max_idle_time_per_resource: 0.5, + ) { Closable.new.tap { |c| all << c } } + + # Since `resource_sweeper_timer` we should expect to see a sweep every 0.5 seconds (idle is lowest expiration) + # + # The first run occurs after 0.5 seconds which mean that the initial 3 resources should've gotten sweeped. + # Then three more resources should be created as to ensure that the amount of young resources within the pool + # never goes below initial_pool_size. We should be left with 6 clients created in total. + # + # A whole second to ensure the sweep fiber's completion. + sleep 1.seconds + + all.size.should eq(6) + all[..2].each &.closed?.should be_true + all[3..].each &.closed?.should be_false + pool.stats.idle_connections.should eq(3) + end + end end diff --git a/src/db/pool.cr b/src/db/pool.cr index 8a0045db..cce46320 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -20,8 +20,17 @@ module DB # maximum number of seconds the resource can persist after being created. 0 to disable. max_lifetime_per_resource : Float64 | Time::Span = 0.0, # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. - max_idle_time_per_resource : Float64 | Time::Span = 0.0 do + max_idle_time_per_resource : Float64 | Time::Span = 0.0, + # whether to enable a background sweeper to remove expired clients. Default is true but it will only be spawned if an expiration is actually set + expired_resource_sweeper : Bool = true, + # number of seconds to wait between each run of the expired resource sweeper. When unset (0) this value defaults to the shortest expiration duration + resource_sweeper_timer : Float64 | Int32 = 0 do def self.from_http_params(params : HTTP::Params, default = Options.new) + enabled_sweeper = params.fetch("expired_resource_sweeper", default.expired_resource_sweeper) + if enabled_sweeper.is_a?(String) + enabled_sweeper = {'1', "true", 't', "yes"}.includes?(enabled_sweeper.downcase) + end + Options.new( initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i, max_pool_size: params.fetch("max_pool_size", default.max_pool_size).to_i, @@ -31,6 +40,8 @@ module DB retry_delay: params.fetch("retry_delay", default.retry_delay).to_f, max_lifetime_per_resource: params.fetch("max_lifetime_per_resource", default.max_lifetime_per_resource).to_f, max_idle_time_per_resource: params.fetch("max_idle_time_per_resource", default.max_idle_time_per_resource).to_f, + expired_resource_sweeper: enabled_sweeper, + resource_sweeper_timer: params.fetch("resource_sweeper_timer", default.resource_sweeper_timer).to_f, ) end end @@ -91,6 +102,13 @@ module DB # global pool mutex @mutex : Mutex + # Sweep expired resource job + + # whether the job is enabled or disabled + @sweep_job_enabled : Bool + # cancels the sweep job as needed + @sweep_job_close_channel : Channel(Nil) + @[Deprecated("Use `#new` with DB::Pool::Options instead")] def initialize(initial_pool_size = 1, max_pool_size = 0, max_idle_pool_size = 1, checkout_timeout = 5.0, retry_attempts = 1, retry_delay = 0.2, &factory : -> T) @@ -118,6 +136,21 @@ module DB @max_idle_time_per_resource = ensure_time_span(pool_options.max_idle_time_per_resource).as(Time::Span) @initial_pool_size.times { build_resource } + + @sweep_job_enabled = pool_options.expired_resource_sweeper + + # Cancels the sweep job as needed + @sweep_job_close_channel = Channel(Nil).new + + if @sweep_job_enabled && !(min_expire = {@max_idle_time_per_resource, @max_lifetime_per_resource}.reject(&.zero?).min?).nil? + sweep_timer = ensure_time_span(pool_options.resource_sweeper_timer).as(Time::Span) + + if sweep_timer.zero? + sweep_timer = min_expire || sweep_timer + end + + sweep_expired_job(sweep_timer) if sweep_timer.positive? + end end private macro ensure_time_span(value) @@ -305,23 +338,6 @@ module DB (time - time_entry.last_checked_out) >= @max_idle_time_per_resource end - # Checks if the resource is expired, deletes if so. Otherwise returns the lifecycle information - def remove_expired?(resource : T, check_idle : Bool = false) - now = Time.utc - expire_info = @resource_lifecycle[resource] - - # For most situations only lifetime expiration needs to be checked. - # Idle timer is then shortly bumped if the resource hasn't expired yet. - if lifetime_expired?(expire_info, now) || ( - check_idle && idle_expired?(expire_info, now) - ) - resource.close - delete(resource) - end - - return expire_info - end - # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so. Otherwise returns the lifecycle information def remove_expired!(resource : T, check_idle : Bool = false) now = Time.utc @@ -391,5 +407,40 @@ module DB @mutex.lock end end + + private def sweep_expired_job(timer : Time::Span) + spawn do + loop do + select + when @sweep_job_close_channel.receive then break + when timeout(timer) + end + + sync do + now = Time.utc + + # Although not guaranteed, the first elements of @idle + # should be the oldest + @idle.each do |resource| + expire_info = @resource_lifecycle[resource] + + if lifetime_expired?(expire_info, now) || idle_expired?(expire_info, now) + resource.close + delete(resource) + end + end + + ensure_minimum_fresh_resources + end + end + end + end + + # Ensure there are at least a minimum of @initial_pool_size non-expired resources + # + # Should be called after each expiration batch + private def ensure_minimum_fresh_resources + unsync { (@initial_pool_size - @idle.size).clamp(0, @initial_pool_size).times { build_resource } } + end end end From 8747b2ef745b3b63b54b2d281aad606ce0af6954 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 07:13:28 -0700 Subject: [PATCH 05/21] Properly update last checked out time for resource Structs are passed by value meaning that the last_checked_out recorded never actually gets updated --- spec/pool_spec.cr | 47 ++++++++++++++++++++++++++++++++++++----------- src/db/pool.cr | 24 +++++++++++------------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 6506c538..72370cea 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -266,7 +266,7 @@ describe DB::Pool do expired_resource_sweeper: false ) { Closable.new.tap { |c| all << c } } - # Pass + # Idle gets reset; not expired pool.checkout do |client| sleep 0.5.seconds end @@ -274,7 +274,10 @@ describe DB::Pool do # Not closed? all[0].closed?.should be_false - # We should expect to see an idle connection timeout now with the #checkout + # We should expect to see an idle connection timeout now with the #checkout after + # waiting another 0.5 seconds + + sleep 0.6.seconds ex = expect_raises DB::PoolResourceIdleExpired(Closable) do pool.checkout end @@ -289,6 +292,32 @@ describe DB::Pool do all[1].closed?.should be_true end + it "should reset idle-time on checkout" do + all = [] of Closable + pool = create_pool( + max_pool_size: 2, + max_idle_pool_size: 1, + max_idle_time_per_resource: 1.0, + max_lifetime_per_resource: 2.0, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + # Resource gets idled every second. It should get reset every time we checkout and release the resource. + # If we can last more than 2 seconds from the time of creation then it should get expired + # by the lifetime expiration instead. + + # Idle expiration error should cause the resource to be closed + ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do + 2.times { + pool.checkout { + sleep 1 + } + } + end + + all[0].closed?.should be_true + end + describe "background expired resource sweeper " do it "should clear idle resources" do all = [] of Closable @@ -299,21 +328,17 @@ describe DB::Pool do max_idle_time_per_resource: 2.0, ) { Closable.new.tap { |c| all << c } } - wait_checkout = WaitFor.new - + # Create 5 resource 5.times { spawn do - pool.checkout { sleep 0.5 } - wait_checkout.check + pool.checkout { sleep 0.1 } end } - 5.times do - wait_checkout.wait - end - - sleep 3 + # Don't do anything for 5 seconds + sleep 5 + # Gone all.each &.closed?.should be_true pool.stats.open_connections.should eq(0) pool.stats.idle_connections.should eq(0) diff --git a/src/db/pool.cr b/src/db/pool.cr index cce46320..e7e5c04f 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -76,7 +76,7 @@ module DB @inflight : Int32 # Tracks creation and last (checked out) used timestamps of a specific resource - private struct ResourceTimeEntry + private class ResourceTimeEntry # Time of creation getter creation : Time = Time.utc # Time the resource was last checked out @@ -212,8 +212,8 @@ module DB end end - # A newly checked out client could potentially also have been idled for too long. - remove_expired!(resource, check_idle: true) + # Remove client if expired (either idle or lifetime) + remove_expired!(resource) @idle.delete resource @@ -243,11 +243,9 @@ module DB if resource.responds_to?(:closed?) && resource.closed? delete(resource) elsif can_increase_idle_pool - # We only check lifetime expiration since this client has just been used - # and can no longer be considered a stale client even if it passed its idle - # expiration post-checkout. - expire_info = remove_expired!(resource) - expire_info.got_checked_out + # Checks (lifetime) expiration and updates last checked out time + # This will skip checking idle expiration time since it'll get updated + expire_info = remove_expired!(resource, update_last_checked_out: true) @idle << resource if resource.responds_to?(:after_release) @@ -338,26 +336,26 @@ module DB (time - time_entry.last_checked_out) >= @max_idle_time_per_resource end - # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so. Otherwise returns the lifecycle information - def remove_expired!(resource : T, check_idle : Bool = false) + # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so + def remove_expired!(resource : T, update_last_checked_out : Bool = false) now = Time.utc expire_info = @resource_lifecycle[resource] expiration_type = if lifetime_expired?(expire_info, now) PoolResourceLifetimeExpired - elsif check_idle && idle_expired?(expire_info, now) + elsif !update_last_checked_out && idle_expired?(expire_info, now) PoolResourceIdleExpired else nil end + expire_info.got_checked_out if update_last_checked_out + if expiration_type resource.close delete(resource) raise expiration_type.new(resource) end - - return expire_info end private def build_resource : T From 2c06a988fb04e0299a07a236460c9aa046684cc8 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 07:37:22 -0700 Subject: [PATCH 06/21] Add expiration statistics to pool.stats --- src/db/pool.cr | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index e7e5c04f..6cdcb828 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -75,6 +75,9 @@ module DB # connections waiting to be stablished (they are not in *@idle* nor in *@total*) @inflight : Int32 + @idle_expired_count : Int64 = 0 + @lifetime_expired_count : Int64 = 0 + # Tracks creation and last (checked out) used timestamps of a specific resource private class ResourceTimeEntry # Time of creation @@ -172,7 +175,9 @@ module DB open_connections : Int32, idle_connections : Int32, in_flight_connections : Int32, - max_connections : Int32 + max_connections : Int32, + idle_expired_connections : Int64, + lifetime_expired_connections : Int64 # Returns stats of the pool def stats @@ -181,6 +186,8 @@ module DB idle_connections: @idle.size, in_flight_connections: @inflight, max_connections: @max_pool_size, + idle_expired_connections: @idle_expired_count, + lifetime_expired_connections: @lifetime_expired_count ) end @@ -325,7 +332,10 @@ module DB # :nodoc: def lifetime_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) return false if @max_lifetime_per_resource.zero? - (time - time_entry.creation) >= @max_lifetime_per_resource + + expired = (time - time_entry.creation) >= @max_lifetime_per_resource + @lifetime_expired_count += 1 if expired + return expired end # Checks if a resource has exceeded the maximum idle time @@ -333,7 +343,10 @@ module DB # :nodoc: def idle_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) return false if @max_idle_time_per_resource.zero? - (time - time_entry.last_checked_out) >= @max_idle_time_per_resource + + expired = (time - time_entry.last_checked_out) >= @max_idle_time_per_resource + @idle_expired_count += 1 if expired + return expired end # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so From 785d160e7b38368afeacec2ff7cfc392505670c5 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 08:18:42 -0700 Subject: [PATCH 07/21] Ensure minimum initial_pool_size non-expired res on checkout/release --- spec/manual/pool_concurrent_expire_test.cr | 0 spec/pool_spec.cr | 69 ++++++++++++++++++++++ src/db/pool.cr | 6 +- 3 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 spec/manual/pool_concurrent_expire_test.cr diff --git a/spec/manual/pool_concurrent_expire_test.cr b/spec/manual/pool_concurrent_expire_test.cr new file mode 100644 index 00000000..e69de29b diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 72370cea..ccc39d36 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -318,6 +318,75 @@ describe DB::Pool do all[0].closed?.should be_true end + it "Should ensure minimum of initial_pool_size non-expired idle resources on checkout" do + all = [] of Closable + + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 2.0, + max_idle_time_per_resource: 0.5, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + # Initially have 3 clients + all.size.should eq(3) + + sleep 0.6 + + # checkout + 3.times do |i| + # Each call should cause another resource to be spawned and the old one expired off + # for a minimum of three + ex = expect_raises DB::PoolResourceIdleExpired(Closable) do + pool.checkout { } + end + + pool.stats.idle_connections.should eq(3) + pool.stats.idle_expired_connections.should eq(i + 1) + all.size.should eq(3 + (i + 1)) + all[i].closed?.should be_true + end + end + + it "Should ensure minimum of initial_pool_size non-expired idle resources on release" do + all = [] of Closable + + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 2.0, + max_idle_time_per_resource: 1.0, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + temp_resource_store = { + pool.checkout, + pool.checkout, + pool.checkout, + } + + # Await lifetime expiration + sleep 2.1 + # release + temp_resource_store.each_with_index do |resource, i| + # All three idle connections were checked out + # Each iteration should result in a new idle connection being created + # as the one we release get expired. + expect_raises DB::PoolResourceLifetimeExpired(Closable) do + pool.release(resource) + end + + pool.stats.idle_connections.should eq(i + 1) + + pool.stats.lifetime_expired_connections.should eq(i + 1) + all.size.should eq(3 + (i + 1)) + all[i].closed?.should be_true + end + end + describe "background expired resource sweeper " do it "should clear idle resources" do all = [] of Closable diff --git a/src/db/pool.cr b/src/db/pool.cr index 6cdcb828..4f87ae75 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -350,6 +350,8 @@ module DB end # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so + # + # :nodoc: def remove_expired!(resource : T, update_last_checked_out : Bool = false) now = Time.utc expire_info = @resource_lifecycle[resource] @@ -367,6 +369,7 @@ module DB if expiration_type resource.close delete(resource) + ensure_minimum_fresh_resources raise expiration_type.new(resource) end end @@ -451,7 +454,8 @@ module DB # # Should be called after each expiration batch private def ensure_minimum_fresh_resources - unsync { (@initial_pool_size - @idle.size).clamp(0, @initial_pool_size).times { build_resource } } + replenish = (@initial_pool_size - @total.size).clamp(0, @initial_pool_size) + unsync { replenish.times { build_resource } } if replenish > 0 end end end From 7a70cec8399833aeb161c8a08f817ab86956a490 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 08:35:52 -0700 Subject: [PATCH 08/21] End sweep expired resources fiber as needed --- src/db/pool.cr | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 4f87ae75..7e468f7e 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -109,6 +109,10 @@ module DB # whether the job is enabled or disabled @sweep_job_enabled : Bool + # has a sweep job running + @sweep_job_running : Bool + # timer between each run + @sweep_timer : Time::Span? # cancels the sweep job as needed @sweep_job_close_channel : Channel(Nil) @@ -145,14 +149,16 @@ module DB # Cancels the sweep job as needed @sweep_job_close_channel = Channel(Nil).new + @sweep_job_running = false + if @sweep_job_enabled && !(min_expire = {@max_idle_time_per_resource, @max_lifetime_per_resource}.reject(&.zero?).min?).nil? sweep_timer = ensure_time_span(pool_options.resource_sweeper_timer).as(Time::Span) if sweep_timer.zero? - sweep_timer = min_expire || sweep_timer + @sweep_timer = min_expire || sweep_timer end - sweep_expired_job(sweep_timer) if sweep_timer.positive? + sweep_expired_job end end @@ -169,6 +175,7 @@ module DB @total.each &.close @total.clear @idle.clear + @sweep_job_close_channel.send(nil) if @sweep_job_running end record Stats, @@ -196,6 +203,7 @@ module DB resource = nil until resource + sweep_expired_job if !@sweep_job_running resource = if @idle.empty? if can_increase_pool? @inflight += 1 @@ -422,11 +430,18 @@ module DB end end - private def sweep_expired_job(timer : Time::Span) + private def sweep_expired_job + timer = @sweep_timer + return if timer.nil? || !timer.positive? + + @sweep_job_running = true + spawn do loop do select - when @sweep_job_close_channel.receive then break + when @sweep_job_close_channel.receive + @sweep_job_running = false + break when timeout(timer) end @@ -445,6 +460,9 @@ module DB end ensure_minimum_fresh_resources + + # End job if there is no initial pool size and the entire pool has been expired + break if !@initial_pool_size && @total.empty? end end end From f17f2affd6f6c1c4bf910878f7a1425699471ff9 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 08:42:48 -0700 Subject: [PATCH 09/21] Clear resource lifecycle lookup hash on pool close --- src/db/pool.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/db/pool.cr b/src/db/pool.cr index 7e468f7e..0cf3f040 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -175,6 +175,7 @@ module DB @total.each &.close @total.clear @idle.clear + @resource_lifecycle.clear @sweep_job_close_channel.send(nil) if @sweep_job_running end From 7abc5e43ded22ff14d7f30abf6cce013254e252c Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 09:17:46 -0700 Subject: [PATCH 10/21] Suppress expired resource errors on #release --- spec/pool_spec.cr | 38 +++++++++++++++++--------------------- src/db/pool.cr | 21 ++++++++++++++------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index ccc39d36..e0f4c636 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -267,7 +267,7 @@ describe DB::Pool do ) { Closable.new.tap { |c| all << c } } # Idle gets reset; not expired - pool.checkout do |client| + pool.checkout do |resource| sleep 0.5.seconds end @@ -285,9 +285,8 @@ describe DB::Pool do all[0].closed?.should be_true # This should now create a new client that will be expired on release - ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do - pool.checkout { sleep 2.seconds } - end + pool.checkout { sleep 2.seconds } + pool.stats.lifetime_expired_connections.should eq 1 all[1].closed?.should be_true end @@ -296,26 +295,26 @@ describe DB::Pool do all = [] of Closable pool = create_pool( max_pool_size: 2, + initial_pool_size: 0, max_idle_pool_size: 1, - max_idle_time_per_resource: 1.0, - max_lifetime_per_resource: 2.0, + max_idle_time_per_resource: 0.5, + max_lifetime_per_resource: 1.0, expired_resource_sweeper: false ) { Closable.new.tap { |c| all << c } } - # Resource gets idled every second. It should get reset every time we checkout and release the resource. - # If we can last more than 2 seconds from the time of creation then it should get expired + # Resource gets idled every half second. It should get reset every time we checkout and release the resource. + # If we can last more than a second from the time of creation then it should get expired # by the lifetime expiration instead. - - # Idle expiration error should cause the resource to be closed - ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do - 2.times { - pool.checkout { - sleep 1 - } + 2.times { + pool.checkout { + sleep 0.6 } - end + } - all[0].closed?.should be_true + pool.stats.lifetime_expired_connections.should eq(1) + pool.stats.idle_expired_connections.should eq(0) + all.each &.closed?.should be_true + all.size.should eq(1) end it "Should ensure minimum of initial_pool_size non-expired idle resources on checkout" do @@ -375,12 +374,9 @@ describe DB::Pool do # All three idle connections were checked out # Each iteration should result in a new idle connection being created # as the one we release get expired. - expect_raises DB::PoolResourceLifetimeExpired(Closable) do - pool.release(resource) - end + pool.release(resource) pool.stats.idle_connections.should eq(i + 1) - pool.stats.lifetime_expired_connections.should eq(i + 1) all.size.should eq(3 + (i + 1)) all[i].closed?.should be_true diff --git a/src/db/pool.cr b/src/db/pool.cr index 0cf3f040..d9a23e24 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -259,9 +259,18 @@ module DB if resource.responds_to?(:closed?) && resource.closed? delete(resource) elsif can_increase_idle_pool - # Checks (lifetime) expiration and updates last checked out time - # This will skip checking idle expiration time since it'll get updated - expire_info = remove_expired!(resource, update_last_checked_out: true) + # Checks lifetime expiration and updates last checked out time + # Old idle expiration isn't checked because this replaces it. + expire_info = @resource_lifecycle[resource] + if lifetime_expired?(expire_info, Time.utc) + resource.close + delete(resource) + ensure_minimum_fresh_resources + + return nil + else + expire_info.got_checked_out + end @idle << resource if resource.responds_to?(:after_release) @@ -361,20 +370,18 @@ module DB # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so # # :nodoc: - def remove_expired!(resource : T, update_last_checked_out : Bool = false) + def remove_expired!(resource : T) now = Time.utc expire_info = @resource_lifecycle[resource] expiration_type = if lifetime_expired?(expire_info, now) PoolResourceLifetimeExpired - elsif !update_last_checked_out && idle_expired?(expire_info, now) + elsif idle_expired?(expire_info, now) PoolResourceIdleExpired else nil end - expire_info.got_checked_out if update_last_checked_out - if expiration_type resource.close delete(resource) From 3d173494753b35676c665b88cee6d9234c429911 Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 09:21:37 -0700 Subject: [PATCH 11/21] Fix deprecation warnings regarding sleep --- spec/pool_spec.cr | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index e0f4c636..ebc5144c 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -307,7 +307,7 @@ describe DB::Pool do # by the lifetime expiration instead. 2.times { pool.checkout { - sleep 0.6 + sleep 0.6.seconds } } @@ -332,7 +332,7 @@ describe DB::Pool do # Initially have 3 clients all.size.should eq(3) - sleep 0.6 + sleep 0.6.seconds # checkout 3.times do |i| @@ -368,7 +368,7 @@ describe DB::Pool do } # Await lifetime expiration - sleep 2.1 + sleep 2.1.seconds # release temp_resource_store.each_with_index do |resource, i| # All three idle connections were checked out @@ -396,12 +396,12 @@ describe DB::Pool do # Create 5 resource 5.times { spawn do - pool.checkout { sleep 0.1 } + pool.checkout { sleep 0.1.seconds } end } # Don't do anything for 5 seconds - sleep 5 + sleep 5.seconds # Gone all.each &.closed?.should be_true From c93332590cbd9d136e91d4b84e9683da4c54964b Mon Sep 17 00:00:00 2001 From: syeopite Date: Fri, 23 May 2025 09:33:40 -0700 Subject: [PATCH 12/21] Remove empty file Oops --- spec/manual/pool_concurrent_expire_test.cr | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 spec/manual/pool_concurrent_expire_test.cr diff --git a/spec/manual/pool_concurrent_expire_test.cr b/spec/manual/pool_concurrent_expire_test.cr deleted file mode 100644 index e69de29b..00000000 From 58c6e45bb83214da699d9c3797da9391799ec220 Mon Sep 17 00:00:00 2001 From: syeopite Date: Mon, 26 May 2025 19:54:20 -0700 Subject: [PATCH 13/21] Fix race condition in pool replenishment The number of inflight resources was not properly incremented or compared against resulting in the possibility of fibers queuing up many unnecessary replenishments to the pool. --- spec/pool_spec.cr | 73 +++++++++++++++++++++++++++++++++++++++++++++++ src/db/pool.cr | 12 ++++++-- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index ebc5144c..47204597 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -58,6 +58,17 @@ class Closable end end +class ClosableWithSignal < Closable + setter close_signal : Channel(Nil)? + + def initialize(@close_signal) + end + + protected def do_close + @close_signal.try &.send(nil) + end +end + private def create_pool(**options, &factory : -> T) forall T DB::Pool.new(DB::Pool::Options.new(**options), &factory) end @@ -383,6 +394,68 @@ describe DB::Pool do end end + it "Should count inflight resources when ensuring minimum of initial_pool_size non-expired resources" do + number_of_factory_calls = 0 + toggle_long_inflight = false + + close_inflight = Channel(Nil).new + resource_closed_signal = Channel(Nil).new + + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 2.0, + expired_resource_sweeper: false + ) do + number_of_factory_calls += 1 + if toggle_long_inflight + close_inflight.send(nil) + end + ClosableWithSignal.new(resource_closed_signal) + end + + toggle_long_inflight = true + temporary_latch = {pool.checkout, pool.checkout, pool.checkout} + spawn { pool.checkout { } } + + # Make existing resources stale + sleep 2.seconds + + pool.stats.idle_connections.should eq(0) + pool.stats.in_flight_connections.should eq(1) + + # Release latched resources + temporary_latch.each do |resource| + spawn do + pool.release(resource) + end + end + + # If inflight number is used correctly there should only be a total of + # three new pending resources created in total which is used to replace the + # expiring ones. + # + # +1 from the initial checkout (total 3, inflight: 1) + # +0 from the first release (total 2, inflight: 1) + # +1 from the second release (total: 1, inflight: 2) + # +1 from the third release (total: 0, inflight: 3) + + 3.times do + resource_closed_signal.receive + close_inflight.receive + end + + # Should close gracefully and without any errors. + close_inflight.close + + number_of_factory_calls.should eq(6) + pool.stats.idle_connections.should eq(3) + pool.stats.open_connections.should eq(3) + pool.stats.in_flight_connections.should eq(0) + pool.stats.lifetime_expired_connections.should eq(3) + end + describe "background expired resource sweeper " do it "should clear idle resources" do all = [] of Closable diff --git a/src/db/pool.cr b/src/db/pool.cr index d9a23e24..3251b6f7 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -480,8 +480,16 @@ module DB # # Should be called after each expiration batch private def ensure_minimum_fresh_resources - replenish = (@initial_pool_size - @total.size).clamp(0, @initial_pool_size) - unsync { replenish.times { build_resource } } if replenish > 0 + replenish = (@initial_pool_size - (@total.size + @inflight)).clamp(0, @initial_pool_size) + + return if replenish <= 0 + + begin + @inflight += replenish + unsync { replenish.times { build_resource } } + ensure + @inflight -= replenish + end end end end From cbbb814e9f9b5f8c2b2c22e90230f78d378ac087 Mon Sep 17 00:00:00 2001 From: syeopite Date: Mon, 26 May 2025 21:05:12 -0700 Subject: [PATCH 14/21] Rework some tests to reduce sleep duration --- spec/pool_spec.cr | 116 +++++++++++++++++++--------------------------- 1 file changed, 47 insertions(+), 69 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 47204597..bccf35c0 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -59,13 +59,13 @@ class Closable end class ClosableWithSignal < Closable - setter close_signal : Channel(Nil)? + setter signal : Channel(Nil)? - def initialize(@close_signal) + def initialize(@signal = nil) end protected def do_close - @close_signal.try &.send(nil) + @signal.try &.send(nil) end end @@ -239,11 +239,9 @@ describe DB::Pool do it "should expire resources that exceed maximum lifetime on checkout" do all = [] of Closable - pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 0.5) { Closable.new.tap { |c| all << c } } - - # After 0.5 seconds we should expect to get an expired resource - sleep 0.5.seconds + pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 0.1) { Closable.new.tap { |c| all << c } } + sleep 0.1.seconds ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do pool.checkout end @@ -254,10 +252,9 @@ describe DB::Pool do it "should expire resources that exceed maximum idle-time on checkout" do all = [] of Closable - pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_idle_time_per_resource: 0.5) { Closable.new.tap { |c| all << c } } + pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_idle_time_per_resource: 0.1, max_lifetime_per_resource: 2.0) { Closable.new.tap { |c| all << c } } - # After two seconds we should expect to get an expired resource - sleep 0.5.seconds + sleep 0.1.seconds # Idle expiration error should cause the client to be closed ex = expect_raises DB::PoolResourceIdleExpired(Closable) do @@ -267,64 +264,43 @@ describe DB::Pool do all[0].closed?.should be_true end - it "should only check lifetime expiration on release" do + it "should expire resources that exceed maximum lifetime on release" do all = [] of Closable pool = create_pool( max_pool_size: 2, max_idle_pool_size: 1, - max_lifetime_per_resource: 2.0, - max_idle_time_per_resource: 0.5, + max_lifetime_per_resource: 0.2, + max_idle_time_per_resource: 2.0, expired_resource_sweeper: false ) { Closable.new.tap { |c| all << c } } - # Idle gets reset; not expired - pool.checkout do |resource| - sleep 0.5.seconds - end - - # Not closed? - all[0].closed?.should be_false - - # We should expect to see an idle connection timeout now with the #checkout after - # waiting another 0.5 seconds - - sleep 0.6.seconds - ex = expect_raises DB::PoolResourceIdleExpired(Closable) do - pool.checkout - end - - all[0].closed?.should be_true - - # This should now create a new client that will be expired on release - pool.checkout { sleep 2.seconds } + pool.checkout { sleep 0.25.seconds } pool.stats.lifetime_expired_connections.should eq 1 - - all[1].closed?.should be_true + all[0].closed?.should be_true end - it "should reset idle-time on checkout" do + it "should reset idle-time during release" do all = [] of Closable pool = create_pool( max_pool_size: 2, initial_pool_size: 0, max_idle_pool_size: 1, - max_idle_time_per_resource: 0.5, - max_lifetime_per_resource: 1.0, + max_idle_time_per_resource: 0.2, + max_lifetime_per_resource: 0.4, expired_resource_sweeper: false ) { Closable.new.tap { |c| all << c } } - # Resource gets idled every half second. It should get reset every time we checkout and release the resource. - # If we can last more than a second from the time of creation then it should get expired - # by the lifetime expiration instead. - 2.times { - pool.checkout { - sleep 0.6.seconds - } - } + # Resource gets idled every 0.2 seconds but gets reset upon each release. + pool.checkout do + sleep 0.21.seconds + end + + all[0].closed?.should be_false + pool.checkout { sleep 0.2.seconds } pool.stats.lifetime_expired_connections.should eq(1) pool.stats.idle_expired_connections.should eq(0) - all.each &.closed?.should be_true + all[0].closed?.should be_true all.size.should eq(1) end @@ -335,15 +311,15 @@ describe DB::Pool do initial_pool_size: 3, max_pool_size: 5, max_idle_pool_size: 5, - max_lifetime_per_resource: 2.0, - max_idle_time_per_resource: 0.5, + max_lifetime_per_resource: 1.0, + max_idle_time_per_resource: 0.1, expired_resource_sweeper: false ) { Closable.new.tap { |c| all << c } } # Initially have 3 clients all.size.should eq(3) - sleep 0.6.seconds + sleep 0.1.seconds # checkout 3.times do |i| @@ -367,8 +343,7 @@ describe DB::Pool do initial_pool_size: 3, max_pool_size: 5, max_idle_pool_size: 5, - max_lifetime_per_resource: 2.0, - max_idle_time_per_resource: 1.0, + max_lifetime_per_resource: 0.2, expired_resource_sweeper: false ) { Closable.new.tap { |c| all << c } } @@ -379,7 +354,7 @@ describe DB::Pool do } # Await lifetime expiration - sleep 2.1.seconds + sleep 0.2.seconds # release temp_resource_store.each_with_index do |resource, i| # All three idle connections were checked out @@ -405,7 +380,7 @@ describe DB::Pool do initial_pool_size: 3, max_pool_size: 5, max_idle_pool_size: 5, - max_lifetime_per_resource: 2.0, + max_lifetime_per_resource: 0.25, expired_resource_sweeper: false ) do number_of_factory_calls += 1 @@ -420,7 +395,7 @@ describe DB::Pool do spawn { pool.checkout { } } # Make existing resources stale - sleep 2.seconds + sleep 0.25.seconds pool.stats.idle_connections.should eq(0) pool.stats.in_flight_connections.should eq(1) @@ -459,12 +434,15 @@ describe DB::Pool do describe "background expired resource sweeper " do it "should clear idle resources" do all = [] of Closable + signal = Channel(Nil).new pool = create_pool( initial_pool_size: 0, max_pool_size: 5, max_idle_pool_size: 5, - max_idle_time_per_resource: 2.0, - ) { Closable.new.tap { |c| all << c } } + max_idle_time_per_resource: 0.25, + ) do + ClosableWithSignal.new.tap { |c| all << c }.tap &.signal = signal + end # Create 5 resource 5.times { @@ -473,8 +451,11 @@ describe DB::Pool do end } - # Don't do anything for 5 seconds - sleep 5.seconds + all.each &.closed?.should be_false + + 5.times do + signal.receive + end # Gone all.each &.closed?.should be_true @@ -484,22 +465,19 @@ describe DB::Pool do it "should ensure minimum of initial_pool_size fresh resources" do all = [] of Closable + signal = Channel(Nil).new pool = create_pool( initial_pool_size: 3, max_pool_size: 5, max_idle_pool_size: 5, max_lifetime_per_resource: 2.0, max_idle_time_per_resource: 0.5, - ) { Closable.new.tap { |c| all << c } } - - # Since `resource_sweeper_timer` we should expect to see a sweep every 0.5 seconds (idle is lowest expiration) - # - # The first run occurs after 0.5 seconds which mean that the initial 3 resources should've gotten sweeped. - # Then three more resources should be created as to ensure that the amount of young resources within the pool - # never goes below initial_pool_size. We should be left with 6 clients created in total. - # - # A whole second to ensure the sweep fiber's completion. - sleep 1.seconds + ) { ClosableWithSignal.new.tap { |c| all << c }.tap &.signal = signal } + + # The job will replace the three idle expired resources with new ones + 3.times { signal.receive } + # Wait for the replenishment process to finish + sleep 0.25.seconds all.size.should eq(6) all[..2].each &.closed?.should be_true From c27aa088f877cbd3f803f6bbe577d8f35961286a Mon Sep 17 00:00:00 2001 From: syeopite Date: Mon, 26 May 2025 21:13:56 -0700 Subject: [PATCH 15/21] Loosen types on pool expiration properties Allow integer arguments for max expiration time Allow using `Time::Span` for the resource sweeper timer --- src/db/pool.cr | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 3251b6f7..50a7b519 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -18,13 +18,13 @@ module DB # seconds to wait before a retry attempt retry_delay : Float64 = 0.2, # maximum number of seconds the resource can persist after being created. 0 to disable. - max_lifetime_per_resource : Float64 | Time::Span = 0.0, + max_lifetime_per_resource : Float64 | Int32 | Time::Span = 0.0, # maximum number of seconds an idle resource can remain unused for being removed. 0 to disable. - max_idle_time_per_resource : Float64 | Time::Span = 0.0, + max_idle_time_per_resource : Float64 | Int32 | Time::Span = 0.0, # whether to enable a background sweeper to remove expired clients. Default is true but it will only be spawned if an expiration is actually set expired_resource_sweeper : Bool = true, # number of seconds to wait between each run of the expired resource sweeper. When unset (0) this value defaults to the shortest expiration duration - resource_sweeper_timer : Float64 | Int32 = 0 do + resource_sweeper_timer : Float64 | Int32 | Time::Span = 0 do def self.from_http_params(params : HTTP::Params, default = Options.new) enabled_sweeper = params.fetch("expired_resource_sweeper", default.expired_resource_sweeper) if enabled_sweeper.is_a?(String) From cb068bebea5b7c0d92f17c0f1f283dbdd502c673 Mon Sep 17 00:00:00 2001 From: syeopite Date: Mon, 26 May 2025 21:18:36 -0700 Subject: [PATCH 16/21] Make pool resource expiration methods private --- src/db/pool.cr | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 50a7b519..bcb66316 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -348,7 +348,7 @@ module DB # Checks if a resource has exceeded the maximum lifetime # # :nodoc: - def lifetime_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) + private def lifetime_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) return false if @max_lifetime_per_resource.zero? expired = (time - time_entry.creation) >= @max_lifetime_per_resource @@ -359,7 +359,7 @@ module DB # Checks if a resource has exceeded the maximum idle time # # :nodoc: - def idle_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) + private def idle_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) return false if @max_idle_time_per_resource.zero? expired = (time - time_entry.last_checked_out) >= @max_idle_time_per_resource @@ -370,7 +370,7 @@ module DB # Checks if the resource is expired. Deletes and raises `PoolResourceExpired` if so # # :nodoc: - def remove_expired!(resource : T) + private def remove_expired!(resource : T) now = Time.utc expire_info = @resource_lifecycle[resource] From 88bbf13015424d8402373afc1726ac1e4ed945af Mon Sep 17 00:00:00 2001 From: syeopite Date: Tue, 27 May 2025 05:03:30 -0700 Subject: [PATCH 17/21] Notify resource availability on replenishment --- src/db/pool.cr | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index bcb66316..40c71205 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -266,7 +266,6 @@ module DB resource.close delete(resource) ensure_minimum_fresh_resources - return nil else expire_info.got_checked_out @@ -283,12 +282,7 @@ module DB end end - if idle_pushed - select - when @availability_channel.send(nil) - else - end - end + notify_availability if idle_pushed end # :nodoc: @@ -345,6 +339,14 @@ module DB @resource_lifecycle.delete(resource) end + # Inform availability_channel about an available resource + private def notify_availability : Nil + select + when @availability_channel.send(nil) + else + end + end + # Checks if a resource has exceeded the maximum lifetime # # :nodoc: @@ -484,12 +486,19 @@ module DB return if replenish <= 0 - begin - @inflight += replenish - unsync { replenish.times { build_resource } } - ensure - @inflight -= replenish + replenish.times do |index| + begin + @inflight += 1 + unsync do + build_resource + notify_availability + end + ensure + @inflight -= 1 + end end + + return true end end end From 8864bab70f306eebf875bd9ec6eb943367c9102e Mon Sep 17 00:00:00 2001 From: syeopite Date: Tue, 27 May 2025 05:10:08 -0700 Subject: [PATCH 18/21] Fix Crystal 1.0.0 compatibility Replace call to Time::Span.positive? Bumps some tiny sleep duration to safeguard test succcess --- spec/pool_spec.cr | 6 +++--- src/db/pool.cr | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index bccf35c0..544af0be 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -241,7 +241,7 @@ describe DB::Pool do all = [] of Closable pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 0.1) { Closable.new.tap { |c| all << c } } - sleep 0.1.seconds + sleep 0.2.seconds ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do pool.checkout end @@ -254,7 +254,7 @@ describe DB::Pool do all = [] of Closable pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_idle_time_per_resource: 0.1, max_lifetime_per_resource: 2.0) { Closable.new.tap { |c| all << c } } - sleep 0.1.seconds + sleep 0.2.seconds # Idle expiration error should cause the client to be closed ex = expect_raises DB::PoolResourceIdleExpired(Closable) do @@ -319,7 +319,7 @@ describe DB::Pool do # Initially have 3 clients all.size.should eq(3) - sleep 0.1.seconds + sleep 0.2.seconds # checkout 3.times do |i| diff --git a/src/db/pool.cr b/src/db/pool.cr index 40c71205..ede02574 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -442,7 +442,7 @@ module DB private def sweep_expired_job timer = @sweep_timer - return if timer.nil? || !timer.positive? + return if timer.nil? || (timer <= Time::Span::ZERO) @sweep_job_running = true From e4f61afdb64a35bca9db9469c3bc07ce718d9861 Mon Sep 17 00:00:00 2001 From: syeopite Date: Tue, 27 May 2025 05:18:44 -0700 Subject: [PATCH 19/21] Fix specs --- spec/pool_spec.cr | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 544af0be..2c22e506 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -239,7 +239,10 @@ describe DB::Pool do it "should expire resources that exceed maximum lifetime on checkout" do all = [] of Closable - pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_lifetime_per_resource: 0.1) { Closable.new.tap { |c| all << c } } + pool = create_pool( + max_pool_size: 2, max_idle_pool_size: 1, + max_lifetime_per_resource: 0.1, expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } sleep 0.2.seconds ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do @@ -252,7 +255,11 @@ describe DB::Pool do it "should expire resources that exceed maximum idle-time on checkout" do all = [] of Closable - pool = create_pool(max_pool_size: 2, max_idle_pool_size: 1, max_idle_time_per_resource: 0.1, max_lifetime_per_resource: 2.0) { Closable.new.tap { |c| all << c } } + pool = create_pool( + max_pool_size: 2, max_idle_pool_size: 1, + max_idle_time_per_resource: 0.2, max_lifetime_per_resource: 2.0, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } sleep 0.2.seconds From 87d96d31bfce4bc44b03ac61e1d1b4f36c1806c2 Mon Sep 17 00:00:00 2001 From: syeopite Date: Sun, 1 Jun 2025 17:51:23 -0700 Subject: [PATCH 20/21] Add rescue block for expiration errors in #retry Although handled by `PoolResourceLost(T)` doing so manually avoids a mutex lock and a second redundant call to #delete --- src/db/pool.cr | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/db/pool.cr b/src/db/pool.cr index ede02574..5c9610d9 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -303,6 +303,12 @@ module DB begin sleep @retry_delay if i >= current_available return yield + rescue e : PoolResourceExpired(T) + # A `PoolResourceExpired` is raised internally at #checkout + # and is both closed and deleted from the pool at the time of + # raising. Although we can technically let the rescue of + # `PoolResourceLost(T)` handle the retry, we can avoid an expensive + # mutex lock by doing so manually. rescue e : PoolResourceLost(T) # if the connection is lost it will be closed by # the exception to release resources From c87332231dffdcf59ed21c53b10a35d5a977b94f Mon Sep 17 00:00:00 2001 From: syeopite Date: Sun, 1 Jun 2025 17:55:45 -0700 Subject: [PATCH 21/21] Formatting --- src/db/pool.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 5c9610d9..ad1daf34 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -306,7 +306,7 @@ module DB rescue e : PoolResourceExpired(T) # A `PoolResourceExpired` is raised internally at #checkout # and is both closed and deleted from the pool at the time of - # raising. Although we can technically let the rescue of + # raising. Although we can technically let the rescue of # `PoolResourceLost(T)` handle the retry, we can avoid an expensive # mutex lock by doing so manually. rescue e : PoolResourceLost(T)