Batch command sequence writes

This allows command sequences like Prepare/Describe/Sync to be sent in
fewer TCP packets.
This commit is contained in:
Steven Fackler 2013-08-31 17:12:19 -07:00
parent 557c61d6db
commit 0f1d31c4ab

View File

@ -5,7 +5,8 @@ use extra::md5::Md5;
use extra::url::{UserInfo, Url}; use extra::url::{UserInfo, Url};
use std::cell::Cell; use std::cell::Cell;
use std::hashmap::HashMap; use std::hashmap::HashMap;
use std::rt::io::io_error; use std::rt::io::{io_error, Decorator};
use std::rt::io::mem::MemWriter;
use std::rt::io::net::ip::SocketAddr; use std::rt::io::net::ip::SocketAddr;
use std::rt::io::net::tcp::TcpStream; use std::rt::io::net::tcp::TcpStream;
@ -183,6 +184,16 @@ impl PostgresConnection {
Ok(conn) Ok(conn)
} }
fn write_messages(&self, messages: &[&FrontendMessage]) {
let mut buf = MemWriter::new();
for &message in messages.iter() {
buf.write_message(message);
}
do self.stream.with_mut_ref |s| {
s.write(buf.inner_ref().as_slice());
}
}
fn write_message(&self, message: &FrontendMessage) { fn write_message(&self, message: &FrontendMessage) {
do self.stream.with_mut_ref |s| { do self.stream.with_mut_ref |s| {
s.write_message(message); s.write_message(message);
@ -249,13 +260,17 @@ impl PostgresConnection {
self.next_stmt_id.put_back(id + 1); self.next_stmt_id.put_back(id + 1);
let types = []; let types = [];
self.write_message(&Parse { self.write_messages([
name: stmt_name, &Parse {
query: query, name: stmt_name,
param_types: types query: query,
}); param_types: types
self.write_message(&Describe { variant: 'S' as u8, name: stmt_name }); },
self.write_message(&Sync); &Describe {
variant: 'S' as u8,
name: stmt_name
},
&Sync]);
match_read_message!(self, { match_read_message!(self, {
ParseComplete => (), ParseComplete => (),
@ -371,11 +386,12 @@ pub struct PostgresStatement<'self> {
impl<'self> Drop for PostgresStatement<'self> { impl<'self> Drop for PostgresStatement<'self> {
fn drop(&self) { fn drop(&self) {
do io_error::cond.trap(|_| {}).inside { do io_error::cond.trap(|_| {}).inside {
self.conn.write_message(&Close { self.conn.write_messages([
variant: 'S' as u8, &Close {
name: self.name.as_slice() variant: 'S' as u8,
}); name: self.name.as_slice()
self.conn.write_message(&Sync); },
&Sync]);
loop { loop {
match_read_message!(self.conn, { match_read_message!(self.conn, {
ReadyForQuery {_} => break, ReadyForQuery {_} => break,
@ -399,18 +415,19 @@ impl<'self> PostgresStatement<'self> {
let result_formats = []; let result_formats = [];
self.conn.write_message(&Bind { self.conn.write_messages([
portal: portal_name, &Bind {
statement: self.name.as_slice(), portal: portal_name,
formats: formats, statement: self.name.as_slice(),
values: values, formats: formats,
result_formats: result_formats values: values,
}); result_formats: result_formats
self.conn.write_message(&Execute { },
portal: portal_name.as_slice(), &Execute {
max_rows: 0 portal: portal_name.as_slice(),
}); max_rows: 0
self.conn.write_message(&Sync); },
&Sync]);
match_read_message!(self.conn, { match_read_message!(self.conn, {
BindComplete => None, BindComplete => None,
@ -521,11 +538,12 @@ pub struct PostgresResult<'self> {
impl<'self> Drop for PostgresResult<'self> { impl<'self> Drop for PostgresResult<'self> {
fn drop(&self) { fn drop(&self) {
do io_error::cond.trap(|_| {}).inside { do io_error::cond.trap(|_| {}).inside {
self.stmt.conn.write_message(&Close { self.stmt.conn.write_messages([
variant: 'P' as u8, &Close {
name: self.name.as_slice() variant: 'P' as u8,
}); name: self.name.as_slice()
self.stmt.conn.write_message(&Sync); },
&Sync]);
loop { loop {
match_read_message!(self.stmt.conn, { match_read_message!(self.stmt.conn, {
ReadyForQuery {_} => break, ReadyForQuery {_} => break,