Automatically close dead statements

This commit is contained in:
Steven Fackler 2016-12-21 21:51:47 -05:00
parent 29210bb38b
commit 522ea10a98
2 changed files with 49 additions and 28 deletions

View File

@ -16,6 +16,8 @@ use postgres_shared::RowData;
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::mem;
use std::sync::mpsc::{self, Sender, Receiver};
use tokio_core::reactor::Handle;
#[doc(inline)]
@ -40,6 +42,8 @@ pub struct CancelData {
struct InnerConnection {
stream: PostgresStream,
close_receiver: Receiver<(u8, String)>,
close_sender: Sender<(u8, String)>,
parameters: HashMap<String, String>,
cancel_data: CancelData,
next_stmt_id: u32,
@ -122,8 +126,11 @@ impl Connection {
stream::connect(params.host(), params.port(), handle)
.map_err(ConnectError::Io)
.map(|s| {
let (sender, receiver) = mpsc::channel();
Connection(InnerConnection {
stream: s,
close_sender: sender,
close_receiver: receiver,
parameters: HashMap::new(),
cancel_data: CancelData {
process_id: 0,
@ -321,6 +328,7 @@ impl Connection {
_ => Err(bad_message())
}
})
.and_then(|(t, s)| s.close_gc().map(|s| (t, s)))
.boxed()
}
@ -336,7 +344,9 @@ impl Connection {
}
pub fn batch_execute(self, query: &str) -> BoxFuture<Connection, Error> {
self.simple_query(query).map(|r| r.1).boxed()
self.simple_query(query)
.map(|r| r.1)
.boxed()
}
fn raw_prepare(self,
@ -346,9 +356,9 @@ impl Connection {
let mut parse = vec![];
let mut describe = vec![];
let mut sync = vec![];
frontend::sync(&mut sync);
frontend::parse(name, query, None, &mut parse)
.and_then(|()| frontend::describe(b'S', name, &mut describe))
.and_then(|()| Ok(frontend::sync(&mut sync)))
.into_future()
.and_then(move |()| {
let it = Some(parse).into_iter()
@ -457,6 +467,7 @@ impl Connection {
let mut bind = vec![];
let mut execute = vec![];
let mut sync = vec![];
frontend::sync(&mut sync);
let r = frontend::bind(portal,
stmt,
Some(1),
@ -482,10 +493,6 @@ impl Connection {
.map(|()| s)
.map_err(Error::Io)
})
.map(|s| {
frontend::sync(&mut sync);
s
})
.into_future()
.and_then(|s| {
let it = Some(bind).into_iter()
@ -546,6 +553,7 @@ impl Connection {
self.raw_prepare(&name, query)
.map(|(params, columns, conn)| {
let stmt = Statement {
close_sender: conn.0.close_sender.clone(),
name: name,
params: params,
columns: columns,
@ -555,29 +563,41 @@ impl Connection {
.boxed()
}
fn raw_close(self, type_: u8, name: &str) -> BoxFuture<Connection, Error> {
let mut close = vec![];
let mut sync = vec![];
frontend::close(type_, name, &mut close)
.map(|()| frontend::sync(&mut sync))
.into_future()
.and_then(move |()| {
let it = Some(close).into_iter().chain(Some(sync)).map(Ok::<_, io::Error>);
self.0.send_all(futures::stream::iter(it))
})
.and_then(|s| s.0.read())
fn close_gc(self) -> BoxFuture<Connection, Error> {
let mut messages = vec![];
while let Ok((type_, name)) = self.0.close_receiver.try_recv() {
let mut buf = vec![];
frontend::close(type_, &name, &mut buf).unwrap(); // this can only fail on bad names
messages.push(buf);
}
if messages.is_empty() {
return Ok(self).into_future().boxed();
}
let mut buf = vec![];
frontend::sync(&mut buf);
messages.push(buf);
self.0.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>)))
.map_err(Error::Io)
.and_then(|s| Connection(s.0).finish_close_gc())
.boxed()
}
fn finish_close_gc(self) -> BoxFuture<Connection, Error> {
self.0.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::CloseComplete => Either::A(Ok(Connection(s)).into_future()),
backend::Message::ReadyForQuery(_) => {
Either::A(Ok(Connection(s)).into_future())
}
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
}
})
.and_then(|s| s.ready(()))
.map(|((), s)| s)
.boxed()
}
@ -601,11 +621,19 @@ struct Column {
}
pub struct Statement {
close_sender: Sender<(u8, String)>,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
}
impl Drop for Statement {
fn drop(&mut self) {
let name = mem::replace(&mut self.name, String::new());
let _ = self.close_sender.send((b'S', name));
}
}
impl Statement {
pub fn execute(self,
params: &[&ToSql],
@ -616,10 +644,6 @@ impl Statement {
.map(|(n, conn)| (n, self, conn))
.boxed()
}
pub fn close(self, conn: Connection) -> BoxFuture<Connection, Error> {
conn.raw_close(b'S', &self.name)
}
}
fn connect_err(fields: &mut ErrorFields) -> ConnectError {

View File

@ -113,11 +113,8 @@ fn prepare_execute() {
c.unwrap().prepare("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)")
})
.and_then(|(s, c)| s.execute(&[], c))
.and_then(|(n, s, c)| {
.and_then(|(n, _, c)| {
assert_eq!(0, n);
s.close(c)
})
.and_then(|c| {
c.prepare("INSERT INTO foo (name) VALUES ($1), ($2)")
})
.and_then(|(s, c)| s.execute(&[&"steven", &"bob"], c))