diff --git a/Cargo.toml b/Cargo.toml index c31fbcab..41d9837e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ byteorder = "0.5" hex = "0.2" log = "0.3" phf = "=0.7.15" +postgres-protocol = { git = "https://github.com/sfackler/rust-postgres-protocol" } bit-vec = { version = "0.4", optional = true } chrono = { version = "0.2.14", optional = true } eui48 = { version = "0.1", optional = true } diff --git a/src/lib.rs b/src/lib.rs index 6de081d3..f7c6ba60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ extern crate hex; #[macro_use] extern crate log; extern crate phf; +extern crate postgres_protocol; use bufstream::BufStream; use md5::Md5; @@ -60,6 +61,7 @@ use std::mem; use std::result; use std::sync::Arc; use std::time::Duration; +use postgres_protocol::message::frontend; use error::{Error, ConnectError, SqlState, DbError}; use io::{TlsStream, TlsHandshake}; @@ -205,6 +207,7 @@ struct StatementInfo { struct InnerConnection { stream: BufStream>, + io_buf: Vec, notice_handler: Box, notifications: VecDeque, cancel_data: CancelData, @@ -246,6 +249,7 @@ impl InnerConnection { let mut conn = InnerConnection { stream: BufStream::new(stream), + io_buf: vec![], next_stmt_id: 0, notice_handler: Box::new(LoggingNoticeHandler), notifications: VecDeque::new(), @@ -274,10 +278,10 @@ impl InnerConnection { options.push(("database".to_owned(), database)); } - try!(conn.write_messages(&[Frontend::StartupMessage { - version: message::PROTOCOL_VERSION, - parameters: &options, - }])); + try!(conn.write_message(&frontend::StartupMessage { + parameters: &options, + })); + try!(conn.stream.flush()); try!(conn.handle_auth(user)); @@ -296,6 +300,14 @@ impl InnerConnection { Ok(conn) } + fn write_message(&mut self, message: &M) -> std_io::Result<()> + where M: frontend::Message + { + self.io_buf.clear(); + try!(message.write(&mut self.io_buf)); + self.stream.write_all(&self.io_buf) + } + fn write_messages(&mut self, messages: &[Frontend]) -> std_io::Result<()> { debug_assert!(!self.desynchronized); for message in messages { @@ -380,7 +392,8 @@ impl InnerConnection { let pass = try!(user.password.ok_or_else(|| { ConnectError::ConnectParams("a password was requested but not provided".into()) })); - try!(self.write_messages(&[Frontend::PasswordMessage { password: &pass }])); + try!(self.write_message(&frontend::PasswordMessage { password: &pass })); + try!(self.stream.flush()); } Backend::AuthenticationMD5Password { salt } => { let pass = try!(user.password.ok_or_else(|| { @@ -394,7 +407,8 @@ impl InnerConnection { hasher.input(output.as_bytes()); hasher.input(&salt); let output = format!("md5{}", hasher.result_str()); - try!(self.write_messages(&[Frontend::PasswordMessage { password: &output }])); + try!(self.write_message(&frontend::PasswordMessage { password: &output })); + try!(self.stream.flush()); } Backend::AuthenticationKerberosV5 | Backend::AuthenticationSCMCredential | diff --git a/src/message.rs b/src/message.rs index 67576cfa..b46c66b3 100644 --- a/src/message.rs +++ b/src/message.rs @@ -7,7 +7,6 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use types::Oid; use priv_io::StreamOptions; -pub const PROTOCOL_VERSION: u32 = 0x0003_0000; pub const CANCEL_CODE: u32 = 80877102; pub const SSL_CODE: u32 = 80877103; @@ -122,19 +121,12 @@ pub enum Frontend<'a> { query: &'a str, param_types: &'a [Oid], }, - PasswordMessage { - password: &'a str, - }, Query { query: &'a str, }, SslRequest { code: u32, }, - StartupMessage { - version: u32, - parameters: &'a [(String, String)], - }, Sync, Terminate, } @@ -227,22 +219,10 @@ impl WriteMessage for W { try!(buf.write_u32::(ty)); } } - Frontend::PasswordMessage { password } => { - ident = Some(b'p'); - try!(buf.write_cstr(password)); - } Frontend::Query { query } => { ident = Some(b'Q'); try!(buf.write_cstr(query)); } - Frontend::StartupMessage { version, parameters } => { - try!(buf.write_u32::(version)); - for &(ref k, ref v) in parameters { - try!(buf.write_cstr(&**k)); - try!(buf.write_cstr(&**v)); - } - try!(buf.write_u8(0)); - } Frontend::SslRequest { code } => try!(buf.write_u32::(code)), Frontend::Sync => ident = Some(b'S'), Frontend::Terminate => ident = Some(b'X'),