Buffer copy_in messages

Otherwise there's a ton of overhead passing tons of tiny messages over
channels.
This commit is contained in:
Steven Fackler 2019-06-22 21:22:03 -07:00
parent d8842982b0
commit 4a5d30b4c6
4 changed files with 67 additions and 38 deletions

View File

@ -22,7 +22,7 @@ version: 2
jobs: jobs:
build: build:
docker: docker:
- image: rust:1.31.0 - image: rust:1.35.0
environment: environment:
RUSTFLAGS: -D warnings RUSTFLAGS: -D warnings
- image: sfackler/rust-postgres-test:5 - image: sfackler/rust-postgres-test:5

View File

@ -1,4 +1,5 @@
#![warn(clippy::all)] #![warn(clippy::all)]
#![allow(clippy::write_with_newline)]
extern crate linked_hash_map; extern crate linked_hash_map;
extern crate marksman_escape; extern crate marksman_escape;

View File

@ -212,8 +212,8 @@ impl Client {
S::Error: Into<Box<dyn StdError + Sync + Send>>, S::Error: Into<Box<dyn StdError + Sync + Send>>,
{ {
let (mut sender, receiver) = mpsc::channel(1); let (mut sender, receiver) = mpsc::channel(1);
let pending = PendingRequest(self.excecute_message(statement, params).map(|buf| { let pending = PendingRequest(self.excecute_message(statement, params).map(|data| {
match sender.start_send(CopyMessage::Data(buf)) { match sender.start_send(CopyMessage { data, done: false }) {
Ok(AsyncSink::Ready) => {} Ok(AsyncSink::Ready) => {}
_ => unreachable!("channel should have capacity"), _ => unreachable!("channel should have capacity"),
} }

View File

@ -6,14 +6,15 @@ use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend; use postgres_protocol::message::frontend;
use state_machine_future::{transition, RentToOwn, StateMachineFuture}; use state_machine_future::{transition, RentToOwn, StateMachineFuture};
use std::error::Error as StdError; use std::error::Error as StdError;
use std::mem;
use crate::proto::client::{Client, PendingRequest}; use crate::proto::client::{Client, PendingRequest};
use crate::proto::statement::Statement; use crate::proto::statement::Statement;
use crate::Error; use crate::Error;
pub enum CopyMessage { pub struct CopyMessage {
Data(Vec<u8>), pub data: Vec<u8>,
Done, pub done: bool,
} }
pub struct CopyInReceiver { pub struct CopyInReceiver {
@ -40,13 +41,14 @@ impl Stream for CopyInReceiver {
} }
match self.receiver.poll()? { match self.receiver.poll()? {
Async::Ready(Some(CopyMessage::Data(buf))) => Ok(Async::Ready(Some(buf))), Async::Ready(Some(mut data)) => {
Async::Ready(Some(CopyMessage::Done)) => { if data.done {
self.done = true; self.done = true;
let mut buf = vec![]; frontend::copy_done(&mut data.data);
frontend::copy_done(&mut buf); frontend::sync(&mut data.data);
frontend::sync(&mut buf); }
Ok(Async::Ready(Some(buf)))
Ok(Async::Ready(Some(data.data)))
} }
Async::Ready(None) => { Async::Ready(None) => {
self.done = true; self.done = true;
@ -85,6 +87,7 @@ where
#[state_machine_future(transitions(WriteCopyDone))] #[state_machine_future(transitions(WriteCopyDone))]
WriteCopyData { WriteCopyData {
stream: S, stream: S,
buf: Vec<u8>,
pending_message: Option<CopyMessage>, pending_message: Option<CopyMessage>,
sender: mpsc::Sender<CopyMessage>, sender: mpsc::Sender<CopyMessage>,
receiver: mpsc::Receiver<Message>, receiver: mpsc::Receiver<Message>,
@ -133,6 +136,7 @@ where
let state = state.take(); let state = state.take();
transition!(WriteCopyData { transition!(WriteCopyData {
stream: state.stream, stream: state.stream,
buf: vec![],
pending_message: None, pending_message: None,
sender: state.sender, sender: state.sender,
receiver: state.receiver receiver: state.receiver
@ -148,34 +152,58 @@ where
fn poll_write_copy_data<'a>( fn poll_write_copy_data<'a>(
state: &'a mut RentToOwn<'a, WriteCopyData<S>>, state: &'a mut RentToOwn<'a, WriteCopyData<S>>,
) -> Poll<AfterWriteCopyData, Error> { ) -> Poll<AfterWriteCopyData, Error> {
loop { if let Some(message) = state.pending_message.take() {
let message = match state.pending_message.take() { match state
Some(message) => message, .sender
None => match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) { .start_send(message)
Some(data) => { .map_err(|_| Error::closed())?
let mut buf = vec![]; {
// FIXME avoid collect AsyncSink::Ready => {}
frontend::copy_data(&data.into_buf().collect::<Vec<_>>(), &mut buf) AsyncSink::NotReady(message) => {
.map_err(Error::encode)?; state.pending_message = Some(message);
CopyMessage::Data(buf) return Ok(Async::NotReady);
} }
None => { }
let state = state.take(); }
transition!(WriteCopyDone {
future: state.sender.send(CopyMessage::Done), loop {
receiver: state.receiver let done = loop {
}) match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) {
} Some(data) => {
}, // FIXME avoid collect
}; frontend::copy_data(&data.into_buf().collect::<Vec<_>>(), &mut state.buf)
.map_err(Error::encode)?;
match state.sender.start_send(message) { if state.buf.len() > 4096 {
Ok(AsyncSink::Ready) => {} break false;
Ok(AsyncSink::NotReady(message)) => { }
}
None => break true,
}
};
let message = CopyMessage {
data: mem::replace(&mut state.buf, vec![]),
done,
};
if done {
let state = state.take();
transition!(WriteCopyDone {
future: state.sender.send(message),
receiver: state.receiver,
});
}
match state
.sender
.start_send(message)
.map_err(|_| Error::closed())?
{
AsyncSink::Ready => {}
AsyncSink::NotReady(message) => {
state.pending_message = Some(message); state.pending_message = Some(message);
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(_) => return Err(Error::closed()),
} }
} }
} }