Start switching over to postgres-protocol

This commit is contained in:
Steven Fackler 2016-09-11 15:54:37 -07:00
parent 1c272246a4
commit 59e1273faa
3 changed files with 21 additions and 26 deletions

View File

@ -37,6 +37,7 @@ byteorder = "0.5"
hex = "0.2"
log = "0.3"
phf = "=0.7.15"
postgres-protocol = { git = "https://github.com/sfackler/rust-postgres-protocol" }
bit-vec = { version = "0.4", optional = true }
chrono = { version = "0.2.14", optional = true }
eui48 = { version = "0.1", optional = true }

View File

@ -48,6 +48,7 @@ extern crate hex;
#[macro_use]
extern crate log;
extern crate phf;
extern crate postgres_protocol;
use bufstream::BufStream;
use md5::Md5;
@ -60,6 +61,7 @@ use std::mem;
use std::result;
use std::sync::Arc;
use std::time::Duration;
use postgres_protocol::message::frontend;
use error::{Error, ConnectError, SqlState, DbError};
use io::{TlsStream, TlsHandshake};
@ -205,6 +207,7 @@ struct StatementInfo {
struct InnerConnection {
stream: BufStream<Box<TlsStream>>,
io_buf: Vec<u8>,
notice_handler: Box<HandleNotice>,
notifications: VecDeque<Notification>,
cancel_data: CancelData,
@ -246,6 +249,7 @@ impl InnerConnection {
let mut conn = InnerConnection {
stream: BufStream::new(stream),
io_buf: vec![],
next_stmt_id: 0,
notice_handler: Box::new(LoggingNoticeHandler),
notifications: VecDeque::new(),
@ -274,10 +278,10 @@ impl InnerConnection {
options.push(("database".to_owned(), database));
}
try!(conn.write_messages(&[Frontend::StartupMessage {
version: message::PROTOCOL_VERSION,
parameters: &options,
}]));
try!(conn.write_message(&frontend::StartupMessage {
parameters: &options,
}));
try!(conn.stream.flush());
try!(conn.handle_auth(user));
@ -296,6 +300,14 @@ impl InnerConnection {
Ok(conn)
}
fn write_message<M>(&mut self, message: &M) -> std_io::Result<()>
where M: frontend::Message
{
self.io_buf.clear();
try!(message.write(&mut self.io_buf));
self.stream.write_all(&self.io_buf)
}
fn write_messages(&mut self, messages: &[Frontend]) -> std_io::Result<()> {
debug_assert!(!self.desynchronized);
for message in messages {
@ -380,7 +392,8 @@ impl InnerConnection {
let pass = try!(user.password.ok_or_else(|| {
ConnectError::ConnectParams("a password was requested but not provided".into())
}));
try!(self.write_messages(&[Frontend::PasswordMessage { password: &pass }]));
try!(self.write_message(&frontend::PasswordMessage { password: &pass }));
try!(self.stream.flush());
}
Backend::AuthenticationMD5Password { salt } => {
let pass = try!(user.password.ok_or_else(|| {
@ -394,7 +407,8 @@ impl InnerConnection {
hasher.input(output.as_bytes());
hasher.input(&salt);
let output = format!("md5{}", hasher.result_str());
try!(self.write_messages(&[Frontend::PasswordMessage { password: &output }]));
try!(self.write_message(&frontend::PasswordMessage { password: &output }));
try!(self.stream.flush());
}
Backend::AuthenticationKerberosV5 |
Backend::AuthenticationSCMCredential |

View File

@ -7,7 +7,6 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use types::Oid;
use priv_io::StreamOptions;
pub const PROTOCOL_VERSION: u32 = 0x0003_0000;
pub const CANCEL_CODE: u32 = 80877102;
pub const SSL_CODE: u32 = 80877103;
@ -122,19 +121,12 @@ pub enum Frontend<'a> {
query: &'a str,
param_types: &'a [Oid],
},
PasswordMessage {
password: &'a str,
},
Query {
query: &'a str,
},
SslRequest {
code: u32,
},
StartupMessage {
version: u32,
parameters: &'a [(String, String)],
},
Sync,
Terminate,
}
@ -227,22 +219,10 @@ impl<W: Write> WriteMessage for W {
try!(buf.write_u32::<BigEndian>(ty));
}
}
Frontend::PasswordMessage { password } => {
ident = Some(b'p');
try!(buf.write_cstr(password));
}
Frontend::Query { query } => {
ident = Some(b'Q');
try!(buf.write_cstr(query));
}
Frontend::StartupMessage { version, parameters } => {
try!(buf.write_u32::<BigEndian>(version));
for &(ref k, ref v) in parameters {
try!(buf.write_cstr(&**k));
try!(buf.write_cstr(&**v));
}
try!(buf.write_u8(0));
}
Frontend::SslRequest { code } => try!(buf.write_u32::<BigEndian>(code)),
Frontend::Sync => ident = Some(b'S'),
Frontend::Terminate => ident = Some(b'X'),