diff --git a/spec/pool_spec.cr b/spec/pool_spec.cr index 841fe690..2c22e506 100644 --- a/spec/pool_spec.cr +++ b/spec/pool_spec.cr @@ -1,4 +1,5 @@ require "./spec_helper" +require "../src/db/error.cr" class ShouldSleepingOp @is_sleeping = false @@ -57,6 +58,17 @@ class Closable end end +class ClosableWithSignal < Closable + setter signal : Channel(Nil)? + + def initialize(@signal = nil) + end + + protected def do_close + @signal.try &.send(nil) # notifies the listening spec, if a channel was given + end +end + private def create_pool(**options, &factory : -> T) forall T DB::Pool.new(DB::Pool::Options.new(**options), &factory) end @@ -224,4 +236,260 @@ describe DB::Pool do all.size.should eq 4 end + + it "should expire resources that exceed maximum lifetime on checkout" do + all = [] of Closable + pool = create_pool( + max_pool_size: 2, max_idle_pool_size: 1, + max_lifetime_per_resource: 0.1, expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + sleep 0.2.seconds + expect_raises DB::PoolResourceLifetimeExpired(Closable) do + pool.checkout + end + + # Lifetime expiration error should cause the client to be closed + all[0].closed?.should be_true + end + + it "should expire resources that exceed maximum idle-time on checkout" do + all = [] of Closable + pool = create_pool( + max_pool_size: 2, max_idle_pool_size: 1, + max_idle_time_per_resource: 0.2, max_lifetime_per_resource: 2.0, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + sleep 0.2.seconds + + # Idle expiration error should cause the client to be closed + expect_raises DB::PoolResourceIdleExpired(Closable) do + pool.checkout + end + + all[0].closed?.should be_true + end + + it "should expire resources that exceed maximum lifetime on release" do + all = [] of Closable + pool = create_pool( + max_pool_size: 2, + max_idle_pool_size: 1, + max_lifetime_per_resource: 0.2, + max_idle_time_per_resource: 2.0, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + pool.checkout { sleep 
0.25.seconds } + pool.stats.lifetime_expired_connections.should eq 1 + all[0].closed?.should be_true + end + + it "should reset idle-time during release" do + all = [] of Closable + pool = create_pool( + max_pool_size: 2, + initial_pool_size: 0, + max_idle_pool_size: 1, + max_idle_time_per_resource: 0.2, + max_lifetime_per_resource: 0.4, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + # The idle limit is 0.2 seconds, but the idle timer is reset on each release, so only the 0.4 second lifetime should expire the resource. + pool.checkout do + sleep 0.21.seconds + end + + all[0].closed?.should be_false + pool.checkout { sleep 0.2.seconds } + + pool.stats.lifetime_expired_connections.should eq(1) + pool.stats.idle_expired_connections.should eq(0) + all[0].closed?.should be_true + all.size.should eq(1) + end + + it "Should ensure minimum of initial_pool_size non-expired idle resources on checkout" do + all = [] of Closable + + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 1.0, + max_idle_time_per_resource: 0.1, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + # Initially have 3 clients + all.size.should eq(3) + + sleep 0.2.seconds + + # checkout + 3.times do |i| + # Each call should cause another resource to be spawned and the old one expired off + # for a minimum of three + expect_raises DB::PoolResourceIdleExpired(Closable) do + pool.checkout { } + end + + pool.stats.idle_connections.should eq(3) + pool.stats.idle_expired_connections.should eq(i + 1) + all.size.should eq(3 + (i + 1)) + all[i].closed?.should be_true + end + end + + it "Should ensure minimum of initial_pool_size non-expired idle resources on release" do + all = [] of Closable + + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 0.2, + expired_resource_sweeper: false + ) { Closable.new.tap { |c| all << c } } + + temp_resource_store = { + pool.checkout, 
pool.checkout, + pool.checkout, + } + + # Await lifetime expiration + sleep 0.2.seconds + # Release each checked-out resource back to the pool + temp_resource_store.each_with_index do |resource, i| + # All three idle connections were checked out + # Each iteration should result in a new idle connection being created + # as the one we release gets expired. + pool.release(resource) + + pool.stats.idle_connections.should eq(i + 1) + pool.stats.lifetime_expired_connections.should eq(i + 1) + all.size.should eq(3 + (i + 1)) + all[i].closed?.should be_true + end + end + + it "Should count inflight resources when ensuring minimum of initial_pool_size non-expired resources" do + number_of_factory_calls = 0 + toggle_long_inflight = false + + close_inflight = Channel(Nil).new + resource_closed_signal = Channel(Nil).new + + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 0.25, + expired_resource_sweeper: false + ) do + number_of_factory_calls += 1 + if toggle_long_inflight + close_inflight.send(nil) # unbuffered channel: blocks until the spec receives, keeping this build in flight + end + ClosableWithSignal.new(resource_closed_signal) + end + + toggle_long_inflight = true + temporary_latch = {pool.checkout, pool.checkout, pool.checkout} + spawn { pool.checkout { } } + + # Make existing resources stale + sleep 0.25.seconds + + pool.stats.idle_connections.should eq(0) + pool.stats.in_flight_connections.should eq(1) + + # Release latched resources + temporary_latch.each do |resource| + spawn do + pool.release(resource) + end + end + + # If the inflight number is used correctly there should only be + # three new pending resources created in total, which are used to replace the + # expiring ones. 
+ # + # +1 from the initial checkout (total: 3, inflight: 1) + # +0 from the first release (total: 2, inflight: 1) + # +1 from the second release (total: 1, inflight: 2) + # +1 from the third release (total: 0, inflight: 3) + + 3.times do + resource_closed_signal.receive + close_inflight.receive + end + + # Should close gracefully and without any errors. + close_inflight.close + + number_of_factory_calls.should eq(6) + pool.stats.idle_connections.should eq(3) + pool.stats.open_connections.should eq(3) + pool.stats.in_flight_connections.should eq(0) + pool.stats.lifetime_expired_connections.should eq(3) + end + + describe "background expired resource sweeper " do + it "should clear idle resources" do + all = [] of Closable + signal = Channel(Nil).new + pool = create_pool( + initial_pool_size: 0, + max_pool_size: 5, + max_idle_pool_size: 5, + max_idle_time_per_resource: 0.25, + ) do + ClosableWithSignal.new.tap { |c| all << c }.tap &.signal = signal + end + + # Create 5 resources, one per spawned checkout + 5.times { + spawn do + pool.checkout { sleep 0.1.seconds } + end + } + + all.each &.closed?.should be_false # NOTE(review): the spawned fibers have presumably not run yet, so all may still be empty and this check vacuous - confirm + + 5.times do + signal.receive + end + + # All five resources are expected to have been closed and removed from the pool + all.each &.closed?.should be_true + pool.stats.open_connections.should eq(0) + pool.stats.idle_connections.should eq(0) + end + + it "should ensure minimum of initial_pool_size fresh resources" do + all = [] of Closable + signal = Channel(Nil).new + pool = create_pool( + initial_pool_size: 3, + max_pool_size: 5, + max_idle_pool_size: 5, + max_lifetime_per_resource: 2.0, + max_idle_time_per_resource: 0.5, + ) { ClosableWithSignal.new.tap { |c| all << c }.tap &.signal = signal } + + # The job will replace the three idle expired resources with new ones + 3.times { signal.receive } + # Wait for the replenishment process to finish + sleep 0.25.seconds + + all.size.should eq(6) + all[..2].each &.closed?.should be_true + all[3..].each &.closed?.should be_false + pool.stats.idle_connections.should eq(3) + end + end end diff --git 
a/src/db/error.cr b/src/db/error.cr index 4268bf2c..8958b7d2 100644 --- a/src/db/error.cr +++ b/src/db/error.cr @@ -39,6 +39,18 @@ module DB class PoolResourceRefused < Error end + # Base error raised when a resource being checked out has expired (lifetime or idle) + class PoolResourceExpired(T) < PoolResourceLost(T) + end + + # Raised when a checked out resource has exceeded the maximum lifetime + class PoolResourceLifetimeExpired(T) < PoolResourceExpired(T) + end + + # Raised when a checked out resource has exceeded the maximum idle time + class PoolResourceIdleExpired(T) < PoolResourceExpired(T) + end + # Raised when an established connection is lost # probably due to socket/network issues. # It is used by the connection pool retry logic. diff --git a/src/db/pool.cr b/src/db/pool.cr index 7eda7c76..ad1daf34 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -16,8 +16,21 @@ module DB # maximum amount of retry attempts to reconnect to the db. See `Pool#retry` retry_attempts : Int32 = 1, # seconds to wait before a retry attempt - retry_delay : Float64 = 0.2 do + retry_delay : Float64 = 0.2, + # maximum number of seconds the resource can persist after being created. 0 to disable. + max_lifetime_per_resource : Float64 | Int32 | Time::Span = 0.0, + # maximum number of seconds an idle resource can remain unused before being removed. 0 to disable. + max_idle_time_per_resource : Float64 | Int32 | Time::Span = 0.0, + # whether to enable a background sweeper to remove expired clients. Default is true but it will only be spawned if an expiration is actually set + expired_resource_sweeper : Bool = true, + # number of seconds to wait between each run of the expired resource sweeper. 
When unset (0) this value defaults to the shortest expiration duration + resource_sweeper_timer : Float64 | Int32 | Time::Span = 0 do def self.from_http_params(params : HTTP::Params, default = Options.new) + enabled_sweeper = params.fetch("expired_resource_sweeper", default.expired_resource_sweeper) + if enabled_sweeper.is_a?(String) + enabled_sweeper = {"1", "true", "t", "yes"}.includes?(enabled_sweeper.downcase) # String literals: a Char such as '1' never equals a String + end + Options.new( initial_pool_size: params.fetch("initial_pool_size", default.initial_pool_size).to_i, max_pool_size: params.fetch("max_pool_size", default.max_pool_size).to_i, @@ -25,6 +38,10 @@ module DB checkout_timeout: params.fetch("checkout_timeout", default.checkout_timeout).to_f, retry_attempts: params.fetch("retry_attempts", default.retry_attempts).to_i, retry_delay: params.fetch("retry_delay", default.retry_delay).to_f, + max_lifetime_per_resource: params.fetch("max_lifetime_per_resource", default.max_lifetime_per_resource).to_f, + max_idle_time_per_resource: params.fetch("max_idle_time_per_resource", default.max_idle_time_per_resource).to_f, + expired_resource_sweeper: enabled_sweeper, + resource_sweeper_timer: params.fetch("resource_sweeper_timer", default.resource_sweeper_timer).to_f, ) end end @@ -44,6 +61,11 @@ module DB # seconds to wait before a retry attempt @retry_delay : Float64 + # maximum number of seconds the resource can persist after being created. 0 to disable. + @max_lifetime_per_resource : Time::Span + # maximum number of seconds an idle resource can remain unused before being removed. 0 to disable. 
+ @max_idle_time_per_resource : Time::Span + # Pool state # total of open connections managed by this pool @@ -53,6 +75,29 @@ module DB # connections waiting to be stablished (they are not in *@idle* nor in *@total*) @inflight : Int32 + @idle_expired_count : Int64 = 0 # times a resource was found idle-expired (reported via Stats) + @lifetime_expired_count : Int64 = 0 # times a resource was found lifetime-expired (reported via Stats) + + # Tracks the creation time and the last-used time of a specific resource (wall-clock `Time.utc`) + private class ResourceTimeEntry + # Time of creation + getter creation : Time = Time.utc + # Time the resource was last marked as used (initially its creation time) + getter last_checked_out : Time + + def initialize + @last_checked_out = @creation + end + + # Sets the last checked out time to now + def got_checked_out + @last_checked_out = Time.utc + end + end + + # Maps a resource to a corresponding `ResourceTimeEntry` + @resource_lifecycle = {} of T => ResourceTimeEntry + # Sync state # communicate that a connection is available for checkout @@ -60,6 +105,17 @@ module DB # global pool mutex @mutex : Mutex + # Sweep expired resource job + + # whether the job is enabled or disabled + @sweep_job_enabled : Bool + # whether a sweep job fiber is currently running + @sweep_job_running : Bool + # interval between sweeper runs; while nil the sweeper is never started + @sweep_timer : Time::Span? 
+ # cancels the sweep job as needed + @sweep_job_close_channel : Channel(Nil) + @[Deprecated("Use `#new` with DB::Pool::Options instead")] def initialize(initial_pool_size = 1, max_pool_size = 0, max_idle_pool_size = 1, checkout_timeout = 5.0, retry_attempts = 1, retry_delay = 0.2, &factory : -> T) @@ -83,7 +139,35 @@ module DB @inflight = 0 @mutex = Mutex.new + @max_lifetime_per_resource = ensure_time_span(pool_options.max_lifetime_per_resource).as(Time::Span) + @max_idle_time_per_resource = ensure_time_span(pool_options.max_idle_time_per_resource).as(Time::Span) + @initial_pool_size.times { build_resource } + + @sweep_job_enabled = pool_options.expired_resource_sweeper + + # Cancels the sweep job as needed + @sweep_job_close_channel = Channel(Nil).new + + @sweep_job_running = false + + if @sweep_job_enabled && !(min_expire = {@max_idle_time_per_resource, @max_lifetime_per_resource}.reject(&.zero?).min?).nil? + sweep_timer = ensure_time_span(pool_options.resource_sweeper_timer).as(Time::Span) + + # An explicit timer wins; when unset (0) fall back to the shortest enabled expiration. + # A non-zero timer used to leave @sweep_timer nil, so the sweeper never ran. + @sweep_timer = sweep_timer.zero? ? (min_expire || sweep_timer) : sweep_timer + + sweep_expired_job + end + end + + private macro ensure_time_span(value) + if {{value}}.is_a? 
Number + {{value}}.seconds + else + {{value}} + end end # close all resources in the pool @@ -91,13 +175,17 @@ module DB @total.each &.close @total.clear @idle.clear + @resource_lifecycle.clear + @sweep_job_close_channel.send(nil) if @sweep_job_running # stops the sweeper; the channel is unbuffered, so this waits until the sweeper fiber receives end record Stats, open_connections : Int32, idle_connections : Int32, in_flight_connections : Int32, - max_connections : Int32 + max_connections : Int32, + idle_expired_connections : Int64, + lifetime_expired_connections : Int64 # Returns stats of the pool def stats @@ -106,6 +194,8 @@ module DB idle_connections: @idle.size, in_flight_connections: @inflight, max_connections: @max_pool_size, + idle_expired_connections: @idle_expired_count, + lifetime_expired_connections: @lifetime_expired_count ) end @@ -114,6 +204,7 @@ module DB resource = nil until resource + sweep_expired_job if !@sweep_job_running # (re)starts the sweeper; a no-op while no sweep timer is set resource = if @idle.empty? if can_increase_pool? @inflight += 1 @@ -137,6 +228,9 @@ module DB end end + # Close and drop the resource if it expired (idle or lifetime); raises a PoolResourceExpired error in that case + remove_expired!(resource) + @idle.delete resource resource @@ -163,8 +257,20 @@ module DB sync do if resource.responds_to?(:closed?) && resource.closed? - @total.delete(resource) + delete(resource) elsif can_increase_idle_pool + # Checks lifetime expiration and updates the last checked out time. + # Idle expiration isn't checked here because the idle timer is reset below. 
+ expire_info = @resource_lifecycle[resource] + if lifetime_expired?(expire_info, Time.utc) + resource.close + delete(resource) + ensure_minimum_fresh_resources + return nil + else + expire_info.got_checked_out + end + @idle << resource if resource.responds_to?(:after_release) resource.after_release @@ -172,16 +278,11 @@ module DB idle_pushed = true else resource.close - @total.delete(resource) + delete(resource) end end - if idle_pushed - select - when @availability_channel.send(nil) - else - end - end + notify_availability if idle_pushed end # :nodoc: @@ -202,6 +303,12 @@ module DB begin sleep @retry_delay if i >= current_available return yield + rescue e : PoolResourceExpired(T) + # `PoolResourceExpired` is raised internally by #checkout; the expired + # resource has already been closed and deleted from the pool by the + # time it is raised. Although we can technically let the rescue of + # `PoolResourceLost(T)` handle the retry, we can avoid an expensive + # mutex lock by doing so manually. rescue e : PoolResourceLost(T) # if the connection is lost it will be closed by # the exception to release resources @@ -220,6 +327,7 @@ module DB def each_resource(&) sync do @idle.each do |resource| + @resource_lifecycle[resource].got_checked_out # resets the idle timer of each idle resource that is visited yield resource end end @@ -234,6 +342,60 @@ module DB def delete(resource : T) @total.delete(resource) @idle.delete(resource) + @resource_lifecycle.delete(resource) + end + + # Signals @availability_channel that a resource is available. Non-blocking: the empty `else` branch drops the signal when the send cannot proceed immediately + private def notify_availability : Nil + select + when @availability_channel.send(nil) + else + end + end + + # Checks if a resource has exceeded the maximum lifetime (always false when the limit is 0). + # Side effect: increments @lifetime_expired_count when it has. + # :nodoc: + private def lifetime_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) + return false if @max_lifetime_per_resource.zero? 
+ + expired = (time - time_entry.creation) >= @max_lifetime_per_resource + @lifetime_expired_count += 1 if expired + return expired + end + + # Checks if a resource has exceeded the maximum idle time (always false when the limit is 0). + # Side effect: increments @idle_expired_count when it has. + # :nodoc: + private def idle_expired?(time_entry : ResourceTimeEntry, time : Time = Time.utc) + return false if @max_idle_time_per_resource.zero? + + expired = (time - time_entry.last_checked_out) >= @max_idle_time_per_resource + @idle_expired_count += 1 if expired + return expired + end + + # Checks if the resource is expired (lifetime first, then idle). If so it is closed and deleted, + # the pool is replenished and the matching `PoolResourceExpired` subclass is raised. + # :nodoc: + private def remove_expired!(resource : T) + now = Time.utc + expire_info = @resource_lifecycle[resource] + + expiration_type = if lifetime_expired?(expire_info, now) + PoolResourceLifetimeExpired + elsif idle_expired?(expire_info, now) + PoolResourceIdleExpired + else + nil + end + + if expiration_type + resource.close + delete(resource) + ensure_minimum_fresh_resources # top the pool back up before raising + raise expiration_type.new(resource) + end end private def build_resource : T @@ -241,6 +403,7 @@ module DB sync do @total << resource @idle << resource + @resource_lifecycle[resource] = ResourceTimeEntry.new # start tracking creation and last-use times end resource end @@ -282,5 +445,66 @@ module DB @mutex.lock end end + + private def sweep_expired_job # spawns the background sweeper fiber; a no-op without a positive timer + timer = @sweep_timer + return if timer.nil? 
|| (timer <= Time::Span::ZERO) + + @sweep_job_running = true + + spawn do + loop do + select + when @sweep_job_close_channel.receive + @sweep_job_running = false + break + when timeout(timer) + end + finished = false + sync do + now = Time.utc + + @idle.to_a.each do |resource| # iterate over a snapshot: `delete` mutates @idle + expire_info = @resource_lifecycle[resource] + + if lifetime_expired?(expire_info, now) || idle_expired?(expire_info, now) + resource.close + delete(resource) + end + end + + ensure_minimum_fresh_resources + + # End job if there is no initial pool size and the entire pool has been expired + finished = @initial_pool_size.zero? && @total.empty? + @sweep_job_running = false if finished # lets #checkout restart the job and keeps #close from signalling a finished fiber + end + break if finished # a `break` inside the `sync` block would only leave `sync`, not this loop + end + end + end + + # Ensure there are at least a minimum of @initial_pool_size non-expired resources + # + # Should be called after each expiration batch + private def ensure_minimum_fresh_resources + replenish = (@initial_pool_size - (@total.size + @inflight)).clamp(0, @initial_pool_size) + + return if replenish <= 0 + + replenish.times do |index| + begin + @inflight += 1 + unsync do + build_resource + notify_availability + end + ensure + @inflight -= 1 + end + end + + return true + end end end