Skip to content

Commit ac3d382

Browse files
committed
WIP refactor: replace sharding with single connection set (5)
1 parent 85de57c commit ac3d382

File tree

6 files changed

+44
-19
lines changed

6 files changed

+44
-19
lines changed

sqlx-core/src/pool/connect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ async fn connect_with_backoff<DB: Database>(
663663
})));
664664
}
665665
ControlFlow::Break(Err(e)) => {
666-
tracing::warn!(
666+
tracing::error!(
667667
target: "sqlx::pool::connect",
668668
%connection_id,
669669
attempt,

sqlx-core/src/pool/connection_set.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ impl<C> ConnectionSet<C> {
127127

128128
let preferred_slot = current_thread_id() % self.slots.len();
129129

130+
tracing::trace!(preferred_slot, ?pref, "acquire_inner");
131+
130132
// Always try to lock the connection associated with our thread ID
131133
let mut acquire_preferred = pin!(self.slots[preferred_slot].acquire(pref));
132134

@@ -174,9 +176,8 @@ impl<C> ConnectionSet<C> {
174176
}
175177

176178
listen_global.as_mut().set(self.global.listen(pref));
179+
continue;
177180
}
178-
179-
continue;
180181
}
181182

182183
return Poll::Pending;
@@ -263,15 +264,20 @@ impl<C> ConnectionSet<C> {
263264

264265
impl AcquirePreference {
265266
#[inline(always)]
266-
fn wants_connected(&self) -> bool {
267-
matches!(self, Self::Connected | Self::Either)
267+
fn wants_connected(&self, is_connected: bool) -> bool {
268+
match (self, is_connected) {
269+
(Self::Connected, true) => true,
270+
(Self::Disconnected, false) => true,
271+
(Self::Either, _) => true,
272+
_ => false,
273+
}
268274
}
269275
}
270276

271277
impl<C> Slot<C> {
272278
#[inline(always)]
273279
fn matches_pref(&self, pref: AcquirePreference) -> bool {
274-
!self.is_leaked() && self.is_connected() == pref.wants_connected()
280+
!self.is_leaked() && pref.wants_connected(self.is_connected())
275281
}
276282

277283
#[inline(always)]
@@ -308,21 +314,30 @@ impl<C> Slot<C> {
308314
async fn acquire(self: &Arc<Self>, pref: AcquirePreference) -> SlotGuard<C> {
309315
loop {
310316
if self.matches_pref(pref) {
317+
tracing::trace!(slot_index=%self.index, "waiting for lock");
318+
311319
let locked = self.lock().await;
312320

313321
if locked.matches_pref(pref) {
314322
return locked;
315323
}
316324
}
317325

318-
let event = if pref.wants_connected() {
319-
&self.unlock_event
320-
} else {
321-
&self.disconnect_event
322-
};
323-
324-
listener!(event => listener);
325-
listener.await;
326+
match pref {
327+
AcquirePreference::Connected => {
328+
listener!(self.unlock_event => listener);
329+
listener.await;
330+
}
331+
AcquirePreference::Disconnected => {
332+
listener!(self.disconnect_event => listener);
333+
listener.await
334+
}
335+
AcquirePreference::Either => {
336+
listener!(self.unlock_event => unlock_listener);
337+
listener!(self.disconnect_event => disconnect_listener);
338+
race(unlock_listener, disconnect_listener).await.ok();
339+
}
340+
}
326341
}
327342
}
328343

@@ -374,7 +389,7 @@ impl<C> SlotGuard<C> {
374389

375390
#[inline(always)]
376391
fn matches_pref(&self, pref: AcquirePreference) -> bool {
377-
!self.slot.is_leaked() && self.is_connected() == pref.wants_connected()
392+
!self.slot.is_leaked() && pref.wants_connected(self.is_connected())
378393
}
379394

380395
#[inline(always)]

sqlx-core/src/pool/inner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ impl<DB: Database> PoolInner<DB> {
181181
self: &Arc<Self>,
182182
connect_shared: &mut Option<Arc<ConnectTaskShared>>,
183183
) -> Result<PoolConnection<DB>, Error> {
184+
tracing::trace!("waiting for any connection");
185+
184186
let disconnected = match self.connections.acquire_any().await {
185187
Ok(conn) => match finish_acquire(self, conn).await {
186188
Ok(conn) => return Ok(conn),

sqlx-core/src/pool/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use crate::sql_str::SqlSafeStr;
6868
use crate::transaction::Transaction;
6969
use event_listener::EventListener;
7070
use futures_core::FusedFuture;
71-
71+
use tracing::Instrument;
7272
pub use self::connect::{PoolConnectMetadata, PoolConnector};
7373
pub use self::connection::PoolConnection;
7474
use self::inner::PoolInner;
@@ -363,6 +363,7 @@ impl<DB: Database> Pool<DB> {
363363
pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static {
364364
let shared = self.0.clone();
365365
async move { shared.acquire().await }
366+
.instrument(tracing::error_span!(target: "sqlx::pool", "acquire"))
366367
}
367368

368369
/// Attempts to retrieve a connection from the pool if there is one available.

sqlx-test/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ use sqlx::pool::PoolOptions;
22
use sqlx::{Connection, Database, Error, Pool};
33
use std::env;
44
use tracing_subscriber::EnvFilter;
5+
use tracing_subscriber::fmt::format::FmtSpan;
56

67
pub fn setup_if_needed() {
78
let _ = dotenvy::dotenv();
89
let _ = tracing_subscriber::fmt::Subscriber::builder()
910
.with_env_filter(EnvFilter::from_default_env())
10-
.with_test_writer()
11-
.finish();
11+
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
12+
// .with_test_writer()
13+
.try_init();
1214
}
1315

1416
// Make a new connection

tests/postgres/postgres.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ async fn it_works_with_cache_disabled() -> anyhow::Result<()> {
255255

256256
#[sqlx_macros::test]
257257
async fn it_executes_with_pool() -> anyhow::Result<()> {
258+
setup_if_needed();
259+
260+
tracing::info!("starting test");
261+
258262
let pool = sqlx_test::pool::<Postgres>().await?;
259263

260264
let rows = pool.fetch_all("SELECT 1; SElECT 2").await?;
@@ -1146,7 +1150,7 @@ async fn test_listener_try_recv_buffered() -> anyhow::Result<()> {
11461150
assert!(listener.next_buffered().is_none());
11471151

11481152
// Activate connection.
1149-
sqlx::query!("SELECT 1 AS one")
1153+
sqlx::query("SELECT 1 AS one")
11501154
.fetch_all(&mut listener)
11511155
.await?;
11521156

@@ -2086,6 +2090,7 @@ async fn test_issue_3052() {
20862090
}
20872091

20882092
#[sqlx_macros::test]
2093+
#[cfg(feature = "chrono")]
20892094
async fn test_bind_iter() -> anyhow::Result<()> {
20902095
use sqlx::postgres::PgBindIterExt;
20912096
use sqlx::types::chrono::{DateTime, Utc};

0 commit comments

Comments
 (0)