Skip to content

Commit 661de8c

Browse files
committed
Replace hand written future impls
This commit replaces the hand written future impls with such ones that wrap the types from futures-util. This hopefully reduces the complexity and also allows using the well tested types instead of hand written ones. I also removed the dependency on pin-project-light in favour of two lines of unsafe code as pin-project would do the same thing internally as well and these two instances are trivial to reason about.
1 parent a641385 commit 661de8c

File tree

4 files changed

+60
-79
lines changed

4 files changed

+60
-79
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ deadpool = { version = "0.12", optional = true, default-features = false, featur
3636
] }
3737
mobc = { version = ">=0.7,<0.10", optional = true }
3838
scoped-futures = { version = "0.1", features = ["std"] }
39-
pin-project-lite = "0.2.16"
4039

4140
[dependencies.diesel]
4241
version = "~2.3.0"

src/pg/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,22 +1175,22 @@ mod tests {
11751175

11761176
async fn fn37(
11771177
mut conn: &AsyncPgConnection,
1178-
) -> QueryResult<(usize, Vec<i32>, i32, Vec<i32>, i32)> {
1178+
) -> QueryResult<(usize, (Vec<i32>, (i32, (Vec<i32>, i32))))> {
11791179
let f3 = diesel::select(0_i32.into_sql::<Integer>()).execute(&mut conn);
11801180
let f4 = diesel::select(4_i32.into_sql::<Integer>()).load::<i32>(&mut conn);
11811181
let f5 = diesel::select(5_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
11821182
let f6 = diesel::select(6_i32.into_sql::<Integer>()).get_results::<i32>(&mut conn);
11831183
let f7 = diesel::select(7_i32.into_sql::<Integer>()).first::<i32>(&mut conn);
11841184

1185-
try_join!(f3, f4, f5, f6, f7)
1185+
try_join(f3, try_join(f4, try_join(f5, try_join(f6, f7)))).await
11861186
}
11871187

11881188
conn.transaction(|conn| {
11891189
async move {
11901190
let f12 = fn12(conn);
11911191
let f37 = fn37(conn);
11921192

1193-
let ((r1, r2), (r3, r4, r5, r6, r7)) = try_join!(f12, f37).unwrap();
1193+
let ((r1, r2), (r3, (r4, (r5, (r6, r7))))) = try_join(f12, f37).await.unwrap();
11941194

11951195
assert_eq!(r1, 1);
11961196
assert_eq!(r2, 2);

src/run_query_dsl/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub mod methods {
9999
ST: 'static,
100100
{
101101
type LoadFuture<'conn>
102-
= utils::Map<Conn::LoadFuture<'conn, 'query>, Self::Stream<'conn>>
102+
= utils::MapOk<Conn::LoadFuture<'conn, 'query>, Self::Stream<'conn>>
103103
where
104104
Conn: 'conn;
105105

@@ -112,10 +112,10 @@ pub mod methods {
112112
Conn: 'conn;
113113

114114
fn internal_load(self, conn: &mut Conn) -> Self::LoadFuture<'_> {
115-
utils::Map::new(conn.load(self), |stream| {
116-
Ok(stream?.map(|row| {
115+
utils::MapOk::new(conn.load(self), |stream| {
116+
stream.map(|row| {
117117
U::build_from_row(&row?).map_err(diesel::result::Error::DeserializationError)
118-
}))
118+
})
119119
})
120120
}
121121
}
@@ -313,7 +313,7 @@ pub trait RunQueryDsl<Conn>: Sized {
313313
Conn: AsyncConnectionCore,
314314
Self: methods::LoadQuery<'query, Conn, U> + 'query,
315315
{
316-
utils::AndThen::new(self.internal_load(conn), |stream| Ok(stream?.try_collect()))
316+
utils::AndThen::new(self.internal_load(conn), |stream| stream.try_collect())
317317
}
318318

319319
/// Executes the given query, returning a [`Stream`] with the returned rows.
@@ -509,7 +509,7 @@ pub trait RunQueryDsl<Conn>: Sized {
509509
Self: methods::LoadQuery<'query, Conn, U> + 'query,
510510
{
511511
utils::AndThen::new(self.internal_load(conn), |stream| {
512-
Ok(utils::LoadNext::new(Box::pin(stream?)))
512+
utils::LoadNext::new(Box::pin(stream))
513513
})
514514
}
515515

src/run_query_dsl/utils.rs

Lines changed: 51 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -4,95 +4,77 @@ use std::task::{Context, Poll};
44

55
use diesel::QueryResult;
66
use futures_core::{ready, TryFuture, TryStream};
7-
use futures_util::TryStreamExt;
8-
use pin_project_lite::pin_project;
7+
use futures_util::{TryFutureExt, TryStreamExt};
98

10-
pin_project! {
11-
/// Reimplementation of [`futures_util::future::Map`] without the generic closure argument
12-
#[project = MapProj]
13-
#[project_replace = MapProjReplace]
14-
pub enum Map<Fut: Future, T> {
15-
Incomplete {
16-
#[pin]
17-
future: Fut,
18-
f: fn(Fut::Output) -> QueryResult<T>,
19-
},
20-
Complete,
21-
}
9+
// We use a custom future implementation here to erase some lifetimes
10+
// that otherwise need to be specified explicitly
11+
//
12+
// Specifying these lifetimes results in the compiler not beeing
13+
// able to look through the generic code and emit
14+
// lifetime erros for pipelined queries. See
15+
// https://github.com/weiznich/diesel_async/issues/249 for more context
16+
#[repr(transparent)]
17+
pub struct MapOk<F: TryFutureExt, T> {
18+
future: futures_util::future::MapOk<F, fn(F::Ok) -> T>,
2219
}
2320

24-
impl<Fut: Future, T> Map<Fut, T> {
25-
pub(crate) fn new(future: Fut, f: fn(Fut::Output) -> QueryResult<T>) -> Self {
26-
Self::Incomplete { future, f }
21+
impl<F, T> Future for MapOk<F, T>
22+
where
23+
F: TryFuture,
24+
futures_util::future::MapOk<F, fn(F::Ok) -> T>: Future<Output = Result<T, F::Error>>,
25+
{
26+
type Output = Result<T, F::Error>;
27+
28+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
29+
unsafe {
30+
// SAFETY: This projects pinning to the only inner field, so it
31+
// should be safe
32+
self.map_unchecked_mut(|s| &mut s.future)
33+
}
34+
.poll(cx)
2735
}
2836
}
2937

30-
impl<Fut: Future, T> Future for Map<Fut, T> {
31-
type Output = QueryResult<T>;
32-
33-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<QueryResult<T>> {
34-
match self.as_mut().project() {
35-
MapProj::Incomplete { future, .. } => {
36-
let output = ready!(future.poll(cx));
37-
match self.as_mut().project_replace(Map::Complete) {
38-
MapProjReplace::Incomplete { f, .. } => Poll::Ready(f(output)),
39-
MapProjReplace::Complete => unreachable!(),
40-
}
41-
}
42-
MapProj::Complete => panic!("Map polled after completion"),
38+
impl<Fut: TryFutureExt, T> MapOk<Fut, T> {
39+
pub(crate) fn new(future: Fut, f: fn(Fut::Ok) -> T) -> Self {
40+
Self {
41+
future: future.map_ok(f),
4342
}
4443
}
4544
}
4645

47-
pin_project! {
48-
/// Reimplementation of [`futures_util::future::AndThen`] without the generic closure argument
49-
#[project = AndThenProj]
50-
pub enum AndThen<Fut1: Future, Fut2> {
51-
First {
52-
#[pin]
53-
future1: Map<Fut1, Fut2>,
54-
},
55-
Second {
56-
#[pin]
57-
future2: Fut2,
58-
},
59-
Empty,
60-
}
46+
// similar to `MapOk` above this mainly exists to hide the lifetime
47+
#[repr(transparent)]
48+
pub struct AndThen<F1: TryFuture, F2> {
49+
future: futures_util::future::AndThen<F1, F2, fn(F1::Ok) -> F2>,
6150
}
6251

63-
impl<Fut1: Future, Fut2> AndThen<Fut1, Fut2> {
64-
pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Output) -> QueryResult<Fut2>) -> AndThen<Fut1, Fut2> {
65-
Self::First {
66-
future1: Map::new(fut1, f),
52+
impl<Fut1, Fut2> AndThen<Fut1, Fut2>
53+
where
54+
Fut1: TryFuture,
55+
Fut2: TryFuture<Error = Fut1::Error>,
56+
{
57+
pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Ok) -> Fut2) -> AndThen<Fut1, Fut2> {
58+
Self {
59+
future: fut1.and_then(f),
6760
}
6861
}
6962
}
7063

71-
impl<Fut1, Fut2> Future for AndThen<Fut1, Fut2>
64+
impl<F1, F2> Future for AndThen<F1, F2>
7265
where
73-
Fut1: TryFuture<Error = diesel::result::Error>,
74-
Fut2: TryFuture<Error = diesel::result::Error>,
66+
F1: TryFuture,
67+
F2: TryFuture<Error = F1::Error>,
7568
{
76-
type Output = QueryResult<Fut2::Ok>;
69+
type Output = Result<F2::Ok, F2::Error>;
7770

78-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79-
loop {
80-
match self.as_mut().project() {
81-
AndThenProj::First { future1 } => match ready!(future1.try_poll(cx)) {
82-
Ok(future2) => self.set(Self::Second { future2 }),
83-
Err(error) => {
84-
self.set(Self::Empty);
85-
break Poll::Ready(Err(error));
86-
}
87-
},
88-
AndThenProj::Second { future2 } => {
89-
let output = ready!(future2.try_poll(cx));
90-
self.set(Self::Empty);
91-
break Poll::Ready(output);
92-
}
93-
AndThenProj::Empty => panic!("AndThen polled after completion"),
94-
}
71+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72+
unsafe {
73+
// SAFETY: This projects pinning to the only inner field, so it
74+
// should be safe
75+
self.map_unchecked_mut(|s| &mut s.future)
9576
}
77+
.poll(cx)
9678
}
9779
}
9880

0 commit comments

Comments
 (0)