Start on prepare

This commit is contained in:
Steven Fackler 2018-06-18 08:18:04 -04:00
parent 8c3770bd57
commit 0d0435fc2e
8 changed files with 153 additions and 17 deletions

View File

@ -44,6 +44,7 @@ tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-tcp = "0.1" tokio-tcp = "0.1"
tokio-timer = "0.2" tokio-timer = "0.2"
want = "0.0.5"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
tokio-uds = "0.2" tokio-uds = "0.2"

View File

@ -7,6 +7,7 @@ extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_tcp; extern crate tokio_tcp;
extern crate tokio_timer; extern crate tokio_timer;
extern crate want;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
@ -18,12 +19,13 @@ extern crate state_machine_future;
#[cfg(unix)] #[cfg(unix)]
extern crate tokio_uds; extern crate tokio_uds;
use futures::sync::mpsc;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use std::io; use std::io;
#[doc(inline)] #[doc(inline)]
pub use postgres_shared::{error, params}; pub use postgres_shared::stmt::Column;
#[doc(inline)]
pub use postgres_shared::{error, params, types};
#[doc(inline)] #[doc(inline)]
pub use postgres_shared::{CancelData, Notification}; pub use postgres_shared::{CancelData, Notification};
@ -46,12 +48,22 @@ fn disconnected() -> Error {
)) ))
} }
pub struct Client(mpsc::Sender<proto::Request>); pub struct Client(proto::Client);
impl Client { impl Client {
pub fn connect(params: ConnectParams) -> Handshake { pub fn connect(params: ConnectParams) -> Handshake {
Handshake(proto::HandshakeFuture::new(params)) Handshake(proto::HandshakeFuture::new(params))
} }
/// Polls to to determine whether the connection is ready to send new requests to the backend.
///
/// Requests are unboundedly buffered to enable pipelining, but this risks unbounded memory consumption if requests
/// are produced at a faster pace than the backend can process. This method can be used to cooperatively "throttle"
/// request creation. Specifically, it returns ready when the connection has sent any queued requests and is waiting
/// on new requests from the client.
pub fn poll_ready(&mut self) -> Poll<(), Error> {
self.0.poll_ready()
}
} }
pub struct Connection(proto::Connection); pub struct Connection(proto::Connection);
@ -82,8 +94,8 @@ impl Future for Handshake {
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<(Client, Connection), Error> { fn poll(&mut self) -> Poll<(Client, Connection), Error> {
let (sender, connection) = try_ready!(self.0.poll()); let (client, connection) = try_ready!(self.0.poll());
Ok(Async::Ready((Client(sender), Connection(connection)))) Ok(Async::Ready((Client(client), Connection(connection))))
} }
} }

View File

@ -0,0 +1,32 @@
use futures::sync::mpsc;
use futures::Poll;
use postgres_protocol::message::backend::Message;
use want::Giver;
use disconnected;
use error::Error;
use proto::connection::Request;
pub struct Client {
sender: mpsc::UnboundedSender<Request>,
giver: Giver,
}
impl Client {
pub fn new(sender: mpsc::UnboundedSender<Request>, giver: Giver) -> Client {
Client { sender, giver }
}
pub fn poll_ready(&mut self) -> Poll<(), Error> {
self.giver.poll_want().map_err(|_| disconnected())
}
pub fn send(&mut self, messages: Vec<u8>) -> Result<mpsc::Receiver<Message>, Error> {
let (sender, receiver) = mpsc::channel(0);
self.giver.give();
self.sender
.unbounded_send(Request { messages, sender })
.map(|_| receiver)
.map_err(|_| disconnected())
}
}

View File

@ -5,6 +5,7 @@ use postgres_protocol::message::frontend;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::io; use std::io;
use tokio_codec::Framed; use tokio_codec::Framed;
use want::Taker;
use error::{self, Error}; use error::{self, Error};
use proto::codec::PostgresCodec; use proto::codec::PostgresCodec;
@ -27,7 +28,8 @@ pub struct Connection {
stream: Framed<Socket, PostgresCodec>, stream: Framed<Socket, PostgresCodec>,
cancel_data: CancelData, cancel_data: CancelData,
parameters: HashMap<String, String>, parameters: HashMap<String, String>,
receiver: mpsc::Receiver<Request>, receiver: mpsc::UnboundedReceiver<Request>,
taker: Taker,
pending_request: Option<Vec<u8>>, pending_request: Option<Vec<u8>>,
pending_response: Option<Message>, pending_response: Option<Message>,
responses: VecDeque<mpsc::Sender<Message>>, responses: VecDeque<mpsc::Sender<Message>>,
@ -39,13 +41,15 @@ impl Connection {
stream: Framed<Socket, PostgresCodec>, stream: Framed<Socket, PostgresCodec>,
cancel_data: CancelData, cancel_data: CancelData,
parameters: HashMap<String, String>, parameters: HashMap<String, String>,
receiver: mpsc::Receiver<Request>, receiver: mpsc::UnboundedReceiver<Request>,
taker: Taker,
) -> Connection { ) -> Connection {
Connection { Connection {
stream, stream,
cancel_data, cancel_data,
parameters, parameters,
receiver, receiver,
taker,
pending_request: None, pending_request: None,
pending_response: None, pending_response: None,
responses: VecDeque::new(), responses: VecDeque::new(),
@ -76,7 +80,10 @@ impl Connection {
Async::Ready(None) => { Async::Ready(None) => {
return Err(Error::from(io::Error::from(io::ErrorKind::UnexpectedEof))); return Err(Error::from(io::Error::from(io::ErrorKind::UnexpectedEof)));
} }
Async::NotReady => return Ok(()), Async::NotReady => {
self.taker.want();
return Ok(());
}
}; };
let message = match message { let message = match message {
@ -100,7 +107,7 @@ impl Connection {
}, },
}; };
let ready = match message { let request_complete = match message {
Message::ReadyForQuery(_) => true, Message::ReadyForQuery(_) => true,
_ => false, _ => false,
}; };
@ -109,7 +116,7 @@ impl Connection {
// if the receiver's hung up we still need to page through the rest of the messages // if the receiver's hung up we still need to page through the rest of the messages
// designated to it // designated to it
Ok(AsyncSink::Ready) | Err(_) => { Ok(AsyncSink::Ready) | Err(_) => {
if !ready { if !request_complete {
self.responses.push_front(sender); self.responses.push_front(sender);
} }
} }

View File

@ -10,11 +10,13 @@ use state_machine_future::RentToOwn;
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use tokio_codec::Framed; use tokio_codec::Framed;
use want;
use error::{self, Error}; use error::{self, Error};
use params::{ConnectParams, User}; use params::{ConnectParams, User};
use proto::client::Client;
use proto::codec::PostgresCodec; use proto::codec::PostgresCodec;
use proto::connection::{Connection, Request}; use proto::connection::Connection;
use proto::socket::{ConnectFuture, Socket}; use proto::socket::{ConnectFuture, Socket};
use {bad_response, disconnected, CancelData}; use {bad_response, disconnected, CancelData};
@ -60,7 +62,7 @@ pub enum Handshake {
parameters: HashMap<String, String>, parameters: HashMap<String, String>,
}, },
#[state_machine_future(ready)] #[state_machine_future(ready)]
Finished((mpsc::Sender<Request>, Connection)), Finished((Client, Connection)),
#[state_machine_future(error)] #[state_machine_future(error)]
Failed(Error), Failed(Error),
} }
@ -281,10 +283,17 @@ impl PollHandshake for Handshake {
let cancel_data = state.cancel_data.ok_or_else(|| { let cancel_data = state.cancel_data.ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "BackendKeyData message missing") io::Error::new(io::ErrorKind::InvalidData, "BackendKeyData message missing")
})?; })?;
let (sender, receiver) = mpsc::channel(0); let (sender, receiver) = mpsc::unbounded();
let connection = let (giver, taker) = want::new();
Connection::new(state.stream, cancel_data, state.parameters, receiver); let client = Client::new(sender, giver);
transition!(Finished((sender, connection))) let connection = Connection::new(
state.stream,
cancel_data,
state.parameters,
receiver,
taker,
);
transition!(Finished((client, connection)))
} }
Some(Message::ErrorResponse(body)) => return Err(error::__db(body)), Some(Message::ErrorResponse(body)) => return Err(error::__db(body)),
Some(Message::NoticeResponse(_)) => {} Some(Message::NoticeResponse(_)) => {}

View File

@ -1,9 +1,15 @@
mod client;
mod codec; mod codec;
mod connection; mod connection;
mod handshake; mod handshake;
mod prepare;
mod socket; mod socket;
mod statement;
pub use proto::client::Client;
pub use proto::codec::PostgresCodec; pub use proto::codec::PostgresCodec;
pub use proto::connection::{Connection, Request}; pub use proto::connection::Connection;
pub use proto::handshake::HandshakeFuture; pub use proto::handshake::HandshakeFuture;
pub use proto::prepare::PrepareFuture;
pub use proto::socket::Socket; pub use proto::socket::Socket;
pub use statement::Statement;

View File

@ -0,0 +1,20 @@
use futures::sync::mpsc;
use postgres_protocol::message::backend::Message;
use error::Error;
use proto::connection::Request;
use proto::statement::Statement;
#[derive(StateMachineFuture)]
pub enum Prepare {
#[state_machine_future(start)]
Start {
sender: mpsc::UnboundedSender<Request>,
receiver: Result<mpsc::Receiver<Message>, Error>,
name: String,
},
#[state_machine_future(ready)]
Finished(Statement),
#[state_machine_future(error)]
Failed(Error),
}

View File

@ -0,0 +1,49 @@
use futures::sync::mpsc;
use postgres_protocol::message::frontend;
use postgres_shared::stmt::Column;
use proto::connection::Request;
use types::Type;
pub struct Statement {
sender: mpsc::UnboundedSender<Request>,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
}
impl Drop for Statement {
fn drop(&mut self) {
let mut buf = vec![];
frontend::close(b'S', &self.name, &mut buf).expect("statement name not valid");
let (sender, _) = mpsc::channel(0);
self.sender.unbounded_send(Request {
messages: buf,
sender,
});
}
}
impl Statement {
pub fn new(
sender: mpsc::UnboundedReceiver<Request>,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
Statement {
sender,
name,
params,
columns,
}
}
pub fn params(&self) -> &[Type] {
&self.params
}
pub fn columns(&self) -> &[Column] {
&self.columns
}
}