Skip to content

Commit 0dd92b4

Browse files
committed
WIP refactor: replace sharding with single connection set
1 parent d905016 commit 0dd92b4

File tree

13 files changed

+1183
-346
lines changed

13 files changed

+1183
-346
lines changed

sqlx-core/src/ext/future.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use pin_project_lite::pin_project;
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
pin_project! {
7+
#[project = RaceProject]
8+
pub struct Race<L, R> {
9+
#[pin]
10+
left: L,
11+
#[pin]
12+
right: R,
13+
}
14+
}
15+
16+
impl<L, R> Future for Race<L, R>
17+
where
18+
L: Future,
19+
R: Future,
20+
{
21+
type Output = Result<L::Output, R::Output>;
22+
23+
#[inline(always)]
24+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
25+
let mut this = self.project();
26+
27+
if let Poll::Ready(left) = this.left.as_mut().poll(cx) {
28+
return Poll::Ready(Ok(left));
29+
}
30+
31+
this.right.as_mut().poll(cx).map(Err)
32+
}
33+
}
34+
35+
#[inline(always)]
36+
pub fn race<L, R>(left: L, right: R) -> Race<L, R> {
37+
Race { left, right }
38+
}

sqlx-core/src/ext/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@ pub mod ustr;
22

33
#[macro_use]
44
pub mod async_stream;
5+
6+
pub mod future;

sqlx-core/src/pool/connect.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
1414
use std::sync::{Arc, Mutex, RwLock};
1515
use std::time::Instant;
1616

17-
use crate::pool::shard::DisconnectedSlot;
17+
use crate::pool::connection_set::DisconnectedSlot;
1818
#[cfg(doc)]
1919
use crate::pool::PoolOptions;
2020
use crate::sync::{AsyncMutex, AsyncMutexGuard};
@@ -646,26 +646,24 @@ async fn connect_with_backoff<DB: Database>(
646646

647647
match res {
648648
ControlFlow::Break(Ok(conn)) => {
649-
tracing::trace!(
649+
tracing::debug!(
650650
target: "sqlx::pool::connect",
651651
%connection_id,
652652
attempt,
653653
elapsed_seconds,
654654
"connection established",
655655
);
656656

657-
return Ok(PoolConnection::new(
658-
slot.put(ConnectionInner {
659-
raw: conn,
660-
id: connection_id,
661-
created_at: now,
662-
last_released_at: now,
663-
}),
664-
pool.0.clone(),
665-
));
657+
return Ok(PoolConnection::new(slot.put(ConnectionInner {
658+
pool: Arc::downgrade(&pool.0),
659+
raw: conn,
660+
id: connection_id,
661+
created_at: now,
662+
last_released_at: now,
663+
})));
666664
}
667665
ControlFlow::Break(Err(e)) => {
668-
tracing::warn!(
666+
tracing::error!(
669667
target: "sqlx::pool::connect",
670668
%connection_id,
671669
attempt,

sqlx-core/src/pool/connection.rs

Lines changed: 85 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,35 @@
11
use std::fmt::{self, Debug, Formatter};
22
use std::future::{self, Future};
3+
use std::io;
34
use std::ops::{Deref, DerefMut};
4-
use std::sync::Arc;
5+
use std::sync::{Arc, Weak};
56
use std::time::{Duration, Instant};
67

78
use crate::connection::Connection;
89
use crate::database::Database;
910
use crate::error::Error;
1011

11-
use super::inner::{is_beyond_max_lifetime, PoolInner};
12+
use super::inner::PoolInner;
1213
use crate::pool::connect::{ConnectPermit, ConnectTaskShared, ConnectionId};
14+
use crate::pool::connection_set::{ConnectedSlot, DisconnectedSlot};
1315
use crate::pool::options::PoolConnectionMetadata;
14-
use crate::pool::shard::{ConnectedSlot, DisconnectedSlot};
15-
use crate::pool::Pool;
16+
use crate::pool::{Pool, PoolOptions};
1617
use crate::rt;
1718

1819
const RETURN_TO_POOL_TIMEOUT: Duration = Duration::from_secs(5);
19-
const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
20+
const CLOSE_TIMEOUT: Duration = Duration::from_secs(5);
2021

2122
/// A connection managed by a [`Pool`][crate::pool::Pool].
2223
///
2324
/// Will be returned to the pool on-drop.
2425
pub struct PoolConnection<DB: Database> {
2526
conn: Option<ConnectedSlot<ConnectionInner<DB>>>,
26-
pub(crate) pool: Arc<PoolInner<DB>>,
2727
close_on_drop: bool,
2828
}
2929

3030
pub(super) struct ConnectionInner<DB: Database> {
31+
// Note: must be `Weak` to prevent a reference cycle
32+
pub(crate) pool: Weak<PoolInner<DB>>,
3133
pub(super) raw: DB::Connection,
3234
pub(super) id: ConnectionId,
3335
pub(super) created_at: Instant,
@@ -72,11 +74,10 @@ impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
7274
}
7375

7476
impl<DB: Database> PoolConnection<DB> {
75-
pub(super) fn new(live: ConnectedSlot<ConnectionInner<DB>>, pool: Arc<PoolInner<DB>>) -> Self {
77+
pub(super) fn new(live: ConnectedSlot<ConnectionInner<DB>>) -> Self {
7678
Self {
7779
conn: Some(live),
7880
close_on_drop: false,
79-
pool,
8081
}
8182
}
8283

@@ -140,13 +141,16 @@ impl<DB: Database> PoolConnection<DB> {
140141
#[doc(hidden)]
141142
pub fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
142143
let conn = self.conn.take();
143-
let pool = self.pool.clone();
144144

145145
async move {
146146
let Some(conn) = conn else {
147147
return;
148148
};
149149

150+
let Some(pool) = Weak::upgrade(&conn.pool) else {
151+
return;
152+
};
153+
150154
rt::timeout(RETURN_TO_POOL_TIMEOUT, return_to_pool(conn, &pool))
151155
.await
152156
// Dropping of the `slot` will check if the connection must be re-established
@@ -161,7 +165,7 @@ impl<DB: Database> PoolConnection<DB> {
161165
async move {
162166
if let Some(conn) = conn {
163167
// Don't hold the connection forever if it hangs while trying to close
164-
rt::timeout(CLOSE_ON_DROP_TIMEOUT, close(conn)).await.ok();
168+
rt::timeout(CLOSE_TIMEOUT, close(conn)).await.ok();
165169
}
166170
}
167171
}
@@ -195,7 +199,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {
195199
}
196200

197201
// We still need to spawn a task to maintain `min_connections`.
198-
if self.conn.is_some() || self.pool.options.min_connections > 0 {
202+
if self.conn.is_some() {
199203
crate::rt::spawn(self.return_to_pool());
200204
}
201205
}
@@ -220,6 +224,48 @@ impl<DB: Database> ConnectionInner<DB> {
220224
idle_for: now.saturating_duration_since(self.last_released_at),
221225
}
222226
}
227+
228+
pub fn is_beyond_max_lifetime(&self, options: &PoolOptions<DB>) -> bool {
229+
if let Some(max_lifetime) = options.max_lifetime {
230+
let age = self.created_at.elapsed();
231+
232+
if age > max_lifetime {
233+
tracing::info!(
234+
target: "sqlx::pool",
235+
connection_id=%self.id,
236+
?age,
237+
"connection is beyond `max_lifetime`, closing"
238+
);
239+
240+
return true;
241+
}
242+
}
243+
244+
false
245+
}
246+
247+
pub fn is_beyond_idle_timeout(&self, options: &PoolOptions<DB>) -> bool {
248+
if let Some(idle_timeout) = options.idle_timeout {
249+
let now = Instant::now();
250+
251+
let age = now.duration_since(self.created_at);
252+
let idle_duration = now.duration_since(self.last_released_at);
253+
254+
if idle_duration > idle_timeout {
255+
tracing::info!(
256+
target: "sqlx::pool",
257+
connection_id=%self.id,
258+
?age,
259+
?idle_duration,
260+
"connection is beyond `idle_timeout`, closing"
261+
);
262+
263+
return true;
264+
}
265+
}
266+
267+
false
268+
}
223269
}
224270

225271
pub(crate) async fn close<DB: Database>(
@@ -231,14 +277,19 @@ pub(crate) async fn close<DB: Database>(
231277

232278
let (conn, slot) = ConnectedSlot::take(conn);
233279

234-
let res = conn.raw.close().await.inspect_err(|error| {
235-
tracing::debug!(
236-
target: "sqlx::pool",
237-
%connection_id,
238-
%error,
239-
"error occurred while closing the pool connection"
240-
);
241-
});
280+
let res = rt::timeout(CLOSE_TIMEOUT, conn.raw.close())
281+
.await
282+
.unwrap_or_else(|_| {
283+
Err(io::Error::new(io::ErrorKind::TimedOut, "timed out sending close packet").into())
284+
})
285+
.inspect_err(|error| {
286+
tracing::debug!(
287+
target: "sqlx::pool",
288+
%connection_id,
289+
%error,
290+
"error occurred while closing the pool connection"
291+
);
292+
});
242293

243294
(res, slot)
244295
}
@@ -255,14 +306,19 @@ pub(crate) async fn close_hard<DB: Database>(
255306

256307
let (conn, slot) = ConnectedSlot::take(conn);
257308

258-
let res = conn.raw.close_hard().await.inspect_err(|error| {
259-
tracing::debug!(
260-
target: "sqlx::pool",
261-
%connection_id,
262-
%error,
263-
"error occurred while closing the pool connection"
264-
);
265-
});
309+
let res = rt::timeout(CLOSE_TIMEOUT, conn.raw.close_hard())
310+
.await
311+
.unwrap_or_else(|_| {
312+
Err(io::Error::new(io::ErrorKind::TimedOut, "timed out sending close packet").into())
313+
})
314+
.inspect_err(|error| {
315+
tracing::debug!(
316+
target: "sqlx::pool",
317+
%connection_id,
318+
%error,
319+
"error occurred while closing the pool connection"
320+
);
321+
});
266322

267323
(res, slot)
268324
}
@@ -282,7 +338,7 @@ async fn return_to_pool<DB: Database>(
282338

283339
// If the connection is beyond max lifetime, close the connection and
284340
// immediately create a new connection
285-
if is_beyond_max_lifetime(&conn, &pool.options) {
341+
if conn.is_beyond_max_lifetime(&pool.options) {
286342
let (_res, slot) = close(conn).await;
287343
return Err(slot);
288344
}
@@ -314,6 +370,7 @@ async fn return_to_pool<DB: Database>(
314370
// to recover from cancellations
315371
if let Err(error) = conn.raw.ping().await {
316372
tracing::warn!(
373+
target: "sqlx::pool",
317374
%error,
318375
"error occurred while testing the connection on-release",
319376
);

0 commit comments

Comments
 (0)