More protocol moves

This commit is contained in:
Steven Fackler 2016-09-11 16:13:57 -07:00
parent 59e1273faa
commit 52991387e5
2 changed files with 44 additions and 113 deletions

View File

@ -435,16 +435,17 @@ impl InnerConnection {
fn raw_prepare(&mut self, stmt_name: &str, query: &str) -> Result<(Vec<Type>, Vec<Column>)> {
debug!("preparing query with name `{}`: {}", stmt_name, query);
try!(self.write_messages(&[Frontend::Parse {
name: stmt_name,
query: query,
param_types: &[],
},
Frontend::Describe {
variant: b'S',
name: stmt_name,
},
Frontend::Sync]));
try!(self.write_message(&frontend::Parse {
name: stmt_name,
query: query,
param_types: &[],
}));
try!(self.write_message(&frontend::Describe {
variant: b'S',
name: stmt_name,
}));
try!(self.write_message(&frontend::Sync));
try!(self.stream.flush());
match try!(self.read_message()) {
Backend::ParseComplete => {}
@ -499,11 +500,11 @@ impl InnerConnection {
return DbError::new(fields);
}
Backend::CopyInResponse { .. } => {
try!(self.write_messages(&[Frontend::CopyFail {
message: "COPY queries cannot be directly \
executed",
},
Frontend::Sync]));
try!(self.write_message(&frontend::CopyFail {
message: "COPY queries cannot be directly executed",
}));
try!(self.write_message(&frontend::Sync));
try!(self.stream.flush());
}
Backend::CopyOutResponse { .. } => {
loop {
@ -548,18 +549,19 @@ impl InnerConnection {
}
}
try!(self.write_messages(&[Frontend::Bind {
portal: portal_name,
statement: &stmt_name,
formats: &[1],
values: &values,
result_formats: &[1],
},
Frontend::Execute {
portal: portal_name,
max_rows: row_limit,
},
Frontend::Sync]));
try!(self.write_message(&frontend::Bind {
portal: portal_name,
statement: &stmt_name,
formats: &[1],
values: &values,
result_formats: &[1],
}));
try!(self.write_message(&frontend::Execute {
portal: portal_name,
max_rows: row_limit,
}));
try!(self.write_message(&frontend::Sync));
try!(self.stream.flush());
match try!(self.read_message()) {
Backend::BindComplete => Ok(()),
@ -613,11 +615,12 @@ impl InnerConnection {
}
fn close_statement(&mut self, name: &str, type_: u8) -> Result<()> {
try!(self.write_messages(&[Frontend::Close {
variant: type_,
name: name,
},
Frontend::Sync]));
try!(self.write_message(&frontend::Close {
variant: type_,
name: name,
}));
try!(self.write_message(&frontend::Sync));
try!(self.stream.flush());
let resp = match try!(self.read_message()) {
Backend::CloseComplete => Ok(()),
Backend::ErrorResponse { fields } => DbError::new(fields),
@ -816,7 +819,8 @@ impl InnerConnection {
fn quick_query(&mut self, query: &str) -> Result<Vec<Vec<Option<String>>>> {
check_desync!(self);
debug!("executing query: {}", query);
try!(self.write_messages(&[Frontend::Query { query: query }]));
try!(self.write_message(&frontend::Query { query: query }));
try!(self.stream.flush());
let mut result = vec![];
loop {
@ -830,11 +834,11 @@ impl InnerConnection {
.collect());
}
Backend::CopyInResponse { .. } => {
try!(self.write_messages(&[Frontend::CopyFail {
message: "COPY queries cannot be directly \
executed",
},
Frontend::Sync]));
try!(self.write_message(&frontend::CopyFail {
message: "COPY queries cannot be directly executed",
}));
try!(self.write_message(&frontend::Sync));
try!(self.stream.flush());
}
Backend::ErrorResponse { fields } => {
try!(self.wait_for_ready());
@ -848,7 +852,8 @@ impl InnerConnection {
fn finish_inner(&mut self) -> Result<()> {
check_desync!(self);
try!(self.write_messages(&[Frontend::Terminate]));
try!(self.write_message(&frontend::Terminate));
try!(self.stream.flush());
Ok(())
}
}

View File

@ -85,22 +85,11 @@ pub struct RowDescriptionEntry {
}
pub enum Frontend<'a> {
Bind {
portal: &'a str,
statement: &'a str,
formats: &'a [i16],
values: &'a [Option<Vec<u8>>],
result_formats: &'a [i16],
},
CancelRequest {
code: u32,
process_id: u32,
secret_key: u32,
},
Close {
variant: u8,
name: &'a str,
},
CopyData {
data: &'a [u8],
},
@ -108,27 +97,14 @@ pub enum Frontend<'a> {
CopyFail {
message: &'a str,
},
Describe {
variant: u8,
name: &'a str,
},
Execute {
portal: &'a str,
max_rows: i32,
},
Parse {
name: &'a str,
query: &'a str,
param_types: &'a [Oid],
},
Query {
query: &'a str,
},
SslRequest {
code: u32,
},
Sync,
Terminate,
}
#[doc(hidden)]
@ -155,42 +131,11 @@ impl<W: Write> WriteMessage for W {
let mut ident = None;
match *message {
Frontend::Bind { portal, statement, formats, values, result_formats } => {
ident = Some(b'B');
try!(buf.write_cstr(portal));
try!(buf.write_cstr(statement));
try!(buf.write_u16::<BigEndian>(try!(u16::from_usize(formats.len()))));
for &format in formats {
try!(buf.write_i16::<BigEndian>(format));
}
try!(buf.write_u16::<BigEndian>(try!(u16::from_usize(values.len()))));
for value in values {
match *value {
None => try!(buf.write_i32::<BigEndian>(-1)),
Some(ref value) => {
try!(buf.write_i32::<BigEndian>(try!(i32::from_usize(value.len()))));
try!(buf.write_all(&**value));
}
}
}
try!(buf.write_u16::<BigEndian>(try!(u16::from_usize(result_formats.len()))));
for &format in result_formats {
try!(buf.write_i16::<BigEndian>(format));
}
}
Frontend::CancelRequest { code, process_id, secret_key } => {
try!(buf.write_u32::<BigEndian>(code));
try!(buf.write_u32::<BigEndian>(process_id));
try!(buf.write_u32::<BigEndian>(secret_key));
}
Frontend::Close { variant, name } => {
ident = Some(b'C');
try!(buf.write_u8(variant));
try!(buf.write_cstr(name));
}
Frontend::CopyData { data } => {
ident = Some(b'd');
try!(buf.write_all(data));
@ -200,32 +145,13 @@ impl<W: Write> WriteMessage for W {
ident = Some(b'f');
try!(buf.write_cstr(message));
}
Frontend::Describe { variant, name } => {
ident = Some(b'D');
try!(buf.write_u8(variant));
try!(buf.write_cstr(name));
}
Frontend::Execute { portal, max_rows } => {
ident = Some(b'E');
try!(buf.write_cstr(portal));
try!(buf.write_i32::<BigEndian>(max_rows));
}
Frontend::Parse { name, query, param_types } => {
ident = Some(b'P');
try!(buf.write_cstr(name));
try!(buf.write_cstr(query));
try!(buf.write_u16::<BigEndian>(try!(u16::from_usize(param_types.len()))));
for &ty in param_types {
try!(buf.write_u32::<BigEndian>(ty));
}
}
Frontend::Query { query } => {
ident = Some(b'Q');
try!(buf.write_cstr(query));
}
Frontend::SslRequest { code } => try!(buf.write_u32::<BigEndian>(code)),
Frontend::Sync => ident = Some(b'S'),
Frontend::Terminate => ident = Some(b'X'),
}
if let Some(ident) = ident {