From 0126630e4c257b399458b78ea9a2ff7d39975f80 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 9 May 2025 10:40:46 +0200 Subject: [PATCH 1/3] Started work on a pool manager --- .../ConnectionLease.swift | 22 ++ .../ConnectionPoolModule/ConnectionPool.swift | 54 ++++- .../ConnectionPoolExecutor.swift | 15 ++ .../ConnectionPoolManager.swift | 215 ++++++++++++++++++ .../ConnectionRequest.swift | 4 + .../MockConnection.swift | 10 + .../MockConnectionFactory.swift | 3 +- .../MockExecutor.swift | 23 ++ Sources/PostgresNIO/Pool/PostgresClient.swift | 2 + .../ConnectionPoolTests.swift | 52 +++-- 10 files changed, 380 insertions(+), 20 deletions(-) create mode 100644 Sources/ConnectionPoolModule/ConnectionLease.swift create mode 100644 Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift create mode 100644 Sources/ConnectionPoolModule/ConnectionPoolManager.swift create mode 100644 Sources/ConnectionPoolTestUtils/MockExecutor.swift diff --git a/Sources/ConnectionPoolModule/ConnectionLease.swift b/Sources/ConnectionPoolModule/ConnectionLease.swift new file mode 100644 index 00000000..a93b4707 --- /dev/null +++ b/Sources/ConnectionPoolModule/ConnectionLease.swift @@ -0,0 +1,22 @@ +// +// ConnectionLease.swift +// postgres-nio +// +// Created by Fabian Fett on 05.05.25. +// + +struct ConnectionLease { + + var connection: Connection + var _release: () -> () + + init(connection: Connection, release: @escaping () -> Void) { + self.connection = connection + self._release = release + } + + func release() { + self._release() + } + +} diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index b460b263..1c78afbd 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -134,9 +134,11 @@ public final class ConnectionPool< Connection: PooledConnection, ConnectionID: Hashable & Sendable, ConnectionIDGenerator: ConnectionIDGeneratorProtocol, + ConnectionConfiguration: Equatable & Sendable, Request: ConnectionRequestProtocol, RequestID: Hashable & Sendable, KeepAliveBehavior: ConnectionKeepAliveBehavior, + Executor: ConnectionPoolExecutor, ObservabilityDelegate: ConnectionPoolObservabilityDelegate, Clock: _Concurrency.Clock >: Sendable where @@ -148,7 +150,7 @@ public final class ConnectionPool< ObservabilityDelegate.ConnectionID == ConnectionID, Clock.Duration == Duration { - public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool) async throws -> ConnectionAndMetadata + public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionConfiguration, ConnectionPool) async throws -> ConnectionAndMetadata @usableFromInline typealias StateMachine = PoolStateMachine> @@ -156,6 +158,11 @@ public final class ConnectionPool< @usableFromInline let factory: ConnectionFactory + public let executor: Executor + + @usableFromInline + let connectionConfiguration: ConnectionConfiguration + @usableFromInline let keepAliveBehavior: KeepAliveBehavior @@ -188,18 +195,22 @@ public final class ConnectionPool< public init( configuration: ConnectionPoolConfiguration, + connectionConfiguration: ConnectionConfiguration, idGenerator: ConnectionIDGenerator, requestType: Request.Type, keepAliveBehavior: KeepAliveBehavior, + executor: Executor, observabilityDelegate: ObservabilityDelegate, clock: Clock, connectionFactory: @escaping ConnectionFactory ) { + self.executor = executor self.clock = clock self.factory = connectionFactory self.keepAliveBehavior = keepAliveBehavior self.observabilityDelegate = observabilityDelegate self.configuration = configuration + self.connectionConfiguration = connectionConfiguration var stateMachine = StateMachine( configuration: .init(configuration, keepAliveBehavior: keepAliveBehavior), generator: idGenerator, @@ -271,6 +282,13 @@ public final class ConnectionPool< } } + @inlinable + public func updateConfiguration(_ configuration: ConnectionConfiguration, forceReconnection: Bool) { + // TODO: Implement connection will close correctly + // If the forceReconnection flag is set, we should gracefully close the connection once they + // are returned the next time. + } + @inlinable public func run() async { await withTaskCancellationHandler { @@ -419,11 +437,11 @@ public final class ConnectionPool< @inlinable /*private*/ func makeConnection(for request: StateMachine.ConnectionRequest, in taskGroup: inout some TaskGroupProtocol) { - taskGroup.addTask_ { + self.addTask(into: &taskGroup) { self.observabilityDelegate.startedConnecting(id: request.connectionID) do { - let bundle = try await self.factory(request.connectionID, self) + let bundle = try await self.factory(request.connectionID, self.connectionConfiguration, self) self.connectionEstablished(bundle) // after the connection has been established, we keep the task open. This ensures @@ -468,7 +486,7 @@ public final class ConnectionPool< /*private*/ func runKeepAlive(_ connection: Connection, in taskGroup: inout some TaskGroupProtocol) { self.observabilityDelegate.keepAliveTriggered(id: connection.id) - taskGroup.addTask_ { + self.addTask(into: &taskGroup) { do { try await self.keepAliveBehavior.runKeepAlive(for: connection) @@ -502,8 +520,8 @@ public final class ConnectionPool< } @inlinable - /*private*/ func runTimer(_ timer: StateMachine.Timer, in poolGroup: inout some TaskGroupProtocol) { - poolGroup.addTask_ { () async -> () in + /*private*/ func runTimer(_ timer: StateMachine.Timer, in taskGroup: inout some TaskGroupProtocol) { + self.addTask(into: &taskGroup) { () async -> () in await withTaskGroup(of: TimerRunResult.self, returning: Void.self) { taskGroup in taskGroup.addTask { do { @@ -554,6 +572,15 @@ public final class ConnectionPool< token.resume() } } + + @inlinable + func addTask(into taskGroup: inout some TaskGroupProtocol, operation: @escaping @Sendable () async -> Void) { + if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *), let executor = self.executor as? TaskExecutor { + taskGroup.addTask_(executorPreference: executor, operation: operation) + } else { + taskGroup.addTask_(operation: operation) + } + } } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -573,6 +600,9 @@ protocol TaskGroupProtocol { // under exactly this name and others have different attributes. So let's pick // a name that doesn't clash anywhere and implement it using the standard `addTask`. mutating func addTask_(operation: @escaping @Sendable () async -> Void) + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) + mutating func addTask_(executorPreference: ((any TaskExecutor)?), operation: @escaping @Sendable () async -> Void) } @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) @@ -581,6 +611,12 @@ extension DiscardingTaskGroup: TaskGroupProtocol { mutating func addTask_(operation: @escaping @Sendable () async -> Void) { self.addTask(priority: nil, operation: operation) } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) + @inlinable + mutating func addTask_(executorPreference: (any TaskExecutor)?, operation: @escaping @Sendable () async -> Void) { + self.addTask(executorPreference: executorPreference, operation: operation) + } } extension TaskGroup: TaskGroupProtocol { @@ -588,4 +624,10 @@ extension TaskGroup: TaskGroupProtocol { mutating func addTask_(operation: @escaping @Sendable () async -> Void) { self.addTask(priority: nil, operation: operation) } + + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) + @inlinable + mutating func addTask_(executorPreference: (any TaskExecutor)?, operation: @escaping @Sendable () async -> Void) { + self.addTask(executorPreference: executorPreference, operation: operation) + } } diff --git a/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift b/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift new file mode 100644 index 00000000..f4e6d185 --- /dev/null +++ b/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift @@ -0,0 +1,15 @@ +public protocol ConnectionPoolExecutor: AnyObject, Sendable { + associatedtype ID: Hashable, Sendable + + var id: ID { get } + + static func getExecutorID() -> Self.ID? +} + +public final class NothingConnectionPoolExecutor: ConnectionPoolExecutor { + public typealias ID = ObjectIdentifier + + public var id: ObjectIdentifier { ObjectIdentifier(self) } + + public static func getExecutorID() -> ObjectIdentifier? { nil } +} diff --git a/Sources/ConnectionPoolModule/ConnectionPoolManager.swift b/Sources/ConnectionPoolModule/ConnectionPoolManager.swift new file mode 100644 index 00000000..5c5e03d2 --- /dev/null +++ b/Sources/ConnectionPoolModule/ConnectionPoolManager.swift @@ -0,0 +1,215 @@ +import Atomics + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +public struct ConnectionPoolManagerConfiguration: Sendable { + /// The minimum number of connections to preserve in the pool. + /// + /// If the pool is mostly idle and the remote servers closes + /// idle connections, + /// the `ConnectionPool` will initiate new outbound + /// connections proactively to avoid the number of available + /// connections dropping below this number. + public var minimumConnectionPerExecutorCount: Int + + /// Between the `minimumConnectionCount` and + /// `maximumConnectionSoftLimit` the connection pool creates + /// _preserved_ connections. Preserved connections are closed + /// if they have been idle for ``idleTimeout``. + public var maximumConnectionPerExecutorSoftLimit: Int + + /// The maximum number of connections for this pool, that can + /// exist at any point in time. The pool can create _overflow_ + /// connections, if all connections are leased, and the + /// `maximumConnectionHardLimit` > `maximumConnectionSoftLimit ` + /// Overflow connections are closed immediately as soon as they + /// become idle. + public var maximumConnectionPerExecutorHardLimit: Int + + /// The time that a _preserved_ idle connection stays in the + /// pool before it is closed. + public var idleTimeout: Duration + + /// initializer + public init() { + self.minimumConnectionPerExecutorCount = 0 + self.maximumConnectionPerExecutorSoftLimit = 16 + self.maximumConnectionPerExecutorHardLimit = 16 + self.idleTimeout = .seconds(60) + } +} + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +public final class ConnectionPoolManager< + Connection: PooledConnection, + ConnectionID: Hashable & Sendable, + ConnectionIDGenerator: ConnectionIDGeneratorProtocol, + ConnectionConfiguration: Equatable & Sendable, + Request: ConnectionRequestProtocol, + RequestID: Hashable & Sendable, + KeepAliveBehavior: ConnectionKeepAliveBehavior, + Executor: ConnectionPoolExecutor, + ObservabilityDelegate: ConnectionPoolObservabilityDelegate, + Clock: _Concurrency.Clock +>: Sendable where + Connection.ID == ConnectionID, + ConnectionIDGenerator.ID == ConnectionID, + Request.Connection == Connection, + Request.ID == RequestID, + KeepAliveBehavior.Connection == Connection, + ObservabilityDelegate.ConnectionID == ConnectionID, + Clock.Duration == Duration +{ + public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionConfiguration, ConnectionPool) async throws -> ConnectionAndMetadata + + public typealias ConnectionPool = _ConnectionPoolModule.ConnectionPool< + Connection, + ConnectionID, + ConnectionIDGenerator, + ConnectionConfiguration, + Request, + RequestID, + KeepAliveBehavior, + Executor, + ObservabilityDelegate, + Clock + > + + @usableFromInline + let pools: [Executor.ID: ConnectionPool] + + @usableFromInline + let roundRobinCounter = ManagedAtomic(0) + + @usableFromInline + let roundRobinPools: [ConnectionPool] + + @usableFromInline + let actionsStream: AsyncStream + + @usableFromInline + let eventContinuation: AsyncStream.Continuation + + public init( + configuration: ConnectionPoolManagerConfiguration, + connectionConfiguration: ConnectionConfiguration, + idGenerator: ConnectionIDGenerator, + requestType: Request.Type, + keepAliveBehavior: KeepAliveBehavior, + executors: [Executor], + observabilityDelegate: ObservabilityDelegate, + clock: Clock, + connectionFactory: @escaping ConnectionFactory + ) { + let (stream, continuation) = AsyncStream.makeStream(of: Actions.self) + self.actionsStream = stream + self.eventContinuation = continuation + + var pools = [Executor.ID: ConnectionPool]() + pools.reserveCapacity(executors.count) + + var singlePoolConfig = ConnectionPoolConfiguration() + singlePoolConfig.minimumConnectionCount = configuration.minimumConnectionPerExecutorCount + singlePoolConfig.maximumConnectionSoftLimit = configuration.maximumConnectionPerExecutorSoftLimit + singlePoolConfig.maximumConnectionHardLimit = configuration.maximumConnectionPerExecutorHardLimit + + for executor in executors { + pools[executor.id] = ConnectionPool( + configuration: singlePoolConfig, + connectionConfiguration: connectionConfiguration, + idGenerator: idGenerator, + requestType: requestType, + keepAliveBehavior: keepAliveBehavior, + executor: executor, + observabilityDelegate: observabilityDelegate, + clock: clock, + connectionFactory: connectionFactory + ) + } + + self.pools = pools + self.roundRobinPools = Array(pools.values) + + for pool in pools.values { + self.eventContinuation.yield(.runPool(pool)) + } + } + + @inlinable + public func leaseConnection(_ request: Request) { + if let executorID = Executor.getExecutorID(), let pool = self.pools[executorID] { + pool.leaseConnection(request) + } + + let index = self.roundRobinCounter.loadThenWrappingIncrement(ordering: .relaxed) % self.roundRobinPools.count + + self.roundRobinPools[index].leaseConnection(request) + } + + @usableFromInline + enum Actions: Sendable { + case runPool(ConnectionPool) + } + + @inlinable + public func run() async { + await withTaskCancellationHandler { + if #available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) { + return await withDiscardingTaskGroup() { taskGroup in + await self.run(in: &taskGroup) + } + } + return await withTaskGroup(of: Void.self) { taskGroup in + await self.run(in: &taskGroup) + } + } onCancel: { + + } + } + + @inlinable + public func updateConfiguration(_ configuration: ConnectionConfiguration, forceReconnection: Bool) { + for pool in self.pools.values { + pool.updateConfiguration(configuration, forceReconnection: forceReconnection) + } + } + + // MARK: - Private Methods - + + @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) + @inlinable + /* private */ func run(in taskGroup: inout DiscardingTaskGroup) async { + for await event in self.actionsStream { + self.runEvent(event, in: &taskGroup) + } + } + + @inlinable + /* private */ func run(in taskGroup: inout TaskGroup) async { + var running = 0 + for await event in self.actionsStream { + running += 1 + self.runEvent(event, in: &taskGroup) + + if running == 100 { + _ = await taskGroup.next() + running -= 1 + } + } + } + + @inlinable + /* private */ func runEvent(_ event: Actions, in taskGroup: inout some TaskGroupProtocol) { + switch event { + case .runPool(let pool): + if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *), let executor = pool.executor as? TaskExecutor { + taskGroup.addTask_(executorPreference: executor) { + await pool.run() + } + } else { + taskGroup.addTask_ { + await pool.run() + } + } + } + } +} diff --git a/Sources/ConnectionPoolModule/ConnectionRequest.swift b/Sources/ConnectionPoolModule/ConnectionRequest.swift index 1d1c55da..365b94a0 100644 --- a/Sources/ConnectionPoolModule/ConnectionRequest.swift +++ b/Sources/ConnectionPoolModule/ConnectionRequest.swift @@ -28,17 +28,21 @@ let requestIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator() extension ConnectionPool where Request == ConnectionRequest { public convenience init( configuration: ConnectionPoolConfiguration, + connectionConfiguration: ConnectionConfiguration, idGenerator: ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator(), keepAliveBehavior: KeepAliveBehavior, + executor: Executor, observabilityDelegate: ObservabilityDelegate, clock: Clock = ContinuousClock(), connectionFactory: @escaping ConnectionFactory ) { self.init( configuration: configuration, + connectionConfiguration: connectionConfiguration, idGenerator: idGenerator, requestType: ConnectionRequest.self, keepAliveBehavior: keepAliveBehavior, + executor: executor, observabilityDelegate: observabilityDelegate, clock: clock, connectionFactory: connectionFactory diff --git a/Sources/ConnectionPoolTestUtils/MockConnection.swift b/Sources/ConnectionPoolTestUtils/MockConnection.swift index db5c3ef7..149d0b56 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnection.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnection.swift @@ -2,6 +2,16 @@ import _ConnectionPoolModule import DequeModule import NIOConcurrencyHelpers +public struct MockConnectionConfiguration: Sendable, Hashable { + public var username: String + public var password: String + + public init(username: String, password: String) { + self.username = username + self.password = password + } +} + public final class MockConnection: PooledConnection, Sendable { public typealias ID = Int diff --git a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift index 936b47cc..2ada3432 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift @@ -37,7 +37,8 @@ public final class MockConnectionFactory: Sendable wh public func makeConnection( id: Int, - for pool: ConnectionPool, NoOpConnectionPoolMetrics, Clock> + configuration: MockConnectionConfiguration, + for pool: ConnectionPool, MockExecutor, NoOpConnectionPoolMetrics, Clock> ) async throws -> ConnectionAndMetadata { if let autoMaxStreams = self.autoMaxStreams { let connection = MockConnection(id: id) diff --git a/Sources/ConnectionPoolTestUtils/MockExecutor.swift b/Sources/ConnectionPoolTestUtils/MockExecutor.swift new file mode 100644 index 00000000..f1b93956 --- /dev/null +++ b/Sources/ConnectionPoolTestUtils/MockExecutor.swift @@ -0,0 +1,23 @@ +// +// MockExecutor.swift +// postgres-nio +// +// Created by Fabian Fett on 07.05.25. +// + +import _ConnectionPoolModule + +public final class MockExecutor: ConnectionPoolExecutor, Sendable { + public typealias ID = ObjectIdentifier + + public var id: ID { ObjectIdentifier(self) } + + static public func getExecutorID() -> ObjectIdentifier? { + MockExecutor.executorID + } + + public init() {} + + @TaskLocal + static var executorID: MockExecutor.ID? +} diff --git a/Sources/PostgresNIO/Pool/PostgresClient.swift b/Sources/PostgresNIO/Pool/PostgresClient.swift index d54e34eb..76ac7208 100644 --- a/Sources/PostgresNIO/Pool/PostgresClient.swift +++ b/Sources/PostgresNIO/Pool/PostgresClient.swift @@ -237,8 +237,10 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { PostgresConnection, PostgresConnection.ID, ConnectionIDGenerator, + PostgresConnection.Configuration, ConnectionRequest, ConnectionRequest.ID, + NothingConnectionPoolExecutor, PostgresKeepAliveBehavor, PostgresClientMetrics, ContinuousClock diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index c745b4a0..59499ba5 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -15,13 +15,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: ContinuousClock() ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } // the same connection is reused 1000 times @@ -73,13 +75,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup(of: Void.self) { taskGroup in @@ -118,13 +122,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup(of: Void.self) { taskGroup in @@ -155,13 +161,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: ContinuousClock() ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } let hasFinished = ManagedAtomic(false) @@ -236,13 +244,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -315,13 +325,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -401,13 +413,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -470,13 +484,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -530,13 +546,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -594,13 +612,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -649,13 +669,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -710,13 +732,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -775,13 +799,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, + executor: MockExecutor(), observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in From fe54e1ad819207f2c446f253498e56cf0066374b Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 12 May 2025 15:09:57 +0200 Subject: [PATCH 2/3] Further improvements --- .../ConnectionPoolBenchmarks.swift | 141 +++++++++++++++++- .../NIOTaskExecutor.swift | 74 +++++++++ Benchmarks/Package.swift | 3 + .../ConnectionLease.swift | 15 +- .../ConnectionPoolModule/ConnectionPool.swift | 8 +- .../ConnectionPoolExecutor.swift | 2 + .../ConnectionPoolManager.swift | 1 + .../ConnectionRequest.swift | 34 ++++- .../MockConnection.swift | 4 +- .../MockConnectionFactory.swift | 42 +++--- .../ConnectionPoolTestUtils/MockRequest.swift | 8 +- Sources/PostgresNIO/Pool/PostgresClient.swift | 38 +++-- .../ConnectionPoolTests.swift | 92 ++++++------ .../ConnectionRequestTests.swift | 9 +- 14 files changed, 358 insertions(+), 113 deletions(-) create mode 100644 Benchmarks/Benchmarks/ConnectionPoolBenchmarks/NIOTaskExecutor.swift diff --git a/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift b/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift index 9cc535d4..c988c4b3 100644 --- a/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift +++ b/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift @@ -1,23 +1,27 @@ import _ConnectionPoolModule import _ConnectionPoolTestUtils import Benchmark +import NIOCore +import NIOPosix let benchmarks: @Sendable () -> Void = { - Benchmark("Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in + Benchmark("Pool: Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in let clock = MockClock() - let factory = MockConnectionFactory(autoMaxStreams: 1) + let factory = MockConnectionFactory(autoMaxStreams: 1) var configuration = ConnectionPoolConfiguration() configuration.maximumConnectionSoftLimit = 50 configuration.maximumConnectionHardLimit = 50 let pool = ConnectionPool( configuration: configuration, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup { taskGroup in @@ -54,21 +58,23 @@ let benchmarks: @Sendable () -> Void = { } } - Benchmark("Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in + Benchmark("Pool: Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in let clock = MockClock() - let factory = MockConnectionFactory(autoMaxStreams: 1) + let factory = MockConnectionFactory(autoMaxStreams: 1) var configuration = ConnectionPoolConfiguration() configuration.maximumConnectionSoftLimit = 50 configuration.maximumConnectionHardLimit = 50 let pool = ConnectionPool( configuration: configuration, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup { taskGroup in @@ -96,4 +102,123 @@ let benchmarks: @Sendable () -> Void = { taskGroup.cancelAll() } } + + Benchmark("PoolManager/TaskExecutor: Lease/Release 1k requests: 50 parallel – 10 MockExecutors", configuration: .init(scalingFactor: .kilo)) { benchmark in + let clock = MockClock() + let factory = MockConnectionFactory(autoMaxStreams: 1) + var configuration = ConnectionPoolManagerConfiguration() + let executorCount = 10 + let executors = (0...self, + keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executors: executors, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, configuration: $1, for: $2) + } + + await withTaskGroup { taskGroup in + taskGroup.addTask { + await pool.run() + } + + let sequential = benchmark.scaledIterations.upperBound / concurrency + + benchmark.startMeasurement() + + for parallel in 0..(autoMaxStreams: 1) + var configuration = ConnectionPoolManagerConfiguration() + try await NIOTaskExecutor.withExecutors(eventLoops) { executors in + let concurrency = 50 + + configuration.maximumConnectionPerExecutorSoftLimit = concurrency / executors.count + configuration.maximumConnectionPerExecutorHardLimit = concurrency / executors.count + + let pool = ConnectionPoolManager( + configuration: configuration, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executors: executors, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, configuration: $1, for: $2) + } + + await withTaskGroup { taskGroup in + taskGroup.addTask { + await pool.run() + } + + let sequential = benchmark.scaledIterations.upperBound / executors.count + + benchmark.startMeasurement() + + for executor in executors { + taskGroup.addTask(executorPreference: executor) { + for _ in 0..() + + let eventLoop: any EventLoop + + private init(eventLoop: any EventLoop) { + self.eventLoop = eventLoop + } + + static func withExecutors(_ eventLoops: MultiThreadedEventLoopGroup, _ body: ([NIOTaskExecutor]) async throws -> ()) async throws { + var executors = [NIOTaskExecutor]() + for eventLoop in eventLoops.makeIterator() { + let executor = NIOTaskExecutor(eventLoop: eventLoop) + try await eventLoop.submit { + NIOTaskExecutor.threadSpecificEventLoop.currentValue = executor + }.get() + executors.append(executor) + } + do { + try await body(executors) + } catch { + + } + for eventLoop in eventLoops.makeIterator() { + try await eventLoop.submit { + NIOTaskExecutor.threadSpecificEventLoop.currentValue = nil + }.get() + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) +extension NIOTaskExecutor: TaskExecutor { + + func enqueue(_ job: consuming ExecutorJob) { + // By default we are just going to use execute to run the job + // this is quite heavy since it allocates the closure for + // every single job. + let unownedJob = UnownedJob(job) + self.eventLoop.execute { + unownedJob.runSynchronously(on: self.asUnownedTaskExecutor()) + } + } + + func asUnownedTaskExecutor() -> UnownedTaskExecutor { + UnownedTaskExecutor(ordinary: self) + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) +extension NIOTaskExecutor: ConnectionPoolExecutor { + typealias ID = ObjectIdentifier + + var id: ObjectIdentifier { + ObjectIdentifier(self) + } + + static func getExecutorID() -> ObjectIdentifier? { + self.threadSpecificEventLoop.currentValue?.id + } +} diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index 11407176..1ccd46cc 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -10,6 +10,7 @@ let package = Package( dependencies: [ .package(path: "../"), .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.29.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.82.0"), ], targets: [ .executableTarget( @@ -18,6 +19,8 @@ let package = Package( .product(name: "_ConnectionPoolModule", package: "postgres-nio"), .product(name: "_ConnectionPoolTestUtils", package: "postgres-nio"), .product(name: "Benchmark", package: "package-benchmark"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), ], path: "Benchmarks/ConnectionPoolBenchmarks", plugins: [ diff --git a/Sources/ConnectionPoolModule/ConnectionLease.swift b/Sources/ConnectionPoolModule/ConnectionLease.swift index a93b4707..ace63bc8 100644 --- a/Sources/ConnectionPoolModule/ConnectionLease.swift +++ b/Sources/ConnectionPoolModule/ConnectionLease.swift @@ -5,18 +5,21 @@ // Created by Fabian Fett on 05.05.25. // -struct ConnectionLease { +public struct ConnectionLease: Sendable { - var connection: Connection - var _release: () -> () + public var connection: Connection + + @usableFromInline + let _release: @Sendable () -> () - init(connection: Connection, release: @escaping () -> Void) { + @inlinable + public init(connection: Connection, release: @escaping @Sendable () -> Void) { self.connection = connection self._release = release } - func release() { + @inlinable + public func release() { self._release() } - } diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index 1c78afbd..0cd844f4 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -88,7 +88,7 @@ public protocol ConnectionRequestProtocol: Sendable { /// A function that is called with a connection or a /// `PoolError`. - func complete(with: Result) + func complete(with: Result, ConnectionPoolError>) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -420,8 +420,11 @@ public final class ConnectionPool< /*private*/ func runRequestAction(_ action: StateMachine.RequestAction) { switch action { case .leaseConnection(let requests, let connection): + let lease = ConnectionLease(connection: connection) { + self.releaseConnection(connection) + } for request in requests { - request.complete(with: .success(connection)) + request.complete(with: .success(lease)) } case .failRequest(let request, let error): @@ -521,6 +524,7 @@ public final class ConnectionPool< @inlinable /*private*/ func runTimer(_ timer: StateMachine.Timer, in taskGroup: inout some TaskGroupProtocol) { + print("timer: \(timer.connectionID), underlying: \(timer.underlying.usecase)") self.addTask(into: &taskGroup) { () async -> () in await withTaskGroup(of: TimerRunResult.self, returning: Void.self) { taskGroup in taskGroup.addTask { diff --git a/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift b/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift index f4e6d185..b887752a 100644 --- a/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift +++ b/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift @@ -9,6 +9,8 @@ public protocol ConnectionPoolExecutor: AnyObject, Sendable { public final class NothingConnectionPoolExecutor: ConnectionPoolExecutor { public typealias ID = ObjectIdentifier + public init() {} + public var id: ObjectIdentifier { ObjectIdentifier(self) } public static func getExecutorID() -> ObjectIdentifier? { nil } diff --git a/Sources/ConnectionPoolModule/ConnectionPoolManager.swift b/Sources/ConnectionPoolModule/ConnectionPoolManager.swift index 5c5e03d2..b4552d82 100644 --- a/Sources/ConnectionPoolModule/ConnectionPoolManager.swift +++ b/Sources/ConnectionPoolModule/ConnectionPoolManager.swift @@ -89,6 +89,7 @@ public final class ConnectionPoolManager< @usableFromInline let eventContinuation: AsyncStream.Continuation + @inlinable public init( configuration: ConnectionPoolManagerConfiguration, connectionConfiguration: ConnectionConfiguration, diff --git a/Sources/ConnectionPoolModule/ConnectionRequest.swift b/Sources/ConnectionPoolModule/ConnectionRequest.swift index 365b94a0..71f40cc1 100644 --- a/Sources/ConnectionPoolModule/ConnectionRequest.swift +++ b/Sources/ConnectionPoolModule/ConnectionRequest.swift @@ -5,18 +5,18 @@ public struct ConnectionRequest: ConnectionRequest public var id: ID @usableFromInline - private(set) var continuation: CheckedContinuation + private(set) var continuation: CheckedContinuation, any Error> @inlinable init( id: Int, - continuation: CheckedContinuation + continuation: CheckedContinuation, any Error> ) { self.id = id self.continuation = continuation } - public func complete(with result: Result) { + public func complete(with result: Result, ConnectionPoolError>) { self.continuation.resume(with: result) } } @@ -50,7 +50,7 @@ extension ConnectionPool where Request == ConnectionRequest { } @inlinable - public func leaseConnection() async throws -> Connection { + public func leaseConnection() async throws -> ConnectionLease { let requestID = requestIDGenerator.next() let connection = try await withTaskCancellationHandler { @@ -58,7 +58,7 @@ extension ConnectionPool where Request == ConnectionRequest { throw CancellationError() } - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation, Error>) in let request = Request( id: requestID, continuation: continuation @@ -75,8 +75,26 @@ extension ConnectionPool where Request == ConnectionRequest { @inlinable public func withConnection(_ closure: (Connection) async throws -> Result) async throws -> Result { - let connection = try await self.leaseConnection() - defer { self.releaseConnection(connection) } - return try await closure(connection) + let lease = try await self.leaseConnection() + defer { lease.release() } + return try await closure(lease.connection) + } +} + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension ConnectionPoolManager where Request == ConnectionRequest { + @inlinable + public func leaseConnection() async throws -> ConnectionLease { + + let index = self.roundRobinCounter.loadThenWrappingIncrement(ordering: .relaxed) % self.roundRobinPools.count + + return try await self.roundRobinPools[index].leaseConnection() + } + + @inlinable + public func withConnection(_ closure: (Connection) async throws -> Result) async throws -> Result { + let lease = try await self.leaseConnection() + defer { lease.release() } + return try await closure(lease.connection) } } diff --git a/Sources/ConnectionPoolTestUtils/MockConnection.swift b/Sources/ConnectionPoolTestUtils/MockConnection.swift index 149d0b56..5691a28e 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnection.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnection.swift @@ -12,7 +12,7 @@ public struct MockConnectionConfiguration: Sendable, Hashable { } } -public final class MockConnection: PooledConnection, Sendable { +public final class MockConnection: PooledConnection, Sendable { public typealias ID = Int public let id: ID @@ -25,7 +25,7 @@ public final class MockConnection: PooledConnection, Sendable { private let lock: NIOLockedValueBox = NIOLockedValueBox(.running([], [])) - public init(id: Int) { + public init(id: Int, executor: Executor) { self.id = id } diff --git a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift index 2ada3432..86ff7f65 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift @@ -3,24 +3,30 @@ import DequeModule import NIOConcurrencyHelpers @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -public final class MockConnectionFactory: Sendable where Clock.Duration == Duration { +public final class MockConnectionFactory: Sendable where Clock.Duration == Duration { public typealias ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator - public typealias Request = ConnectionRequest + public typealias Request = ConnectionRequest> public typealias KeepAliveBehavior = MockPingPongBehavior public typealias MetricsDelegate = NoOpConnectionPoolMetrics public typealias ConnectionID = Int public typealias Connection = MockConnection + @usableFromInline let stateBox = NIOLockedValueBox(State()) + @usableFromInline struct State { - var attempts = Deque<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>)>() + @usableFromInline + var attempts = Deque<(ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>)>() - var waiter = Deque), Never>>() + @usableFromInline + var waiter = Deque, UInt16), any Error>), Never>>() - var runningConnections = [ConnectionID: Connection]() + @usableFromInline + var runningConnections = [ConnectionID: Connection]() } + @usableFromInline let autoMaxStreams: UInt16? public init(autoMaxStreams: UInt16? = nil) { @@ -31,17 +37,18 @@ public final class MockConnectionFactory: Sendable wh self.stateBox.withLockedValue { $0.attempts.count } } - public var runningConnections: [Connection] { + public var runningConnections: [Connection] { self.stateBox.withLockedValue { Array($0.runningConnections.values) } } + @inlinable public func makeConnection( id: Int, configuration: MockConnectionConfiguration, - for pool: ConnectionPool, MockExecutor, NoOpConnectionPoolMetrics, Clock> - ) async throws -> ConnectionAndMetadata { + for pool: ConnectionPool, Int, ConnectionIDGenerator, MockConnectionConfiguration, some ConnectionRequestProtocol, Int, MockPingPongBehavior>, Executor, NoOpConnectionPoolMetrics, Clock> + ) async throws -> ConnectionAndMetadata> { if let autoMaxStreams = self.autoMaxStreams { - let connection = MockConnection(id: id) + let connection = MockConnection(id: id, executor: pool.executor) Task { try? await connection.signalToClose connection.closeIfClosing() @@ -50,18 +57,18 @@ public final class MockConnectionFactory: Sendable wh } // we currently don't support cancellation when creating a connection - let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in - let waiter = self.stateBox.withLockedValue { state -> (CheckedContinuation<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>)? in + let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in + let waiter = self.stateBox.withLockedValue { state -> (CheckedContinuation<(ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>)? in if let waiter = state.waiter.popFirst() { return waiter } else { - state.attempts.append((id, checkedContinuation)) + state.attempts.append((id, pool.executor, checkedContinuation)) return nil } } if let waiter { - waiter.resume(returning: (id, checkedContinuation)) + waiter.resume(returning: (id, pool.executor, checkedContinuation)) } } @@ -69,9 +76,10 @@ public final class MockConnectionFactory: Sendable wh } @discardableResult - public func nextConnectAttempt(_ closure: (ConnectionID) async throws -> UInt16) async rethrows -> Connection { - let (connectionID, continuation) = await withCheckedContinuation { (continuation: CheckedContinuation<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>) in - let attempt = self.stateBox.withLockedValue { state -> (ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>)? in + @inlinable + public func nextConnectAttempt(_ closure: (ConnectionID) async throws -> UInt16) async rethrows -> Connection { + let (connectionID, executor, continuation) = await withCheckedContinuation { (continuation: CheckedContinuation<(ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>) in + let attempt = self.stateBox.withLockedValue { state -> (ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>)? in if let attempt = state.attempts.popFirst() { return attempt } else { @@ -87,7 +95,7 @@ public final class MockConnectionFactory: Sendable wh do { let streamCount = try await closure(connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: executor) connection.onClose { _ in self.stateBox.withLockedValue { state in diff --git a/Sources/ConnectionPoolTestUtils/MockRequest.swift b/Sources/ConnectionPoolTestUtils/MockRequest.swift index 5e4e2fc0..0151fda2 100644 --- a/Sources/ConnectionPoolTestUtils/MockRequest.swift +++ b/Sources/ConnectionPoolTestUtils/MockRequest.swift @@ -1,8 +1,7 @@ import _ConnectionPoolModule -public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { - public typealias Connection = MockConnection - +public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { + public struct ID: Hashable, Sendable { var objectID: ObjectIdentifier @@ -23,7 +22,8 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { hasher.combine(self.id) } - public func complete(with: Result) { + @inlinable + public func complete(with: Result, ConnectionPoolError>) { } } diff --git a/Sources/PostgresNIO/Pool/PostgresClient.swift b/Sources/PostgresNIO/Pool/PostgresClient.swift index 76ac7208..a6d313c0 100644 --- a/Sources/PostgresNIO/Pool/PostgresClient.swift +++ b/Sources/PostgresNIO/Pool/PostgresClient.swift @@ -237,11 +237,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { PostgresConnection, PostgresConnection.ID, ConnectionIDGenerator, - PostgresConnection.Configuration, + Foo, ConnectionRequest, ConnectionRequest.ID, - NothingConnectionPoolExecutor, PostgresKeepAliveBehavor, + NothingConnectionPoolExecutor, PostgresClientMetrics, ContinuousClock > @@ -284,12 +284,14 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { self.pool = ConnectionPool( configuration: .init(configuration), + connectionConfiguration: Foo(), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: .init(configuration.options.keepAliveBehavior, logger: backgroundLogger), + executor: NothingConnectionPoolExecutor(), observabilityDelegate: .init(logger: backgroundLogger), clock: ContinuousClock() - ) { (connectionID, pool) in + ) { (connectionID, connectionConfiguration, pool) in let connection = try await factory.makeConnection(connectionID, pool: pool) return ConnectionAndMetadata(connection: connection, maximalStreamsOnConnection: 1) @@ -303,11 +305,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { /// - Returns: The closure's return value. @_disfavoredOverload public func withConnection(_ closure: (PostgresConnection) async throws -> Result) async throws -> Result { - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() - defer { self.pool.releaseConnection(connection) } + defer { lease.release() } - return try await closure(connection) + return try await closure(lease.connection) } #if compiler(>=6.0) @@ -321,11 +323,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { // DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED // https://github.com/swiftlang/swift/issues/79285 _ closure: (PostgresConnection) async throws -> sending Result) async throws -> sending Result { - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() - defer { self.pool.releaseConnection(connection) } + defer { lease.release() } - return try await closure(connection) + return try await closure(lease.connection) } /// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime. @@ -406,7 +408,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line) } - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() + let connection = lease.connection var logger = logger logger[postgresMetadataKey: .connectionID] = "\(connection.id)" @@ -421,12 +424,12 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { connection.channel.write(HandlerTask.extendedQuery(context), promise: nil) promise.futureResult.whenFailure { _ in - self.pool.releaseConnection(connection) + lease.release() } return try await promise.futureResult.map { $0.asyncSequence(onFinish: { - self.pool.releaseConnection(connection) + lease.release() }) }.get() } catch var error as PSQLError { @@ -448,7 +451,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { let logger = logger ?? Self.loggingDisabled do { - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() + let connection = lease.connection let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self) let task = HandlerTask.executePreparedStatement(.init( @@ -462,11 +466,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { connection.channel.write(task, promise: nil) promise.futureResult.whenFailure { _ in - self.pool.releaseConnection(connection) + lease.release() } return try await promise.futureResult - .map { $0.asyncSequence(onFinish: { self.pool.releaseConnection(connection) }) } + .map { $0.asyncSequence(onFinish: { lease.release() }) } .get() .map { try preparedStatement.decodeRow($0) } } catch var error as PSQLError { @@ -506,7 +510,7 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { // MARK: - Private Methods - - private func leaseConnection() async throws -> PostgresConnection { + private func leaseConnection() async throws -> ConnectionLease { if !self.runningAtomic.load(ordering: .relaxed) { self.backgroundLogger.warning("Trying to lease connection from `PostgresClient`, but `PostgresClient.run()` hasn't been called yet.") } @@ -581,3 +585,5 @@ extension ConnectionPoolError { return psqlError } } + +struct Foo: Equatable {} diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index 59499ba5..14ec706d 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -41,15 +41,13 @@ final class ConnectionPoolTests: XCTestCase { do { for _ in 0..<1000 { async let connectionFuture = try await pool.leaseConnection() - var leasedConnection: MockConnection? + var connectionLease: ConnectionLease? XCTAssertEqual(factory.pendingConnectionAttemptsCount, 0) - leasedConnection = try await connectionFuture - XCTAssertNotNil(leasedConnection) - XCTAssert(createdConnection === leasedConnection) + connectionLease = try await connectionFuture + XCTAssertNotNil(connectionLease) + XCTAssert(createdConnection === connectionLease?.connection) - if let leasedConnection { - pool.releaseConnection(leasedConnection) - } + connectionLease?.release() } } catch { XCTFail("Unexpected error: \(error)") @@ -203,8 +201,8 @@ final class ConnectionPoolTests: XCTestCase { for _ in 0..]() for request in requests { let connection = try await request.future.success - connections.append(connection) + connectionLeases.append(connection) } // Ensure that we got 4 distinct connections - XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 4) + XCTAssertEqual(Set(connectionLeases.lazy.map(\.connection.id)).count, 4) // release all 4 leased connections - for connection in connections { - pool.releaseConnection(connection) + for lease in connectionLeases { + lease.release() } // shutdown @@ -751,7 +749,7 @@ final class ConnectionPoolTests: XCTestCase { // create 4 connection requests let requests = (0..<10).map { ConnectionFuture(id: $0) } pool.leaseConnections(requests) - var connections = [MockConnection]() + var connectionLeases = [ConnectionLease]() await factory.nextConnectAttempt { connectionID in return 10 @@ -759,15 +757,15 @@ final class ConnectionPoolTests: XCTestCase { for request in requests { let connection = try await request.future.success - connections.append(connection) + connectionLeases.append(connection) } // Ensure that all requests got the same connection - XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1) + XCTAssertEqual(Set(connectionLeases.lazy.map(\.connection.id)).count, 1) // release all 10 leased streams - for connection in connections { - pool.releaseConnection(connection) + for lease in connectionLeases { + lease.release() } for _ in 0..<9 { @@ -818,41 +816,41 @@ final class ConnectionPoolTests: XCTestCase { // create 4 connection requests var requests = (0..<21).map { ConnectionFuture(id: $0) } pool.leaseConnections(requests) - var connections = [MockConnection]() + var connectionLease = [ConnectionLease]() await factory.nextConnectAttempt { connectionID in return 1 } - let connection = try await requests.first!.future.success - connections.append(connection) + let lease = try await requests.first!.future.success + connectionLease.append(lease) requests.removeFirst() - pool.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: 21) + pool.connectionReceivedNewMaxStreamSetting(lease.connection, newMaxStreamSetting: 21) for (_, request) in requests.enumerated() { let connection = try await request.future.success - connections.append(connection) + connectionLease.append(connection) } // Ensure that all requests got the same connection - XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1) + XCTAssertEqual(Set(connectionLease.lazy.map(\.connection.id)).count, 1) requests = (22..<42).map { ConnectionFuture(id: $0) } pool.leaseConnections(requests) // release all 21 leased streams in a single call - pool.releaseConnection(connection, streams: 21) + pool.releaseConnection(lease.connection, streams: 21) // ensure all 20 new requests got fulfilled for request in requests { let connection = try await request.future.success - connections.append(connection) + connectionLease.append(connection) } // release all 20 leased streams one by one for _ in requests { - pool.releaseConnection(connection, streams: 1) + pool.releaseConnection(lease.connection, streams: 1) } // shutdown @@ -866,14 +864,14 @@ final class ConnectionPoolTests: XCTestCase { struct ConnectionFuture: ConnectionRequestProtocol { let id: Int - let future: Future + let future: Future> init(id: Int) { self.id = id - self.future = Future(of: MockConnection.self) + self.future = Future(of: ConnectionLease.self) } - func complete(with result: Result) { + func complete(with result: Result, ConnectionPoolError>) { switch result { case .success(let success): self.future.yield(value: success) diff --git a/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift index 537efbd9..59115a59 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift @@ -6,13 +6,16 @@ final class ConnectionRequestTests: XCTestCase { func testHappyPath() async throws { let mockConnection = MockConnection(id: 1) - let connection = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let lease = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation, any Error>) in let request = ConnectionRequest(id: 42, continuation: continuation) XCTAssertEqual(request.id, 42) - continuation.resume(with: .success(mockConnection)) + let lease = ConnectionLease(connection: mockConnection) { + + } + continuation.resume(with: .success(lease)) } - XCTAssert(connection === mockConnection) + XCTAssert(lease.connection === mockConnection) } func testSadPath() async throws { From 626a47acd5ce55732dd55fd2964b9a9a95b5f0e1 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 14 May 2025 17:33:09 +0200 Subject: [PATCH 3/3] Playing around with Perf --- Benchmarks/Package.swift | 12 +- .../Sources/PostgresPerf/PostgresPerf.swift | 131 ++++++++++++++++++ .../ConnectionPoolModule/ConnectionPool.swift | 1 - Sources/PostgresNIO/Pool/PostgresClient.swift | 27 +++- docker-compose.yml | 1 + 5 files changed, 166 insertions(+), 6 deletions(-) create mode 100644 Benchmarks/Sources/PostgresPerf/PostgresPerf.swift diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index 1ccd46cc..01208968 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -10,7 +10,8 @@ let package = Package( dependencies: [ .package(path: "../"), .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.29.0"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.82.0"), + .package(url: "https://github.com/vapor/postgres-kit.git", from: "2.14.0"), + .package(name: "swift-nio", path: "../../../apple/swift-nio"), ], targets: [ .executableTarget( @@ -27,5 +28,14 @@ let package = Package( .plugin(name: "BenchmarkPlugin", package: "package-benchmark") ] ), + .executableTarget( + name: "PostgresPerf", + dependencies: [ + .product(name: "PostgresNIO", package: "postgres-nio"), + .product(name: "PostgresKit", package: "postgres-kit"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), + ], + ) ] ) diff --git a/Benchmarks/Sources/PostgresPerf/PostgresPerf.swift b/Benchmarks/Sources/PostgresPerf/PostgresPerf.swift new file mode 100644 index 00000000..b535435f --- /dev/null +++ b/Benchmarks/Sources/PostgresPerf/PostgresPerf.swift @@ -0,0 +1,131 @@ +// +// PostgresPerf.swift +// benchmarks +// +// Created by Fabian Fett on 12.05.25. +// + +import Synchronization +import PostgresNIO +@preconcurrency import PostgresKit +@preconcurrency import AsyncKit + +@main +@available(macOS 15.0, *) +enum PostgresPerf { + + static let maxConnections: Int = 400 + static let tasks: Int = 400 + static let iterationsPerTask: Int = 1000 + static let logger = Logger(label: "TestLogger") + static let clock = ContinuousClock() + + static let eventLoopCount = { + NIOSingletons.posixEventLoopGroup.makeIterator().reduce(0, { (res, _) in res + 1 }) + }() + + static func main() async throws { +// if CommandLine.arguments.first == "kit" { + try await Self.runPostgresKit() +// } else { + try await self.runPostgresNIO() +// } + } + + static func runPostgresKit() async throws { + let configuration = SQLPostgresConfiguration( + hostname: "localhost", port: 5432, + username: "test_username", + password: "test_password", + database: "test_database", + tls: .disable + ) + + let pools = EventLoopGroupConnectionPool( + source: PostgresConnectionSource(sqlConfiguration: configuration), + maxConnectionsPerEventLoop: Self.maxConnections / Self.eventLoopCount, + on: NIOSingletons.posixEventLoopGroup + ) + + let start = self.clock.now + await withThrowingTaskGroup(of: Void.self) { taskGroup in + for _ in 0.. String? { + getenv(name).flatMap { String(cString: $0) } +} diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index 0cd844f4..23ea3329 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -524,7 +524,6 @@ public final class ConnectionPool< @inlinable /*private*/ func runTimer(_ timer: StateMachine.Timer, in taskGroup: inout some TaskGroupProtocol) { - print("timer: \(timer.connectionID), underlying: \(timer.underlying.usecase)") self.addTask(into: &taskGroup) { () async -> () in await withTaskGroup(of: TimerRunResult.self, returning: Void.self) { taskGroup in taskGroup.addTask { diff --git a/Sources/PostgresNIO/Pool/PostgresClient.swift b/Sources/PostgresNIO/Pool/PostgresClient.swift index a6d313c0..45323977 100644 --- a/Sources/PostgresNIO/Pool/PostgresClient.swift +++ b/Sources/PostgresNIO/Pool/PostgresClient.swift @@ -233,6 +233,19 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { } } + typealias PoolManager = ConnectionPoolManager< + PostgresConnection, + PostgresConnection.ID, + ConnectionIDGenerator, + Foo, + ConnectionRequest, + ConnectionRequest.ID, + PostgresKeepAliveBehavor, + NothingConnectionPoolExecutor, + PostgresClientMetrics, + ContinuousClock + > + typealias Pool = ConnectionPool< PostgresConnection, PostgresConnection.ID, @@ -246,7 +259,7 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { ContinuousClock > - let pool: Pool + let pool: PoolManager let factory: ConnectionFactory let runningAtomic = ManagedAtomic(false) let backgroundLogger: Logger @@ -282,13 +295,19 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { self.factory = factory self.backgroundLogger = backgroundLogger - self.pool = ConnectionPool( - configuration: .init(configuration), + let executors = (0..<10).map { _ in NothingConnectionPoolExecutor() } + var poolManagerConfiguration = ConnectionPoolManagerConfiguration() + poolManagerConfiguration.minimumConnectionPerExecutorCount = configuration.options.minimumConnections / executors.count + poolManagerConfiguration.maximumConnectionPerExecutorSoftLimit = configuration.options.maximumConnections / executors.count + poolManagerConfiguration.maximumConnectionPerExecutorHardLimit = configuration.options.maximumConnections / executors.count + + self.pool = ConnectionPoolManager( + configuration: poolManagerConfiguration, connectionConfiguration: Foo(), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: .init(configuration.options.keepAliveBehavior, logger: backgroundLogger), - executor: NothingConnectionPoolExecutor(), + executors: executors, observabilityDelegate: .init(logger: backgroundLogger), clock: ContinuousClock() ) { (connectionID, connectionConfiguration, pool) in diff --git a/docker-compose.yml b/docker-compose.yml index 3eff4249..ee66a12d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,7 @@ version: '3.7' x-shared-config: &shared_config + command: -c 'max_connections=500' environment: POSTGRES_HOST_AUTH_METHOD: "${POSTGRES_HOST_AUTH_METHOD:-scram-sha-256}" POSTGRES_USER: test_username