From 9d53e677fff7e01216ce55e4badc636013fd5c65 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sun, 18 Dec 2016 16:11:38 -0800 Subject: [PATCH] Update to protocol 0.2 --- .gitignore | 5 +- Cargo.toml | 2 +- codegen/src/sqlstate.rs | 6 +- src/error/mod.rs | 95 +++++++++++++++++------- src/error/sqlstate.rs | 6 +- src/lib.rs | 158 ++++++++++++++++++++++------------------ src/notification.rs | 24 +++--- src/priv_io.rs | 12 +-- src/stmt.rs | 81 +++++++++++--------- 9 files changed, 231 insertions(+), 158 deletions(-) diff --git a/.gitignore b/.gitignore index f106a7bd..8e4846f8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ -target/ +target Cargo.lock -.cargo/ +.cargo .idea *.iml +.vscode diff --git a/Cargo.toml b/Cargo.toml index 706972b2..47991d4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ fallible-iterator = "0.1.3" hex = "0.2" log = "0.3" phf = "=0.7.20" -postgres-protocol = "0.1" +postgres-protocol = "0.2" bit-vec = { version = "0.4", optional = true } chrono = { version = "0.2.14", optional = true } eui48 = { version = "0.1", optional = true } diff --git a/codegen/src/sqlstate.rs b/codegen/src/sqlstate.rs index 07194216..03bffaf9 100644 --- a/codegen/src/sqlstate.rs +++ b/codegen/src/sqlstate.rs @@ -120,10 +120,10 @@ fn make_impl(codes: &[Code], file: &mut BufWriter) { write!(file, r#" impl SqlState {{ /// Creates a `SqlState` from its error code. - pub fn from_code(s: String) -> SqlState {{ - match SQLSTATE_MAP.get(&*s) {{ + pub fn from_code(s: &str) -> SqlState {{ + match SQLSTATE_MAP.get(s) {{ Some(state) => state.clone(), - None => SqlState::Other(s), + None => SqlState::Other(s.to_owned()), }} }} diff --git a/src/error/mod.rs b/src/error/mod.rs index 477c09dc..0aa1e001 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -1,11 +1,12 @@ //! Error types. +use fallible_iterator::FallibleIterator; +use postgres_protocol::message::backend::ErrorFields; use std::error; use std::convert::From; use std::fmt; use std::io; use std::result; -use std::collections::HashMap; pub use self::sqlstate::SqlState; use {Result, DbErrorNew}; @@ -85,52 +86,92 @@ pub struct DbError { } impl DbErrorNew for DbError { - fn new_raw(fields: Vec<(u8, String)>) -> result::Result { - let mut map: HashMap<_, _> = fields.into_iter().collect(); + fn new_raw(fields: &mut ErrorFields) -> io::Result { + let mut severity = None; + let mut code = None; + let mut message = None; + let mut detail = None; + let mut hint = None; + let mut normal_position = None; + let mut internal_position = None; + let mut internal_query = None; + let mut where_ = None; + let mut schema = None; + let mut table = None; + let mut column = None; + let mut datatype = None; + let mut constraint = None; + let mut file = None; + let mut line = None; + let mut routine = None; + + while let Some(field) = try!(fields.next()) { + match field.type_() { + b'S' => severity = Some(field.value().to_owned()), + b'C' => code = Some(SqlState::from_code(field.value())), + b'M' => message = Some(field.value().to_owned()), + b'D' => detail = Some(field.value().to_owned()), + b'H' => hint = Some(field.value().to_owned()), + b'P' => normal_position = Some(try!(field.value().parse::().map_err(|_| ::bad_response()))), + b'p' => internal_position = Some(try!(field.value().parse::().map_err(|_| ::bad_response()))), + b'q' => internal_query = Some(field.value().to_owned()), + b'W' => where_ = Some(field.value().to_owned()), + b's' => schema = Some(field.value().to_owned()), + b't' => table = Some(field.value().to_owned()), + b'c' => column = Some(field.value().to_owned()), + b'd' => datatype = Some(field.value().to_owned()), + b'n' => constraint = Some(field.value().to_owned()), + b'F' => file = Some(field.value().to_owned()), + b'L' => line = Some(try!(field.value().parse::().map_err(|_| ::bad_response()))), + b'R' => routine = Some(field.value().to_owned()), + _ => {}, + } + } + Ok(DbError { - severity: try!(map.remove(&b'S').ok_or(())), - code: SqlState::from_code(try!(map.remove(&b'C').ok_or(()))), - message: try!(map.remove(&b'M').ok_or(())), - detail: map.remove(&b'D'), - hint: map.remove(&b'H'), - position: match map.remove(&b'P') { - Some(pos) => Some(ErrorPosition::Normal(try!(pos.parse().map_err(|_| ())))), + severity: try!(severity.ok_or_else(|| ::bad_response())), + code: try!(code.ok_or_else(|| ::bad_response())), + message: try!(message.ok_or_else(|| ::bad_response())), + detail: detail, + hint: hint, + position: match normal_position { + Some(position) => Some(ErrorPosition::Normal(position)), None => { - match map.remove(&b'p') { - Some(pos) => { + match internal_position { + Some(position) => { Some(ErrorPosition::Internal { - position: try!(pos.parse().map_err(|_| ())), - query: try!(map.remove(&b'q').ok_or(())), + position: position, + query: try!(internal_query.ok_or_else(|| ::bad_response())), }) } None => None, } } }, - where_: map.remove(&b'W'), - schema: map.remove(&b's'), - table: map.remove(&b't'), - column: map.remove(&b'c'), - datatype: map.remove(&b'd'), - constraint: map.remove(&b'n'), - file: map.remove(&b'F'), - line: map.remove(&b'L').and_then(|l| l.parse().ok()), - routine: map.remove(&b'R'), + where_: where_, + schema: schema, + table: table, + column: column, + datatype: datatype, + constraint: constraint, + file: file, + line: line, + routine: routine, _p: (), }) } - fn new_connect(fields: Vec<(u8, String)>) -> result::Result { + fn new_connect(fields: &mut ErrorFields) -> result::Result { match DbError::new_raw(fields) { Ok(err) => Err(ConnectError::Db(Box::new(err))), - Err(()) => Err(ConnectError::Io(::bad_response())), + Err(e) => Err(ConnectError::Io(e)), } } - fn new(fields: Vec<(u8, String)>) -> Result { + fn new(fields: &mut ErrorFields) -> Result { match DbError::new_raw(fields) { Ok(err) => Err(Error::Db(Box::new(err))), - Err(()) => Err(Error::Io(::bad_response())), + Err(e) => Err(Error::Io(e)), } } } diff --git a/src/error/sqlstate.rs b/src/error/sqlstate.rs index 12590048..f4baf388 100644 --- a/src/error/sqlstate.rs +++ b/src/error/sqlstate.rs @@ -782,10 +782,10 @@ static SQLSTATE_MAP: phf::Map<&'static str, SqlState> = ::phf::Map { impl SqlState { /// Creates a `SqlState` from its error code. - pub fn from_code(s: String) -> SqlState { - match SQLSTATE_MAP.get(&*s) { + pub fn from_code(s: &str) -> SqlState { + match SQLSTATE_MAP.get(s) { Some(state) => state.clone(), - None => SqlState::Other(s), + None => SqlState::Other(s.to_owned()), } } diff --git a/src/lib.rs b/src/lib.rs index 1ccc3c0c..92a1df7d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,6 +79,7 @@ extern crate log; extern crate phf; extern crate postgres_protocol; +use fallible_iterator::FallibleIterator; use std::cell::{Cell, RefCell}; use std::collections::{VecDeque, HashMap}; use std::fmt; @@ -88,7 +89,7 @@ use std::result; use std::sync::Arc; use std::time::Duration; use postgres_protocol::authentication; -use postgres_protocol::message::backend::{self, RowDescriptionEntry}; +use postgres_protocol::message::backend::{self, ErrorFields}; use postgres_protocol::message::frontend; use error::{Error, ConnectError, SqlState, DbError}; @@ -307,12 +308,14 @@ impl InnerConnection { loop { match try!(conn.read_message()) { - backend::Message::BackendKeyData { process_id, secret_key } => { - conn.cancel_data.process_id = process_id; - conn.cancel_data.secret_key = secret_key; + backend::Message::BackendKeyData(body) => { + conn.cancel_data.process_id = body.process_id(); + conn.cancel_data.secret_key = body.secret_key(); + } + backend::Message::ReadyForQuery(_) => break, + backend::Message::ErrorResponse(body) => { + return DbError::new_connect(&mut body.fields()) } - backend::Message::ReadyForQuery { .. } => break, - backend::Message::ErrorResponse { fields } => return DbError::new_connect(fields), _ => return Err(ConnectError::Io(bad_response())), } } @@ -320,17 +323,18 @@ impl InnerConnection { Ok(conn) } - fn read_message_with_notification(&mut self) -> io::Result { + fn read_message_with_notification(&mut self) -> io::Result>> { debug_assert!(!self.desynchronized); loop { match try_desync!(self, self.stream.read_message()) { - backend::Message::NoticeResponse { fields } => { - if let Ok(err) = DbError::new_raw(fields) { + backend::Message::NoticeResponse(body) => { + if let Ok(err) = DbError::new_raw(&mut body.fields()) { self.notice_handler.handle_notice(err); } } - backend::Message::ParameterStatus { parameter, value } => { - self.parameters.insert(parameter, value); + backend::Message::ParameterStatus(body) => { + self.parameters.insert(try!(body.name()).to_owned(), + try!(body.value()).to_owned()); } val => return Ok(val), } @@ -339,17 +343,18 @@ impl InnerConnection { fn read_message_with_notification_timeout(&mut self, timeout: Duration) - -> io::Result> { + -> io::Result>>> { debug_assert!(!self.desynchronized); loop { match try_desync!(self, self.stream.read_message_timeout(timeout)) { - Some(backend::Message::NoticeResponse { fields }) => { - if let Ok(err) = DbError::new_raw(fields) { + Some(backend::Message::NoticeResponse(body)) => { + if let Ok(err) = DbError::new_raw(&mut body.fields()) { self.notice_handler.handle_notice(err); } } - Some(backend::Message::ParameterStatus { parameter, value }) => { - self.parameters.insert(parameter, value); + Some(backend::Message::ParameterStatus(body)) => { + self.parameters.insert(try!(body.name()).to_owned(), + try!(body.value()).to_owned()); } val => return Ok(val), } @@ -357,31 +362,32 @@ impl InnerConnection { } fn read_message_with_notification_nonblocking(&mut self) - -> io::Result> { + -> io::Result>>> { debug_assert!(!self.desynchronized); loop { match try_desync!(self, self.stream.read_message_nonblocking()) { - Some(backend::Message::NoticeResponse { fields }) => { - if let Ok(err) = DbError::new_raw(fields) { + Some(backend::Message::NoticeResponse(body)) => { + if let Ok(err) = DbError::new_raw(&mut body.fields()) { self.notice_handler.handle_notice(err); } } - Some(backend::Message::ParameterStatus { parameter, value }) => { - self.parameters.insert(parameter, value); + Some(backend::Message::ParameterStatus(body)) => { + self.parameters.insert(try!(body.name()).to_owned(), + try!(body.value()).to_owned()); } val => return Ok(val), } } } - fn read_message(&mut self) -> io::Result { + fn read_message(&mut self) -> io::Result>> { loop { match try!(self.read_message_with_notification()) { - backend::Message::NotificationResponse { process_id, channel, payload } => { + backend::Message::NotificationResponse(body) => { self.notifications.push_back(Notification { - process_id: process_id, - channel: channel, - payload: payload, + process_id: body.process_id(), + channel: try!(body.channel()).to_owned(), + payload: try!(body.message()).to_owned(), }) } val => return Ok(val), @@ -399,28 +405,30 @@ impl InnerConnection { try!(self.stream.write_message(|buf| frontend::password_message(&pass, buf))); try!(self.stream.flush()); } - backend::Message::AuthenticationMD5Password { salt } => { + backend::Message::AuthenticationMd5Password(body) => { let pass = try!(user.password.ok_or_else(|| { ConnectError::ConnectParams("a password was requested but not provided".into()) })); - let output = authentication::md5_hash(user.user.as_bytes(), pass.as_bytes(), salt); + let output = authentication::md5_hash(user.user.as_bytes(), + pass.as_bytes(), + body.salt()); try!(self.stream.write_message(|buf| frontend::password_message(&output, buf))); try!(self.stream.flush()); } backend::Message::AuthenticationKerberosV5 | - backend::Message::AuthenticationSCMCredential | - backend::Message::AuthenticationGSS | - backend::Message::AuthenticationSSPI => { + backend::Message::AuthenticationScmCredential | + backend::Message::AuthenticationGss | + backend::Message::AuthenticationSspi => { return Err(ConnectError::Io(io::Error::new(io::ErrorKind::Other, "unsupported authentication"))) } - backend::Message::ErrorResponse { fields } => return DbError::new_connect(fields), + backend::Message::ErrorResponse(body) => return DbError::new_connect(&mut body.fields()), _ => return Err(ConnectError::Io(bad_response())), } match try!(self.read_message()) { backend::Message::AuthenticationOk => Ok(()), - backend::Message::ErrorResponse { fields } => DbError::new_connect(fields), + backend::Message::ErrorResponse(body) => DbError::new_connect(&mut body.fields()), _ => Err(ConnectError::Io(bad_response())), } } @@ -439,35 +447,43 @@ impl InnerConnection { match try!(self.read_message()) { backend::Message::ParseComplete => {} - backend::Message::ErrorResponse { fields } => { + backend::Message::ErrorResponse(body) => { try!(self.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } _ => bad_response!(self), } let raw_param_types = match try!(self.read_message()) { - backend::Message::ParameterDescription { types } => types, + backend::Message::ParameterDescription(body) => body, _ => bad_response!(self), }; let raw_columns = match try!(self.read_message()) { - backend::Message::RowDescription { descriptions } => descriptions, - backend::Message::NoData => vec![], + backend::Message::RowDescription(body) => Some(body), + backend::Message::NoData => None, _ => bad_response!(self), }; try!(self.wait_for_ready()); - let mut param_types = vec![]; - for oid in raw_param_types { - param_types.push(try!(self.get_type(oid))); - } - - let mut columns = vec![]; - for RowDescriptionEntry { name, type_oid, .. } in raw_columns { - columns.push(Column::new(name, try!(self.get_type(type_oid)))); - } + let param_types = try!(raw_param_types + .parameters() + .map_err(Into::into) + .and_then(|oid| self.get_type(oid)) + .collect()); + + let columns = match raw_columns { + Some(body) => { + try!(body.fields() + .and_then(|field| { + Ok(Column::new(field.name().to_owned(), + try!(self.get_type(field.type_oid())))) + }) + .collect()) + } + None => vec![], + }; Ok((param_types, columns)) } @@ -477,7 +493,7 @@ impl InnerConnection { loop { match try!(self.read_message()) { backend::Message::EmptyQueryResponse | - backend::Message::CommandComplete { .. } => { + backend::Message::CommandComplete(_) => { more_rows = false; break; } @@ -485,12 +501,15 @@ impl InnerConnection { more_rows = true; break; } - backend::Message::DataRow { row } => buf.push_back(row), - backend::Message::ErrorResponse { fields } => { - try!(self.wait_for_ready()); - return DbError::new(fields); + backend::Message::DataRow(body) => { + let row = try!(body.values().map(|v| v.map(ToOwned::to_owned)).collect()); + buf.push_back(row); } - backend::Message::CopyInResponse { .. } => { + backend::Message::ErrorResponse(body) => { + try!(self.wait_for_ready()); + return DbError::new(&mut body.fields()); + } + backend::Message::CopyInResponse(_) => { try!(self.stream.write_message(|buf| { frontend::copy_fail("COPY queries cannot be directly executed", buf) })); @@ -498,9 +517,9 @@ impl InnerConnection { .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))); try!(self.stream.flush()); } - backend::Message::CopyOutResponse { .. } => { + backend::Message::CopyOutResponse(_) => { loop { - if let backend::Message::ReadyForQuery { .. } = try!(self.read_message()) { + if let backend::Message::ReadyForQuery(_) = try!(self.read_message()) { break; } } @@ -563,9 +582,9 @@ impl InnerConnection { match try!(self.read_message()) { backend::Message::BindComplete => Ok(()), - backend::Message::ErrorResponse { fields } => { + backend::Message::ErrorResponse(body) => { try!(self.wait_for_ready()); - DbError::new(fields) + DbError::new(&mut body.fields()) } _ => { self.desynchronized = true; @@ -618,7 +637,7 @@ impl InnerConnection { try!(self.stream.flush()); let resp = match try!(self.read_message()) { backend::Message::CloseComplete => Ok(()), - backend::Message::ErrorResponse { fields } => DbError::new(fields), + backend::Message::ErrorResponse(body) => DbError::new(&mut body.fields()), _ => bad_response!(self), }; try!(self.wait_for_ready()); @@ -813,7 +832,7 @@ impl InnerConnection { #[allow(needless_return)] fn wait_for_ready(&mut self) -> Result<()> { match try!(self.read_message()) { - backend::Message::ReadyForQuery { .. } => Ok(()), + backend::Message::ReadyForQuery(_) => Ok(()), _ => bad_response!(self), } } @@ -827,13 +846,14 @@ impl InnerConnection { let mut result = vec![]; loop { match try!(self.read_message()) { - backend::Message::ReadyForQuery { .. } => break, - backend::Message::DataRow { row } => { - result.push(row.into_iter() - .map(|opt| opt.map(|b| String::from_utf8_lossy(&b).into_owned())) + backend::Message::ReadyForQuery(_) => break, + backend::Message::DataRow(body) => { + let row = try!(body.values() + .map(|v| v.map(|v| String::from_utf8_lossy(v).into_owned())) .collect()); + result.push(row); } - backend::Message::CopyInResponse { .. } => { + backend::Message::CopyInResponse(_) => { try!(self.stream.write_message(|buf| { frontend::copy_fail("COPY queries cannot be directly executed", buf) })); @@ -841,9 +861,9 @@ impl InnerConnection { .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))); try!(self.stream.flush()); } - backend::Message::ErrorResponse { fields } => { + backend::Message::ErrorResponse(body) => { try!(self.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } _ => {} } @@ -1314,9 +1334,9 @@ trait OtherNew { } trait DbErrorNew { - fn new_raw(fields: Vec<(u8, String)>) -> result::Result; - fn new_connect(fields: Vec<(u8, String)>) -> result::Result; - fn new(fields: Vec<(u8, String)>) -> Result; + fn new_raw(fields: &mut ErrorFields) -> io::Result; + fn new_connect(fields: &mut ErrorFields) -> result::Result; + fn new(fields: &mut ErrorFields) -> Result; } trait RowsNew<'a> { diff --git a/src/notification.rs b/src/notification.rs index 1fe73b5c..ed5a692d 100644 --- a/src/notification.rs +++ b/src/notification.rs @@ -113,11 +113,11 @@ impl<'a> FallibleIterator for Iter<'a> { } match conn.read_message_with_notification_nonblocking() { - Ok(Some(backend::Message::NotificationResponse { process_id, channel, payload })) => { + Ok(Some(backend::Message::NotificationResponse(body))) => { Ok(Some(Notification { - process_id: process_id, - channel: channel, - payload: payload, + process_id: body.process_id(), + channel: try!(body.channel()).to_owned(), + payload: try!(body.message()).to_owned(), })) } Ok(None) => Ok(None), @@ -152,11 +152,11 @@ impl<'a> FallibleIterator for BlockingIter<'a> { } match conn.read_message_with_notification() { - Ok(backend::Message::NotificationResponse { process_id, channel, payload }) => { + Ok(backend::Message::NotificationResponse(body)) => { Ok(Some(Notification { - process_id: process_id, - channel: channel, - payload: payload, + process_id: body.process_id(), + channel: try!(body.channel()).to_owned(), + payload: try!(body.message()).to_owned(), })) } Err(err) => Err(Error::Io(err)), @@ -188,11 +188,11 @@ impl<'a> FallibleIterator for TimeoutIter<'a> { } match conn.read_message_with_notification_timeout(self.timeout) { - Ok(Some(backend::Message::NotificationResponse { process_id, channel, payload })) => { + Ok(Some(backend::Message::NotificationResponse(body))) => { Ok(Some(Notification { - process_id: process_id, - channel: channel, - payload: payload, + process_id: body.process_id(), + channel: try!(body.channel()).to_owned(), + payload: try!(body.message()).to_owned(), })) } Ok(None) => Ok(None), diff --git a/src/priv_io.rs b/src/priv_io.rs index 64e81c2a..d6b8c375 100644 --- a/src/priv_io.rs +++ b/src/priv_io.rs @@ -47,12 +47,12 @@ impl MessageStream { self.stream.write_all(&self.buf).map_err(From::from) } - fn inner_read_message(&mut self, b: u8) -> io::Result { + fn inner_read_message(&mut self, b: u8) -> io::Result>> { self.buf.resize(MESSAGE_HEADER_SIZE, 0); self.buf[0] = b; try!(self.stream.read_exact(&mut self.buf[1..])); - let len = match try!(backend::Message::parse(&self.buf)) { + let len = match try!(backend::Message::parse_owned(&self.buf)) { ParseResult::Complete { message, .. } => return Ok(message), ParseResult::Incomplete { required_size } => Some(required_size.unwrap()), }; @@ -62,13 +62,13 @@ impl MessageStream { try!(self.stream.read_exact(&mut self.buf[MESSAGE_HEADER_SIZE..])); }; - match try!(backend::Message::parse(&self.buf)) { + match try!(backend::Message::parse_owned(&self.buf)) { ParseResult::Complete { message, .. } => Ok(message), ParseResult::Incomplete { .. } => unreachable!(), } } - pub fn read_message(&mut self) -> io::Result { + pub fn read_message(&mut self) -> io::Result>> { let mut b = [0; 1]; try!(self.stream.read_exact(&mut b)); self.inner_read_message(b[0]) @@ -76,7 +76,7 @@ impl MessageStream { pub fn read_message_timeout(&mut self, timeout: Duration) - -> io::Result> { + -> io::Result>>> { try!(self.set_read_timeout(Some(timeout))); let mut b = [0; 1]; let r = self.stream.read_exact(&mut b); @@ -90,7 +90,7 @@ impl MessageStream { } } - pub fn read_message_nonblocking(&mut self) -> io::Result> { + pub fn read_message_nonblocking(&mut self) -> io::Result>>> { try!(self.set_nonblocking(true)); let mut b = [0; 1]; let r = self.stream.read_exact(&mut b); diff --git a/src/stmt.rs b/src/stmt.rs index ff2ac50e..177d1210 100644 --- a/src/stmt.rs +++ b/src/stmt.rs @@ -1,5 +1,6 @@ //! Prepared statements +use fallible_iterator::FallibleIterator; use std::cell::{Cell, RefMut}; use std::collections::VecDeque; use std::fmt; @@ -132,20 +133,20 @@ impl<'conn> Statement<'conn> { let num; loop { match try!(conn.read_message()) { - backend::Message::DataRow { .. } => {} - backend::Message::ErrorResponse { fields } => { + backend::Message::DataRow(_) => {} + backend::Message::ErrorResponse(body) => { try!(conn.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } - backend::Message::CommandComplete { tag } => { - num = parse_update_count(tag); + backend::Message::CommandComplete(body) => { + num = parse_update_count(try!(body.tag())); break; } backend::Message::EmptyQueryResponse => { num = 0; break; } - backend::Message::CopyInResponse { .. } => { + backend::Message::CopyInResponse(_) => { try!(conn.stream.write_message(|buf| { frontend::copy_fail("COPY queries cannot be directly executed", buf) })); @@ -153,13 +154,13 @@ impl<'conn> Statement<'conn> { .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))); try!(conn.stream.flush()); } - backend::Message::CopyOutResponse { .. } => { + backend::Message::CopyOutResponse(_) => { loop { match try!(conn.read_message()) { backend::Message::CopyDone => break, - backend::Message::ErrorResponse { fields } => { + backend::Message::ErrorResponse(body) => { try!(conn.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } _ => {} } @@ -269,14 +270,20 @@ impl<'conn> Statement<'conn> { try!(conn.raw_execute(&self.info.name, "", 0, self.param_types(), params)); let (format, column_formats) = match try!(conn.read_message()) { - backend::Message::CopyInResponse { format, column_formats } => (format, column_formats), - backend::Message::ErrorResponse { fields } => { + backend::Message::CopyInResponse(body) => { + let format = body.format(); + let column_formats = try!(body.column_formats() + .map(|f| Format::from_u16(f)) + .collect()); + (format, column_formats) + } + backend::Message::ErrorResponse(body) => { try!(conn.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } _ => { loop { - if let backend::Message::ReadyForQuery { .. } = try!(conn.read_message()) { + if let backend::Message::ReadyForQuery(_) = try!(conn.read_message()) { return Err(Error::Io(io::Error::new(io::ErrorKind::InvalidInput, "called `copy_in` on a \ non-`COPY FROM STDIN` \ @@ -289,7 +296,7 @@ impl<'conn> Statement<'conn> { let mut info = CopyInfo { conn: conn, format: Format::from_u16(format as u16), - column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(), + column_formats: column_formats, }; let mut buf = [0; 16 * 1024]; @@ -311,7 +318,7 @@ impl<'conn> Statement<'conn> { .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))); try!(info.conn.stream.flush()); match try!(info.conn.read_message()) { - backend::Message::ErrorResponse { .. } => { + backend::Message::ErrorResponse(_) => { // expected from the CopyFail } _ => { @@ -330,10 +337,10 @@ impl<'conn> Statement<'conn> { try!(info.conn.stream.flush()); let num = match try!(info.conn.read_message()) { - backend::Message::CommandComplete { tag } => parse_update_count(tag), - backend::Message::ErrorResponse { fields } => { + backend::Message::CommandComplete(body) => parse_update_count(try!(body.tag())), + backend::Message::ErrorResponse(body) => { try!(info.conn.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } _ => { info.conn.desynchronized = true; @@ -372,17 +379,21 @@ impl<'conn> Statement<'conn> { try!(conn.raw_execute(&self.info.name, "", 0, self.param_types(), params)); let (format, column_formats) = match try!(conn.read_message()) { - backend::Message::CopyOutResponse { format, column_formats } => { + backend::Message::CopyOutResponse(body) => { + let format = body.format(); + let column_formats = try!(body.column_formats() + .map(|f| Format::from_u16(f)) + .collect()); (format, column_formats) } - backend::Message::CopyInResponse { .. } => { + backend::Message::CopyInResponse(_) => { try!(conn.stream.write_message(|buf| frontend::copy_fail("", buf))); try!(conn.stream .write_message(|buf| Ok::<(), io::Error>(frontend::copy_done(buf)))); try!(conn.stream.write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))); try!(conn.stream.flush()); match try!(conn.read_message()) { - backend::Message::ErrorResponse { .. } => { + backend::Message::ErrorResponse(_) => { // expected from the CopyFail } _ => { @@ -395,13 +406,13 @@ impl<'conn> Statement<'conn> { "called `copy_out` on a non-`COPY TO \ STDOUT` statement"))); } - backend::Message::ErrorResponse { fields } => { + backend::Message::ErrorResponse(body) => { try!(conn.wait_for_ready()); - return DbError::new(fields); + return DbError::new(&mut body.fields()); } _ => { loop { - if let backend::Message::ReadyForQuery { .. } = try!(conn.read_message()) { + if let backend::Message::ReadyForQuery(_) = try!(conn.read_message()) { return Err(Error::Io(io::Error::new(io::ErrorKind::InvalidInput, "called `copy_out` on a \ non-`COPY TO STDOUT` statement"))); @@ -413,20 +424,20 @@ impl<'conn> Statement<'conn> { let mut info = CopyInfo { conn: conn, format: Format::from_u16(format as u16), - column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(), + column_formats: column_formats, }; let count; loop { match try!(info.conn.read_message()) { - backend::Message::CopyData { data } => { - let mut data = &data[..]; + backend::Message::CopyData(body) => { + let mut data = body.data(); while !data.is_empty() { match w.write_with_info(data, &info) { Ok(n) => data = &data[n..], Err(e) => { loop { - if let backend::Message::ReadyForQuery { .. } = + if let backend::Message::ReadyForQuery(_) = try!(info.conn.read_message()) { return Err(Error::Io(e)); } @@ -436,21 +447,21 @@ impl<'conn> Statement<'conn> { } } backend::Message::CopyDone => {} - backend::Message::CommandComplete { tag } => { - count = parse_update_count(tag); + backend::Message::CommandComplete(body) => { + count = parse_update_count(try!(body.tag())); break; } - backend::Message::ErrorResponse { fields } => { + backend::Message::ErrorResponse(body) => { loop { - if let backend::Message::ReadyForQuery { .. } = + if let backend::Message::ReadyForQuery(_) = try!(info.conn.read_message()) { - return DbError::new(fields); + return DbError::new(&mut body.fields()); } } } _ => { loop { - if let backend::Message::ReadyForQuery { .. } = + if let backend::Message::ReadyForQuery(_) = try!(info.conn.read_message()) { return Err(Error::Io(bad_response())); } @@ -586,6 +597,6 @@ impl Format { } } -fn parse_update_count(tag: String) -> u64 { +fn parse_update_count(tag: &str) -> u64 { tag.split(' ').last().unwrap().parse().unwrap_or(0) }