Skip to content

Commit 80233b7

Browse files
Use custom Future combinators to avoid GAT errors
1 parent ba8ad28 commit 80233b7

File tree

4 files changed

+173
-104
lines changed

4 files changed

+173
-104
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ deadpool = { version = "0.12", optional = true, default-features = false, featur
3636
] }
3737
mobc = { version = ">=0.7,<0.10", optional = true }
3838
scoped-futures = { version = "0.1", features = ["std"] }
39+
pin-project-lite = "0.2.16"
3940

4041
[dependencies.diesel]
4142
version = "~2.3.0"

src/pg/mod.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,45 +1167,42 @@ mod tests {
11671167
.await
11681168
.unwrap();
11691169

1170-
fn erase<'a, T: Future + Send + 'a>(t: T) -> impl Future<Output = T::Output> + Send + 'a {
1171-
t
1172-
}
1173-
11741170
async fn fn12(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
11751171
let f1 = diesel::select(1_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
11761172
let f2 = diesel::select(2_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
11771173

1178-
erase(try_join(f1, f2)).await
1174+
try_join(f1, f2).await
11791175
}
11801176

1181-
async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
1182-
let f3 = diesel::select(3_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1183-
let f4 = diesel::select(4_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1184-
1185-
try_join(f3, f4).boxed().await
1186-
}
1187-
1188-
async fn fn56(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
1177+
async fn fn37(
1178+
mut conn: &AsyncPgConnection,
1179+
) -> QueryResult<(usize, Vec<i32>, i32, Vec<i32>, i32)> {
1180+
let f3 = diesel::select(0_i32.into_sql::<Integer>()).execute(&mut conn);
1181+
let f4 = diesel::select(4_i32.into_sql::<Integer>()).load::<i32>(&mut conn);
11891182
let f5 = diesel::select(5_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1190-
let f6 = diesel::select(6_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1183+
let f6 = diesel::select(6_i32.into_sql::<Integer>()).get_results::<i32>(&mut conn);
1184+
let f7 = diesel::select(7_i32.into_sql::<Integer>()).first::<i32>(&mut conn);
11911185

1192-
try_join!(f5.boxed(), f6.boxed())
1186+
try_join!(f3, f4, f5, f6, f7)
11931187
}
11941188

11951189
conn.transaction(|conn| {
11961190
async move {
11971191
let f12 = fn12(conn);
1198-
let f34 = fn34(conn);
1199-
let f56 = fn56(conn);
1192+
let f37 = fn37(conn);
12001193

1201-
let ((r1, r2), (r3, r4), (r5, r6)) = try_join!(f12, f34, f56).unwrap();
1194+
let ((r1, r2), (r3, r4, r5, r6, r7)) = try_join!(f12, f37).unwrap();
12021195

12031196
assert_eq!(r1, 1);
12041197
assert_eq!(r2, 2);
1205-
assert_eq!(r3, 3);
1206-
assert_eq!(r4, 4);
1198+
assert_eq!(r3, 1);
1199+
assert_eq!(r4, vec![4]);
12071200
assert_eq!(r5, 5);
1208-
assert_eq!(r6, 6);
1201+
assert_eq!(r6, vec![6]);
1202+
assert_eq!(r7, 7);
1203+
1204+
fn12(conn).await?;
1205+
fn37(conn).await?;
12091206

12101207
QueryResult::<_>::Ok(())
12111208
}

src/run_query_dsl/mod.rs

Lines changed: 24 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1+
mod utils;
2+
13
use crate::AsyncConnectionCore;
24
use diesel::associations::HasTable;
35
use diesel::query_builder::IntoUpdateTarget;
46
use diesel::result::QueryResult;
57
use diesel::AsChangeset;
68
use futures_core::future::BoxFuture;
7-
use futures_core::Stream;
8-
use futures_util::{future, stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
9+
#[cfg(any(feature = "mysql", feature = "postgres"))]
10+
use futures_util::FutureExt;
11+
use futures_util::{stream, StreamExt, TryStreamExt};
912
use std::future::Future;
10-
use std::pin::Pin;
1113

1214
/// The traits used by `QueryDsl`.
1315
///
@@ -22,7 +24,7 @@ pub mod methods {
2224
use diesel::expression::QueryMetadata;
2325
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
2426
use diesel::query_dsl::CompatibleType;
25-
use futures_util::{Future, Stream, TryFutureExt};
27+
use futures_util::{Future, Stream};
2628

2729
/// The `execute` method
2830
///
@@ -74,6 +76,7 @@ pub mod methods {
7476
type LoadFuture<'conn>: Future<Output = QueryResult<Self::Stream<'conn>>> + Send
7577
where
7678
Conn: 'conn;
79+
7780
/// The inner stream returned by [`LoadQuery::internal_load`]
7881
type Stream<'conn>: Stream<Item = QueryResult<U>> + Send
7982
where
@@ -96,10 +99,7 @@ pub mod methods {
9699
ST: 'static,
97100
{
98101
type LoadFuture<'conn>
99-
= future::MapOk<
100-
Conn::LoadFuture<'conn, 'query>,
101-
fn(Conn::Stream<'conn, 'query>) -> Self::Stream<'conn>,
102-
>
102+
= utils::Map<Conn::LoadFuture<'conn, 'query>, Self::Stream<'conn>>
103103
where
104104
Conn: 'conn;
105105

@@ -112,33 +112,13 @@ pub mod methods {
112112
Conn: 'conn;
113113

114114
fn internal_load(self, conn: &mut Conn) -> Self::LoadFuture<'_> {
115-
conn.load(self)
116-
.map_ok(map_result_stream_future::<U, _, _, DB, ST>)
115+
utils::Map::new(conn.load(self), |stream| {
116+
Ok(stream?.map(|row| {
117+
U::build_from_row(&row?).map_err(diesel::result::Error::DeserializationError)
118+
}))
119+
})
117120
}
118121
}
119-
120-
#[allow(clippy::type_complexity)]
121-
fn map_result_stream_future<'s, 'a, U, S, R, DB, ST>(
122-
stream: S,
123-
) -> stream::Map<S, fn(QueryResult<R>) -> QueryResult<U>>
124-
where
125-
S: Stream<Item = QueryResult<R>> + Send + 's,
126-
R: diesel::row::Row<'a, DB> + 's,
127-
DB: Backend + 'static,
128-
U: FromSqlRow<ST, DB> + 'static,
129-
ST: 'static,
130-
{
131-
stream.map(map_row_helper::<_, DB, U, ST>)
132-
}
133-
134-
fn map_row_helper<'a, R, DB, U, ST>(row: QueryResult<R>) -> QueryResult<U>
135-
where
136-
U: FromSqlRow<ST, DB>,
137-
R: diesel::row::Row<'a, DB>,
138-
DB: Backend,
139-
{
140-
U::build_from_row(&row?).map_err(diesel::result::Error::DeserializationError)
141-
}
142122
}
143123

144124
/// The return types produced by the various [`RunQueryDsl`] methods
@@ -149,37 +129,24 @@ pub mod methods {
149129
// the same connection
150130
#[allow(type_alias_bounds)] // we need these bounds otherwise we cannot use GAT's
151131
pub mod return_futures {
132+
use crate::run_query_dsl::utils;
133+
152134
use super::methods::LoadQuery;
153-
use diesel::QueryResult;
154-
use futures_util::{future, stream};
135+
use futures_util::stream;
155136
use std::pin::Pin;
156137

157138
/// The future returned by [`RunQueryDsl::load`](super::RunQueryDsl::load)
158139
/// and [`RunQueryDsl::get_results`](super::RunQueryDsl::get_results)
159140
///
160141
/// This is essentially `impl Future<Output = QueryResult<Vec<U>>>`
161-
pub type LoadFuture<'conn, 'query, Q: LoadQuery<'query, Conn, U>, Conn, U> = future::AndThen<
162-
Q::LoadFuture<'conn>,
163-
stream::TryCollect<Q::Stream<'conn>, Vec<U>>,
164-
fn(Q::Stream<'conn>) -> stream::TryCollect<Q::Stream<'conn>, Vec<U>>,
165-
>;
142+
pub type LoadFuture<'conn, 'query, Q: LoadQuery<'query, Conn, U>, Conn, U> =
143+
utils::AndThen<Q::LoadFuture<'conn>, stream::TryCollect<Q::Stream<'conn>, Vec<U>>>;
166144

167145
/// The future returned by [`RunQueryDsl::get_result`](super::RunQueryDsl::get_result)
168146
///
169147
/// This is essentially `impl Future<Output = QueryResult<U>>`
170-
pub type GetResult<'conn, 'query, Q: LoadQuery<'query, Conn, U>, Conn, U> = future::AndThen<
171-
Q::LoadFuture<'conn>,
172-
future::Map<
173-
stream::StreamFuture<Pin<Box<Q::Stream<'conn>>>>,
174-
fn((Option<QueryResult<U>>, Pin<Box<Q::Stream<'conn>>>)) -> QueryResult<U>,
175-
>,
176-
fn(
177-
Q::Stream<'conn>,
178-
) -> future::Map<
179-
stream::StreamFuture<Pin<Box<Q::Stream<'conn>>>>,
180-
fn((Option<QueryResult<U>>, Pin<Box<Q::Stream<'conn>>>)) -> QueryResult<U>,
181-
>,
182-
>;
148+
pub type GetResult<'conn, 'query, Q: LoadQuery<'query, Conn, U>, Conn, U> =
149+
utils::AndThen<Q::LoadFuture<'conn>, utils::LoadNext<Pin<Box<Q::Stream<'conn>>>>>;
183150
}
184151

185152
/// Methods used to execute queries.
@@ -346,13 +313,7 @@ pub trait RunQueryDsl<Conn>: Sized {
346313
Conn: AsyncConnectionCore,
347314
Self: methods::LoadQuery<'query, Conn, U> + 'query,
348315
{
349-
fn collect_result<U, S>(stream: S) -> stream::TryCollect<S, Vec<U>>
350-
where
351-
S: Stream<Item = QueryResult<U>>,
352-
{
353-
stream.try_collect()
354-
}
355-
self.internal_load(conn).and_then(collect_result::<U, _>)
316+
utils::AndThen::new(self.internal_load(conn), |stream| Ok(stream?.try_collect()))
356317
}
357318

358319
/// Executes the given query, returning a [`Stream`] with the returned rows.
@@ -547,29 +508,9 @@ pub trait RunQueryDsl<Conn>: Sized {
547508
Conn: AsyncConnectionCore,
548509
Self: methods::LoadQuery<'query, Conn, U> + 'query,
549510
{
550-
#[allow(clippy::type_complexity)]
551-
fn get_next_stream_element<S, U>(
552-
stream: S,
553-
) -> future::Map<
554-
stream::StreamFuture<Pin<Box<S>>>,
555-
fn((Option<QueryResult<U>>, Pin<Box<S>>)) -> QueryResult<U>,
556-
>
557-
where
558-
S: Stream<Item = QueryResult<U>>,
559-
{
560-
fn map_option_to_result<U, S>(
561-
(o, _): (Option<QueryResult<U>>, Pin<Box<S>>),
562-
) -> QueryResult<U> {
563-
match o {
564-
Some(s) => s,
565-
None => Err(diesel::result::Error::NotFound),
566-
}
567-
}
568-
569-
Box::pin(stream).into_future().map(map_option_to_result)
570-
}
571-
572-
self.load_stream(conn).and_then(get_next_stream_element)
511+
utils::AndThen::new(self.internal_load(conn), |stream| {
512+
Ok(utils::LoadNext::new(Box::pin(stream?)))
513+
})
573514
}
574515

575516
/// Runs the command, returning an `Vec` with the affected rows.

src/run_query_dsl/utils.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use diesel::QueryResult;
6+
use futures_core::{ready, TryFuture, TryStream};
7+
use futures_util::TryStreamExt;
8+
use pin_project_lite::pin_project;
9+
10+
pin_project! {
11+
/// Reimplementation of [`futures_util::future::Map`] without the generic closure argument
12+
#[project = MapProj]
13+
#[project_replace = MapProjReplace]
14+
pub enum Map<Fut: Future, T> {
15+
Incomplete {
16+
#[pin]
17+
future: Fut,
18+
f: fn(Fut::Output) -> QueryResult<T>,
19+
},
20+
Complete,
21+
}
22+
}
23+
24+
impl<Fut: Future, T> Map<Fut, T> {
25+
pub(crate) fn new(future: Fut, f: fn(Fut::Output) -> QueryResult<T>) -> Self {
26+
Self::Incomplete { future, f }
27+
}
28+
}
29+
30+
impl<Fut: Future, T> Future for Map<Fut, T> {
31+
type Output = QueryResult<T>;
32+
33+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<QueryResult<T>> {
34+
match self.as_mut().project() {
35+
MapProj::Incomplete { future, .. } => {
36+
let output = ready!(future.poll(cx));
37+
match self.as_mut().project_replace(Map::Complete) {
38+
MapProjReplace::Incomplete { f, .. } => Poll::Ready(f(output)),
39+
MapProjReplace::Complete => unreachable!(),
40+
}
41+
}
42+
MapProj::Complete => panic!("Map polled after completion"),
43+
}
44+
}
45+
}
46+
47+
pin_project! {
48+
/// Reimplementation of [`futures_util::future::AndThen`] without the generic closure argument
49+
#[project = AndThenProj]
50+
pub enum AndThen<Fut1: Future, Fut2> {
51+
First {
52+
#[pin]
53+
future1: Map<Fut1, Fut2>,
54+
},
55+
Second {
56+
#[pin]
57+
future2: Fut2,
58+
},
59+
Empty,
60+
}
61+
}
62+
63+
impl<Fut1: Future, Fut2> AndThen<Fut1, Fut2> {
64+
pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Output) -> QueryResult<Fut2>) -> AndThen<Fut1, Fut2> {
65+
Self::First {
66+
future1: Map::new(fut1, f),
67+
}
68+
}
69+
}
70+
71+
impl<Fut1, Fut2> Future for AndThen<Fut1, Fut2>
72+
where
73+
Fut1: TryFuture<Error = diesel::result::Error>,
74+
Fut2: TryFuture<Error = diesel::result::Error>,
75+
{
76+
type Output = QueryResult<Fut2::Ok>;
77+
78+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79+
loop {
80+
match self.as_mut().project() {
81+
AndThenProj::First { future1 } => match ready!(future1.try_poll(cx)) {
82+
Ok(future2) => self.set(Self::Second { future2 }),
83+
Err(error) => {
84+
self.set(Self::Empty);
85+
break Poll::Ready(Err(error));
86+
}
87+
},
88+
AndThenProj::Second { future2 } => {
89+
let output = ready!(future2.try_poll(cx));
90+
self.set(Self::Empty);
91+
break Poll::Ready(output);
92+
}
93+
AndThenProj::Empty => panic!("AndThen polled after completion"),
94+
}
95+
}
96+
}
97+
}
98+
99+
/// Converts a stream into a future, only yielding the first element.
100+
/// Based on [`futures_util::stream::StreamFuture`].
101+
pub struct LoadNext<St> {
102+
stream: Option<St>,
103+
}
104+
105+
impl<St> LoadNext<St> {
106+
pub(crate) fn new(stream: St) -> Self {
107+
Self {
108+
stream: Some(stream),
109+
}
110+
}
111+
}
112+
113+
impl<St> Future for LoadNext<St>
114+
where
115+
St: TryStream<Error = diesel::result::Error> + Unpin,
116+
{
117+
type Output = QueryResult<St::Ok>;
118+
119+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120+
let first = {
121+
let s = self.stream.as_mut().expect("polling LoadNext twice");
122+
ready!(s.try_poll_next_unpin(cx))
123+
};
124+
self.stream = None;
125+
match first {
126+
Some(first) => Poll::Ready(first),
127+
None => Poll::Ready(Err(diesel::result::Error::NotFound)),
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)