From 4a5d30b4c6b94b81a3979a8be65cdef258480778 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sat, 22 Jun 2019 21:22:03 -0700 Subject: [PATCH] Buffer copy_in messages Otherwise there's a ton of overhead passing tons of tiny messages over channels. --- .circleci/config.yml | 2 +- codegen/src/main.rs | 1 + tokio-postgres/src/proto/client.rs | 4 +- tokio-postgres/src/proto/copy_in.rs | 98 ++++++++++++++++++----------- 4 files changed, 67 insertions(+), 38 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b2d7fb73..3acad782 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -22,7 +22,7 @@ version: 2 jobs: build: docker: - - image: rust:1.31.0 + - image: rust:1.35.0 environment: RUSTFLAGS: -D warnings - image: sfackler/rust-postgres-test:5 diff --git a/codegen/src/main.rs b/codegen/src/main.rs index e6559aca..6d99ded6 100644 --- a/codegen/src/main.rs +++ b/codegen/src/main.rs @@ -1,4 +1,5 @@ #![warn(clippy::all)] +#![allow(clippy::write_with_newline)] extern crate linked_hash_map; extern crate marksman_escape; diff --git a/tokio-postgres/src/proto/client.rs b/tokio-postgres/src/proto/client.rs index c3c011e8..a328e90d 100644 --- a/tokio-postgres/src/proto/client.rs +++ b/tokio-postgres/src/proto/client.rs @@ -212,8 +212,8 @@ impl Client { S::Error: Into>, { let (mut sender, receiver) = mpsc::channel(1); - let pending = PendingRequest(self.excecute_message(statement, params).map(|buf| { - match sender.start_send(CopyMessage::Data(buf)) { + let pending = PendingRequest(self.excecute_message(statement, params).map(|data| { + match sender.start_send(CopyMessage { data, done: false }) { Ok(AsyncSink::Ready) => {} _ => unreachable!("channel should have capacity"), } diff --git a/tokio-postgres/src/proto/copy_in.rs b/tokio-postgres/src/proto/copy_in.rs index 2f75596a..33018aa1 100644 --- a/tokio-postgres/src/proto/copy_in.rs +++ b/tokio-postgres/src/proto/copy_in.rs @@ -6,14 +6,15 @@ use postgres_protocol::message::backend::Message; use postgres_protocol::message::frontend; use state_machine_future::{transition, RentToOwn, StateMachineFuture}; use std::error::Error as StdError; +use std::mem; use crate::proto::client::{Client, PendingRequest}; use crate::proto::statement::Statement; use crate::Error; -pub enum CopyMessage { - Data(Vec), - Done, +pub struct CopyMessage { + pub data: Vec, + pub done: bool, } pub struct CopyInReceiver { @@ -40,13 +41,14 @@ impl Stream for CopyInReceiver { } match self.receiver.poll()? { - Async::Ready(Some(CopyMessage::Data(buf))) => Ok(Async::Ready(Some(buf))), - Async::Ready(Some(CopyMessage::Done)) => { - self.done = true; - let mut buf = vec![]; - frontend::copy_done(&mut buf); - frontend::sync(&mut buf); - Ok(Async::Ready(Some(buf))) + Async::Ready(Some(mut data)) => { + if data.done { + self.done = true; + frontend::copy_done(&mut data.data); + frontend::sync(&mut data.data); + } + + Ok(Async::Ready(Some(data.data))) } Async::Ready(None) => { self.done = true; @@ -85,6 +87,7 @@ where #[state_machine_future(transitions(WriteCopyDone))] WriteCopyData { stream: S, + buf: Vec, pending_message: Option, sender: mpsc::Sender, receiver: mpsc::Receiver, @@ -133,6 +136,7 @@ where let state = state.take(); transition!(WriteCopyData { stream: state.stream, + buf: vec![], pending_message: None, sender: state.sender, receiver: state.receiver @@ -148,34 +152,58 @@ where fn poll_write_copy_data<'a>( state: &'a mut RentToOwn<'a, WriteCopyData>, ) -> Poll { - loop { - let message = match state.pending_message.take() { - Some(message) => message, - None => match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) { - Some(data) => { - let mut buf = vec![]; - // FIXME avoid collect - frontend::copy_data(&data.into_buf().collect::>(), &mut buf) - .map_err(Error::encode)?; - CopyMessage::Data(buf) - } - None => { - let state = state.take(); - transition!(WriteCopyDone { - future: state.sender.send(CopyMessage::Done), - receiver: state.receiver - }) - } - }, - }; - - match state.sender.start_send(message) { - Ok(AsyncSink::Ready) => {} - Ok(AsyncSink::NotReady(message)) => { + if let Some(message) = state.pending_message.take() { + match state + .sender + .start_send(message) + .map_err(|_| Error::closed())? + { + AsyncSink::Ready => {} + AsyncSink::NotReady(message) => { + state.pending_message = Some(message); + return Ok(Async::NotReady); + } + } + } + + loop { + let done = loop { + match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) { + Some(data) => { + // FIXME avoid collect + frontend::copy_data(&data.into_buf().collect::>(), &mut state.buf) + .map_err(Error::encode)?; + if state.buf.len() > 4096 { + break false; + } + } + None => break true, + } + }; + + let message = CopyMessage { + data: mem::replace(&mut state.buf, vec![]), + done, + }; + + if done { + let state = state.take(); + transition!(WriteCopyDone { + future: state.sender.send(message), + receiver: state.receiver, + }); + } + + match state + .sender + .start_send(message) + .map_err(|_| Error::closed())? + { + AsyncSink::Ready => {} + AsyncSink::NotReady(message) => { state.pending_message = Some(message); return Ok(Async::NotReady); } - Err(_) => return Err(Error::closed()), } } }