Skip to content

Commit bcb4ca0

Browse files
authoredJun 23, 2019
Merge pull request #448 from sfackler/bufer-copy-in
Buffer copy_in messages
2 parents d884298 + 4a5d30b commit bcb4ca0

File tree

4 files changed

+60
-31
lines changed

4 files changed

+60
-31
lines changed
 

‎.circleci/config.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ version: 2
2222
jobs:
2323
build:
2424
docker:
25-
- image: rust:1.31.0
25+
- image: rust:1.35.0
2626
environment:
2727
RUSTFLAGS: -D warnings
2828
- image: sfackler/rust-postgres-test:5

‎codegen/src/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#![warn(clippy::all)]
2+
#![allow(clippy::write_with_newline)]
23

34
extern crate linked_hash_map;
45
extern crate marksman_escape;

‎tokio-postgres/src/proto/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ impl Client {
212212
S::Error: Into<Box<dyn StdError + Sync + Send>>,
213213
{
214214
let (mut sender, receiver) = mpsc::channel(1);
215-
let pending = PendingRequest(self.excecute_message(statement, params).map(|buf| {
216-
match sender.start_send(CopyMessage::Data(buf)) {
215+
let pending = PendingRequest(self.excecute_message(statement, params).map(|data| {
216+
match sender.start_send(CopyMessage { data, done: false }) {
217217
Ok(AsyncSink::Ready) => {}
218218
_ => unreachable!("channel should have capacity"),
219219
}

‎tokio-postgres/src/proto/copy_in.rs

+56-28
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ use postgres_protocol::message::backend::Message;
66
use postgres_protocol::message::frontend;
77
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
88
use std::error::Error as StdError;
9+
use std::mem;
910

1011
use crate::proto::client::{Client, PendingRequest};
1112
use crate::proto::statement::Statement;
1213
use crate::Error;
1314

14-
pub enum CopyMessage {
15-
Data(Vec<u8>),
16-
Done,
15+
pub struct CopyMessage {
16+
pub data: Vec<u8>,
17+
pub done: bool,
1718
}
1819

1920
pub struct CopyInReceiver {
@@ -40,13 +41,14 @@ impl Stream for CopyInReceiver {
4041
}
4142

4243
match self.receiver.poll()? {
43-
Async::Ready(Some(CopyMessage::Data(buf))) => Ok(Async::Ready(Some(buf))),
44-
Async::Ready(Some(CopyMessage::Done)) => {
45-
self.done = true;
46-
let mut buf = vec![];
47-
frontend::copy_done(&mut buf);
48-
frontend::sync(&mut buf);
49-
Ok(Async::Ready(Some(buf)))
44+
Async::Ready(Some(mut data)) => {
45+
if data.done {
46+
self.done = true;
47+
frontend::copy_done(&mut data.data);
48+
frontend::sync(&mut data.data);
49+
}
50+
51+
Ok(Async::Ready(Some(data.data)))
5052
}
5153
Async::Ready(None) => {
5254
self.done = true;
@@ -85,6 +87,7 @@ where
8587
#[state_machine_future(transitions(WriteCopyDone))]
8688
WriteCopyData {
8789
stream: S,
90+
buf: Vec<u8>,
8891
pending_message: Option<CopyMessage>,
8992
sender: mpsc::Sender<CopyMessage>,
9093
receiver: mpsc::Receiver<Message>,
@@ -133,6 +136,7 @@ where
133136
let state = state.take();
134137
transition!(WriteCopyData {
135138
stream: state.stream,
139+
buf: vec![],
136140
pending_message: None,
137141
sender: state.sender,
138142
receiver: state.receiver
@@ -148,34 +152,58 @@ where
148152
fn poll_write_copy_data<'a>(
149153
state: &'a mut RentToOwn<'a, WriteCopyData<S>>,
150154
) -> Poll<AfterWriteCopyData, Error> {
155+
if let Some(message) = state.pending_message.take() {
156+
match state
157+
.sender
158+
.start_send(message)
159+
.map_err(|_| Error::closed())?
160+
{
161+
AsyncSink::Ready => {}
162+
AsyncSink::NotReady(message) => {
163+
state.pending_message = Some(message);
164+
return Ok(Async::NotReady);
165+
}
166+
}
167+
}
168+
151169
loop {
152-
let message = match state.pending_message.take() {
153-
Some(message) => message,
154-
None => match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) {
170+
let done = loop {
171+
match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) {
155172
Some(data) => {
156-
let mut buf = vec![];
157173
// FIXME avoid collect
158-
frontend::copy_data(&data.into_buf().collect::<Vec<_>>(), &mut buf)
174+
frontend::copy_data(&data.into_buf().collect::<Vec<_>>(), &mut state.buf)
159175
.map_err(Error::encode)?;
160-
CopyMessage::Data(buf)
176+
if state.buf.len() > 4096 {
177+
break false;
178+
}
161179
}
162-
None => {
163-
let state = state.take();
164-
transition!(WriteCopyDone {
165-
future: state.sender.send(CopyMessage::Done),
166-
receiver: state.receiver
167-
})
168-
}
169-
},
180+
None => break true,
181+
}
170182
};
171183

172-
match state.sender.start_send(message) {
173-
Ok(AsyncSink::Ready) => {}
174-
Ok(AsyncSink::NotReady(message)) => {
184+
let message = CopyMessage {
185+
data: mem::replace(&mut state.buf, vec![]),
186+
done,
187+
};
188+
189+
if done {
190+
let state = state.take();
191+
transition!(WriteCopyDone {
192+
future: state.sender.send(message),
193+
receiver: state.receiver,
194+
});
195+
}
196+
197+
match state
198+
.sender
199+
.start_send(message)
200+
.map_err(|_| Error::closed())?
201+
{
202+
AsyncSink::Ready => {}
203+
AsyncSink::NotReady(message) => {
175204
state.pending_message = Some(message);
176205
return Ok(Async::NotReady);
177206
}
178-
Err(_) => return Err(Error::closed()),
179207
}
180208
}
181209
}

0 commit comments

Comments
 (0)
Please sign in to comment.