Skip to content

Send response messages in blocks #452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ members = [
"tokio-postgres-native-tls",
"tokio-postgres-openssl",
]

[profile.release]
debug = 2
104 changes: 82 additions & 22 deletions postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(missing_docs)]

use byteorder::{BigEndian, ReadBytesExt};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use memchr::memchr;
Expand All @@ -11,6 +11,66 @@ use std::str;

use crate::Oid;

pub const PARSE_COMPLETE_TAG: u8 = b'1';
pub const BIND_COMPLETE_TAG: u8 = b'2';
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
pub const NOTIFICATION_RESPONSE_TAG: u8 = b'A';
pub const COPY_DONE_TAG: u8 = b'c';
pub const COMMAND_COMPLETE_TAG: u8 = b'C';
pub const COPY_DATA_TAG: u8 = b'd';
pub const DATA_ROW_TAG: u8 = b'D';
pub const ERROR_RESPONSE_TAG: u8 = b'E';
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
pub const NO_DATA_TAG: u8 = b'n';
pub const NOTICE_RESPONSE_TAG: u8 = b'N';
pub const AUTHENTICATION_TAG: u8 = b'R';
pub const PORTAL_SUSPENDED_TAG: u8 = b's';
pub const PARAMETER_STATUS_TAG: u8 = b'S';
pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
pub const READY_FOR_QUERY_TAG: u8 = b'Z';

#[derive(Debug, Copy, Clone)]
pub struct Header {
tag: u8,
len: i32,
}

#[allow(clippy::len_without_is_empty)]
impl Header {
#[inline]
pub fn parse(buf: &[u8]) -> io::Result<Option<Header>> {
if buf.len() < 5 {
return Ok(None);
}

let tag = buf[0];
let len = BigEndian::read_i32(&buf[1..]);

if len < 4 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length",
));
}

Ok(Some(Header { tag, len }))
}

#[inline]
pub fn tag(self) -> u8 {
self.tag
}

#[inline]
pub fn len(self) -> i32 {
self.len
}
}

/// An enum representing Postgres backend messages.
pub enum Message {
AuthenticationCleartextPassword,
Expand Down Expand Up @@ -80,10 +140,10 @@ impl Message {
};

let message = match tag {
b'1' => Message::ParseComplete,
b'2' => Message::BindComplete,
b'3' => Message::CloseComplete,
b'A' => {
PARSE_COMPLETE_TAG => Message::ParseComplete,
BIND_COMPLETE_TAG => Message::BindComplete,
CLOSE_COMPLETE_TAG => Message::CloseComplete,
NOTIFICATION_RESPONSE_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
let channel = buf.read_cstr()?;
let message = buf.read_cstr()?;
Expand All @@ -93,25 +153,25 @@ impl Message {
message,
})
}
b'c' => Message::CopyDone,
b'C' => {
COPY_DONE_TAG => Message::CopyDone,
COMMAND_COMPLETE_TAG => {
let tag = buf.read_cstr()?;
Message::CommandComplete(CommandCompleteBody { tag })
}
b'd' => {
COPY_DATA_TAG => {
let storage = buf.read_all();
Message::CopyData(CopyDataBody { storage })
}
b'D' => {
DATA_ROW_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::DataRow(DataRowBody { storage, len })
}
b'E' => {
ERROR_RESPONSE_TAG => {
let storage = buf.read_all();
Message::ErrorResponse(ErrorResponseBody { storage })
}
b'G' => {
COPY_IN_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Expand All @@ -121,7 +181,7 @@ impl Message {
storage,
})
}
b'H' => {
COPY_OUT_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Expand All @@ -131,21 +191,21 @@ impl Message {
storage,
})
}
b'I' => Message::EmptyQueryResponse,
b'K' => {
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
BACKEND_KEY_DATA_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
let secret_key = buf.read_i32::<BigEndian>()?;
Message::BackendKeyData(BackendKeyDataBody {
process_id,
secret_key,
})
}
b'n' => Message::NoData,
b'N' => {
NO_DATA_TAG => Message::NoData,
NOTICE_RESPONSE_TAG => {
let storage = buf.read_all();
Message::NoticeResponse(NoticeResponseBody { storage })
}
b'R' => match buf.read_i32::<BigEndian>()? {
AUTHENTICATION_TAG => match buf.read_i32::<BigEndian>()? {
0 => Message::AuthenticationOk,
2 => Message::AuthenticationKerberosV5,
3 => Message::AuthenticationCleartextPassword,
Expand Down Expand Up @@ -180,23 +240,23 @@ impl Message {
));
}
},
b's' => Message::PortalSuspended,
b'S' => {
PORTAL_SUSPENDED_TAG => Message::PortalSuspended,
PARAMETER_STATUS_TAG => {
let name = buf.read_cstr()?;
let value = buf.read_cstr()?;
Message::ParameterStatus(ParameterStatusBody { name, value })
}
b't' => {
PARAMETER_DESCRIPTION_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::ParameterDescription(ParameterDescriptionBody { storage, len })
}
b'T' => {
ROW_DESCRIPTION_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::RowDescription(RowDescriptionBody { storage, len })
}
b'Z' => {
READY_FOR_QUERY_TAG => {
let status = buf.read_u8()?;
Message::ReadyForQuery(ReadyForQueryBody { status })
}
Expand Down
8 changes: 4 additions & 4 deletions tokio-postgres/src/proto/bind.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use futures::sync::mpsc;
use futures::{Poll, Stream};
use futures::{try_ready, Poll, Stream};
use postgres_protocol::message::backend::Message;
use state_machine_future::{transition, RentToOwn, StateMachineFuture};

use crate::proto::client::{Client, PendingRequest};
use crate::proto::portal::Portal;
use crate::proto::responses::Responses;
use crate::proto::statement::Statement;
use crate::Error;

Expand All @@ -19,7 +19,7 @@ pub enum Bind {
},
#[state_machine_future(transitions(Finished))]
ReadBindComplete {
receiver: mpsc::Receiver<Message>,
receiver: Responses,
client: Client,
name: String,
statement: Statement,
Expand All @@ -46,7 +46,7 @@ impl PollBind for Bind {
fn poll_read_bind_complete<'a>(
state: &'a mut RentToOwn<'a, ReadBindComplete>,
) -> Poll<AfterReadBindComplete, Error> {
let message = try_ready_receive!(state.receiver.poll());
let message = try_ready!(state.receiver.poll());
let state = state.take();

match message {
Expand Down
6 changes: 3 additions & 3 deletions tokio-postgres/src/proto/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use bytes::IntoBuf;
use futures::sync::mpsc;
use futures::{AsyncSink, Poll, Sink, Stream};
use postgres_protocol;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::collections::HashMap;
use std::error::Error as StdError;
Expand All @@ -20,6 +19,7 @@ use crate::proto::idle::{IdleGuard, IdleState};
use crate::proto::portal::Portal;
use crate::proto::prepare::PrepareFuture;
use crate::proto::query::QueryStream;
use crate::proto::responses::{self, Responses};
use crate::proto::simple_query::SimpleQueryStream;
use crate::proto::statement::Statement;
#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -130,9 +130,9 @@ impl Client {
self.0.state.lock().typeinfo_composite_query = Some(statement.clone());
}

pub fn send(&self, request: PendingRequest) -> Result<mpsc::Receiver<Message>, Error> {
pub fn send(&self, request: PendingRequest) -> Result<Responses, Error> {
let (messages, idle) = request.0?;
let (sender, receiver) = mpsc::channel(1);
let (sender, receiver) = responses::channel();
self.0
.sender
.unbounded_send(Request {
Expand Down
70 changes: 67 additions & 3 deletions tokio-postgres/src/proto/codec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend;
use postgres_protocol::message::frontend::CopyData;
use std::io;
Expand All @@ -9,6 +10,31 @@ pub enum FrontendMessage {
CopyData(CopyData<Box<dyn Buf + Send>>),
}

pub enum BackendMessage {
Normal {
messages: BackendMessages,
request_complete: bool,
},
Async(backend::Message),
}

pub struct BackendMessages(BytesMut);

impl BackendMessages {
pub fn empty() -> BackendMessages {
BackendMessages(BytesMut::new())
}
}

impl FallibleIterator for BackendMessages {
type Item = backend::Message;
type Error = io::Error;

fn next(&mut self) -> io::Result<Option<backend::Message>> {
backend::Message::parse(&mut self.0)
}
}

pub struct PostgresCodec;

impl Encoder for PostgresCodec {
Expand All @@ -26,10 +52,48 @@ impl Encoder for PostgresCodec {
}

impl Decoder for PostgresCodec {
type Item = backend::Message;
type Item = BackendMessage;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<backend::Message>, io::Error> {
backend::Message::parse(src)
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
let mut idx = 0;
let mut request_complete = false;

while let Some(header) = backend::Header::parse(&src[idx..])? {
let len = header.len() as usize + 1;
if src[idx..].len() < len {
break;
}

match header.tag() {
backend::NOTICE_RESPONSE_TAG
| backend::NOTIFICATION_RESPONSE_TAG
| backend::PARAMETER_STATUS_TAG => {
if idx == 0 {
let message = backend::Message::parse(src)?.unwrap();
return Ok(Some(BackendMessage::Async(message)));
} else {
break;
}
}
_ => {}
}

idx += len;

if header.tag() == backend::READY_FOR_QUERY_TAG {
request_complete = true;
break;
}
}

if idx == 0 {
Ok(None)
} else {
Ok(Some(BackendMessage::Normal {
messages: BackendMessages(src.split_to(idx)),
request_complete,
}))
}
}
}
Loading