Skip to content

Started work on a pool manager #551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import _ConnectionPoolModule
import _ConnectionPoolTestUtils
import Benchmark
import NIOCore
import NIOPosix

let benchmarks: @Sendable () -> Void = {
Benchmark("Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in
Benchmark("Pool: Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>(autoMaxStreams: 1)
let factory = MockConnectionFactory<MockClock, MockExecutor>(autoMaxStreams: 1)
var configuration = ConnectionPoolConfiguration()
configuration.maximumConnectionSoftLimit = 50
configuration.maximumConnectionHardLimit = 50

let pool = ConnectionPool(
configuration: configuration,
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
idGenerator: ConnectionIDGenerator(),
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
executor: MockExecutor(),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<MockExecutor>.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
}

await withTaskGroup { taskGroup in
Expand Down Expand Up @@ -54,21 +58,23 @@ let benchmarks: @Sendable () -> Void = {
}
}

Benchmark("Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in
Benchmark("Pool: Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>(autoMaxStreams: 1)
let factory = MockConnectionFactory<MockClock, MockExecutor>(autoMaxStreams: 1)
var configuration = ConnectionPoolConfiguration()
configuration.maximumConnectionSoftLimit = 50
configuration.maximumConnectionHardLimit = 50

let pool = ConnectionPool(
configuration: configuration,
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
idGenerator: ConnectionIDGenerator(),
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
executor: MockExecutor(),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<MockExecutor>.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
}

await withTaskGroup { taskGroup in
Expand Down Expand Up @@ -96,4 +102,123 @@ let benchmarks: @Sendable () -> Void = {
taskGroup.cancelAll()
}
}

Benchmark("PoolManager/TaskExecutor: Lease/Release 1k requests: 50 parallel – 10 MockExecutors", configuration: .init(scalingFactor: .kilo)) { benchmark in
let clock = MockClock()
let factory = MockConnectionFactory<MockClock, MockExecutor>(autoMaxStreams: 1)
var configuration = ConnectionPoolManagerConfiguration()
let executorCount = 10
let executors = (0..<executorCount).map { _ in MockExecutor() }

let concurrency = 50

configuration.maximumConnectionPerExecutorSoftLimit = concurrency / executorCount
configuration.maximumConnectionPerExecutorHardLimit = concurrency / executorCount

let pool = ConnectionPoolManager(
configuration: configuration,
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
executors: executors,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<MockExecutor>.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
}

await withTaskGroup { taskGroup in
taskGroup.addTask {
await pool.run()
}

let sequential = benchmark.scaledIterations.upperBound / concurrency

benchmark.startMeasurement()

for parallel in 0..<concurrency {
taskGroup.addTask {
for _ in 0..<sequential {
do {
try await pool.withConnection { connection in
blackHole(connection)
}
} catch {
fatalError()
}
}
}
}

for i in 0..<concurrency {
await taskGroup.next()
}

benchmark.stopMeasurement()

taskGroup.cancelAll()
}
}

if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) {
let eventLoops = NIOSingletons.posixEventLoopGroup
let count = eventLoops.makeIterator().reduce(into: 0, { (result, _) in result += 1 })
Benchmark("PoolManager/TaskExecutor: Lease/Release 1k requests: 10 parallel – \(count) NIO executors", configuration: .init(scalingFactor: .kilo)) { benchmark in
let clock = MockClock()
let factory = MockConnectionFactory<MockClock, NIOTaskExecutor>(autoMaxStreams: 1)
var configuration = ConnectionPoolManagerConfiguration()
try await NIOTaskExecutor.withExecutors(eventLoops) { executors in
let concurrency = 50

configuration.maximumConnectionPerExecutorSoftLimit = concurrency / executors.count
configuration.maximumConnectionPerExecutorHardLimit = concurrency / executors.count

let pool = ConnectionPoolManager(
configuration: configuration,
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
executors: executors,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<NIOTaskExecutor>.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
}

await withTaskGroup { taskGroup in
taskGroup.addTask {
await pool.run()
}

let sequential = benchmark.scaledIterations.upperBound / executors.count

benchmark.startMeasurement()

for executor in executors {
taskGroup.addTask(executorPreference: executor) {
for _ in 0..<sequential {
do {
try await pool.withConnection { connection in
blackHole(connection)
}
} catch {
fatalError()
}
}
}
}

for _ in executors {
await taskGroup.next()
}

benchmark.stopMeasurement()

taskGroup.cancelAll()
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// NIOTaskExecutor.swift
// benchmarks
//
// Created by Fabian Fett on 09.05.25.
//

import NIOCore
import NIOPosix
import _ConnectionPoolModule

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
final class NIOTaskExecutor {

private static let threadSpecificEventLoop = ThreadSpecificVariable<NIOTaskExecutor>()

let eventLoop: any EventLoop

private init(eventLoop: any EventLoop) {
self.eventLoop = eventLoop
}

static func withExecutors(_ eventLoops: MultiThreadedEventLoopGroup, _ body: ([NIOTaskExecutor]) async throws -> ()) async throws {
var executors = [NIOTaskExecutor]()
for eventLoop in eventLoops.makeIterator() {
let executor = NIOTaskExecutor(eventLoop: eventLoop)
try await eventLoop.submit {
NIOTaskExecutor.threadSpecificEventLoop.currentValue = executor
}.get()
executors.append(executor)
}
do {
try await body(executors)
} catch {

}
for eventLoop in eventLoops.makeIterator() {
try await eventLoop.submit {
NIOTaskExecutor.threadSpecificEventLoop.currentValue = nil
}.get()
}
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension NIOTaskExecutor: TaskExecutor {

func enqueue(_ job: consuming ExecutorJob) {
// By default we are just going to use execute to run the job
// this is quite heavy since it allocates the closure for
// every single job.
let unownedJob = UnownedJob(job)
self.eventLoop.execute {
unownedJob.runSynchronously(on: self.asUnownedTaskExecutor())
}
}

func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension NIOTaskExecutor: ConnectionPoolExecutor {
typealias ID = ObjectIdentifier

var id: ObjectIdentifier {
ObjectIdentifier(self)
}

static func getExecutorID() -> ObjectIdentifier? {
self.threadSpecificEventLoop.currentValue?.id
}
}
13 changes: 13 additions & 0 deletions Benchmarks/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ let package = Package(
dependencies: [
.package(path: "../"),
.package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.29.0"),
.package(url: "https://github.com/vapor/postgres-kit.git", from: "2.14.0"),
.package(name: "swift-nio", path: "../../../apple/swift-nio"),
],
targets: [
.executableTarget(
Expand All @@ -18,11 +20,22 @@ let package = Package(
.product(name: "_ConnectionPoolModule", package: "postgres-nio"),
.product(name: "_ConnectionPoolTestUtils", package: "postgres-nio"),
.product(name: "Benchmark", package: "package-benchmark"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
],
path: "Benchmarks/ConnectionPoolBenchmarks",
plugins: [
.plugin(name: "BenchmarkPlugin", package: "package-benchmark")
]
),
.executableTarget(
name: "PostgresPerf",
dependencies: [
.product(name: "PostgresNIO", package: "postgres-nio"),
.product(name: "PostgresKit", package: "postgres-kit"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
],
)
]
)
Loading
Loading