Use protocol's backend enum
This commit is contained in:
parent
fbcdd6b547
commit
ff853811cb
95
src/lib.rs
95
src/lib.rs
@ -61,11 +61,11 @@ use std::mem;
|
||||
use std::result;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use postgres_protocol::message::backend::{self, RowDescriptionEntry};
|
||||
use postgres_protocol::message::frontend;
|
||||
|
||||
use error::{Error, ConnectError, SqlState, DbError};
|
||||
use io::TlsHandshake;
|
||||
use message::{Backend, RowDescriptionEntry};
|
||||
use notification::{Notifications, Notification};
|
||||
use params::{ConnectParams, IntoConnectParams, UserInfo};
|
||||
use priv_io::MessageStream;
|
||||
@ -79,7 +79,6 @@ mod macros;
|
||||
|
||||
mod feature_check;
|
||||
mod md5;
|
||||
mod message;
|
||||
mod priv_io;
|
||||
mod url;
|
||||
pub mod error;
|
||||
@ -289,12 +288,12 @@ impl InnerConnection {
|
||||
|
||||
loop {
|
||||
match try!(conn.read_message()) {
|
||||
Backend::BackendKeyData { process_id, secret_key } => {
|
||||
backend::Message::BackendKeyData { process_id, secret_key } => {
|
||||
conn.cancel_data.process_id = process_id;
|
||||
conn.cancel_data.secret_key = secret_key;
|
||||
}
|
||||
Backend::ReadyForQuery { .. } => break,
|
||||
Backend::ErrorResponse { fields } => return DbError::new_connect(fields),
|
||||
backend::Message::ReadyForQuery { .. } => break,
|
||||
backend::Message::ErrorResponse { fields } => return DbError::new_connect(fields),
|
||||
_ => return Err(ConnectError::Io(bad_response())),
|
||||
}
|
||||
}
|
||||
@ -302,16 +301,16 @@ impl InnerConnection {
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
fn read_message_with_notification(&mut self) -> std_io::Result<Backend> {
|
||||
fn read_message_with_notification(&mut self) -> std_io::Result<backend::Message> {
|
||||
debug_assert!(!self.desynchronized);
|
||||
loop {
|
||||
match try_desync!(self, self.stream.read_message()) {
|
||||
Backend::NoticeResponse { fields } => {
|
||||
backend::Message::NoticeResponse { fields } => {
|
||||
if let Ok(err) = DbError::new_raw(fields) {
|
||||
self.notice_handler.handle_notice(err);
|
||||
}
|
||||
}
|
||||
Backend::ParameterStatus { parameter, value } => {
|
||||
backend::Message::ParameterStatus { parameter, value } => {
|
||||
self.parameters.insert(parameter, value);
|
||||
}
|
||||
val => return Ok(val),
|
||||
@ -321,16 +320,16 @@ impl InnerConnection {
|
||||
|
||||
fn read_message_with_notification_timeout(&mut self,
|
||||
timeout: Duration)
|
||||
-> std::io::Result<Option<Backend>> {
|
||||
-> std::io::Result<Option<backend::Message>> {
|
||||
debug_assert!(!self.desynchronized);
|
||||
loop {
|
||||
match try_desync!(self, self.stream.read_message_timeout(timeout)) {
|
||||
Some(Backend::NoticeResponse { fields }) => {
|
||||
Some(backend::Message::NoticeResponse { fields }) => {
|
||||
if let Ok(err) = DbError::new_raw(fields) {
|
||||
self.notice_handler.handle_notice(err);
|
||||
}
|
||||
}
|
||||
Some(Backend::ParameterStatus { parameter, value }) => {
|
||||
Some(backend::Message::ParameterStatus { parameter, value }) => {
|
||||
self.parameters.insert(parameter, value);
|
||||
}
|
||||
val => return Ok(val),
|
||||
@ -338,16 +337,16 @@ impl InnerConnection {
|
||||
}
|
||||
}
|
||||
|
||||
fn read_message_with_notification_nonblocking(&mut self) -> std::io::Result<Option<Backend>> {
|
||||
fn read_message_with_notification_nonblocking(&mut self) -> std::io::Result<Option<backend::Message>> {
|
||||
debug_assert!(!self.desynchronized);
|
||||
loop {
|
||||
match try_desync!(self, self.stream.read_message_nonblocking()) {
|
||||
Some(Backend::NoticeResponse { fields }) => {
|
||||
Some(backend::Message::NoticeResponse { fields }) => {
|
||||
if let Ok(err) = DbError::new_raw(fields) {
|
||||
self.notice_handler.handle_notice(err);
|
||||
}
|
||||
}
|
||||
Some(Backend::ParameterStatus { parameter, value }) => {
|
||||
Some(backend::Message::ParameterStatus { parameter, value }) => {
|
||||
self.parameters.insert(parameter, value);
|
||||
}
|
||||
val => return Ok(val),
|
||||
@ -355,10 +354,10 @@ impl InnerConnection {
|
||||
}
|
||||
}
|
||||
|
||||
fn read_message(&mut self) -> std_io::Result<Backend> {
|
||||
fn read_message(&mut self) -> std_io::Result<backend::Message> {
|
||||
loop {
|
||||
match try!(self.read_message_with_notification()) {
|
||||
Backend::NotificationResponse { process_id, channel, payload } => {
|
||||
backend::Message::NotificationResponse { process_id, channel, payload } => {
|
||||
self.notifications.push_back(Notification {
|
||||
process_id: process_id,
|
||||
channel: channel,
|
||||
@ -372,15 +371,15 @@ impl InnerConnection {
|
||||
|
||||
fn handle_auth(&mut self, user: UserInfo) -> result::Result<(), ConnectError> {
|
||||
match try!(self.read_message()) {
|
||||
Backend::AuthenticationOk => return Ok(()),
|
||||
Backend::AuthenticationCleartextPassword => {
|
||||
backend::Message::AuthenticationOk => return Ok(()),
|
||||
backend::Message::AuthenticationCleartextPassword => {
|
||||
let pass = try!(user.password.ok_or_else(|| {
|
||||
ConnectError::ConnectParams("a password was requested but not provided".into())
|
||||
}));
|
||||
try!(self.stream.write_message(&frontend::PasswordMessage { password: &pass }));
|
||||
try!(self.stream.flush());
|
||||
}
|
||||
Backend::AuthenticationMD5Password { salt } => {
|
||||
backend::Message::AuthenticationMD5Password { salt } => {
|
||||
let pass = try!(user.password.ok_or_else(|| {
|
||||
ConnectError::ConnectParams("a password was requested but not provided".into())
|
||||
}));
|
||||
@ -395,20 +394,20 @@ impl InnerConnection {
|
||||
try!(self.stream.write_message(&frontend::PasswordMessage { password: &output }));
|
||||
try!(self.stream.flush());
|
||||
}
|
||||
Backend::AuthenticationKerberosV5 |
|
||||
Backend::AuthenticationSCMCredential |
|
||||
Backend::AuthenticationGSS |
|
||||
Backend::AuthenticationSSPI => {
|
||||
backend::Message::AuthenticationKerberosV5 |
|
||||
backend::Message::AuthenticationSCMCredential |
|
||||
backend::Message::AuthenticationGSS |
|
||||
backend::Message::AuthenticationSSPI => {
|
||||
return Err(ConnectError::Io(std_io::Error::new(std_io::ErrorKind::Other,
|
||||
"unsupported authentication")))
|
||||
}
|
||||
Backend::ErrorResponse { fields } => return DbError::new_connect(fields),
|
||||
backend::Message::ErrorResponse { fields } => return DbError::new_connect(fields),
|
||||
_ => return Err(ConnectError::Io(bad_response())),
|
||||
}
|
||||
|
||||
match try!(self.read_message()) {
|
||||
Backend::AuthenticationOk => Ok(()),
|
||||
Backend::ErrorResponse { fields } => DbError::new_connect(fields),
|
||||
backend::Message::AuthenticationOk => Ok(()),
|
||||
backend::Message::ErrorResponse { fields } => DbError::new_connect(fields),
|
||||
_ => Err(ConnectError::Io(bad_response())),
|
||||
}
|
||||
}
|
||||
@ -433,8 +432,8 @@ impl InnerConnection {
|
||||
try!(self.stream.flush());
|
||||
|
||||
match try!(self.read_message()) {
|
||||
Backend::ParseComplete => {}
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::ParseComplete => {}
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(self.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
@ -442,13 +441,13 @@ impl InnerConnection {
|
||||
}
|
||||
|
||||
let raw_param_types = match try!(self.read_message()) {
|
||||
Backend::ParameterDescription { types } => types,
|
||||
backend::Message::ParameterDescription { types } => types,
|
||||
_ => bad_response!(self),
|
||||
};
|
||||
|
||||
let raw_columns = match try!(self.read_message()) {
|
||||
Backend::RowDescription { descriptions } => descriptions,
|
||||
Backend::NoData => vec![],
|
||||
backend::Message::RowDescription { descriptions } => descriptions,
|
||||
backend::Message::NoData => vec![],
|
||||
_ => bad_response!(self),
|
||||
};
|
||||
|
||||
@ -471,30 +470,30 @@ impl InnerConnection {
|
||||
let more_rows;
|
||||
loop {
|
||||
match try!(self.read_message()) {
|
||||
Backend::EmptyQueryResponse |
|
||||
Backend::CommandComplete { .. } => {
|
||||
backend::Message::EmptyQueryResponse |
|
||||
backend::Message::CommandComplete { .. } => {
|
||||
more_rows = false;
|
||||
break;
|
||||
}
|
||||
Backend::PortalSuspended => {
|
||||
backend::Message::PortalSuspended => {
|
||||
more_rows = true;
|
||||
break;
|
||||
}
|
||||
Backend::DataRow { row } => buf.push_back(row),
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::DataRow { row } => buf.push_back(row),
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(self.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
Backend::CopyInResponse { .. } => {
|
||||
backend::Message::CopyInResponse { .. } => {
|
||||
try!(self.stream.write_message(&frontend::CopyFail {
|
||||
message: "COPY queries cannot be directly executed",
|
||||
}));
|
||||
try!(self.stream.write_message(&frontend::Sync));
|
||||
try!(self.stream.flush());
|
||||
}
|
||||
Backend::CopyOutResponse { .. } => {
|
||||
backend::Message::CopyOutResponse { .. } => {
|
||||
loop {
|
||||
if let Backend::ReadyForQuery { .. } = try!(self.read_message()) {
|
||||
if let backend::Message::ReadyForQuery { .. } = try!(self.read_message()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -550,8 +549,8 @@ impl InnerConnection {
|
||||
try!(self.stream.flush());
|
||||
|
||||
match try!(self.read_message()) {
|
||||
Backend::BindComplete => Ok(()),
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::BindComplete => Ok(()),
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(self.wait_for_ready());
|
||||
DbError::new(fields)
|
||||
}
|
||||
@ -608,8 +607,8 @@ impl InnerConnection {
|
||||
try!(self.stream.write_message(&frontend::Sync));
|
||||
try!(self.stream.flush());
|
||||
let resp = match try!(self.read_message()) {
|
||||
Backend::CloseComplete => Ok(()),
|
||||
Backend::ErrorResponse { fields } => DbError::new(fields),
|
||||
backend::Message::CloseComplete => Ok(()),
|
||||
backend::Message::ErrorResponse { fields } => DbError::new(fields),
|
||||
_ => bad_response!(self),
|
||||
};
|
||||
try!(self.wait_for_ready());
|
||||
@ -797,7 +796,7 @@ impl InnerConnection {
|
||||
#[allow(needless_return)]
|
||||
fn wait_for_ready(&mut self) -> Result<()> {
|
||||
match try!(self.read_message()) {
|
||||
Backend::ReadyForQuery { .. } => Ok(()),
|
||||
backend::Message::ReadyForQuery { .. } => Ok(()),
|
||||
_ => bad_response!(self),
|
||||
}
|
||||
}
|
||||
@ -811,20 +810,20 @@ impl InnerConnection {
|
||||
let mut result = vec![];
|
||||
loop {
|
||||
match try!(self.read_message()) {
|
||||
Backend::ReadyForQuery { .. } => break,
|
||||
Backend::DataRow { row } => {
|
||||
backend::Message::ReadyForQuery { .. } => break,
|
||||
backend::Message::DataRow { row } => {
|
||||
result.push(row.into_iter()
|
||||
.map(|opt| opt.map(|b| String::from_utf8_lossy(&b).into_owned()))
|
||||
.collect());
|
||||
}
|
||||
Backend::CopyInResponse { .. } => {
|
||||
backend::Message::CopyInResponse { .. } => {
|
||||
try!(self.stream.write_message(&frontend::CopyFail {
|
||||
message: "COPY queries cannot be directly executed",
|
||||
}));
|
||||
try!(self.stream.write_message(&frontend::Sync));
|
||||
try!(self.stream.flush());
|
||||
}
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(self.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
|
154
src/message.rs
154
src/message.rs
@ -1,154 +0,0 @@
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use postgres_protocol::message::backend::Message;
|
||||
use std::io;
|
||||
|
||||
use types::Oid;
|
||||
|
||||
pub enum Backend {
|
||||
AuthenticationCleartextPassword,
|
||||
AuthenticationGSS,
|
||||
AuthenticationKerberosV5,
|
||||
AuthenticationMD5Password { salt: [u8; 4] },
|
||||
AuthenticationOk,
|
||||
AuthenticationSCMCredential,
|
||||
AuthenticationSSPI,
|
||||
BackendKeyData { process_id: i32, secret_key: i32 },
|
||||
BindComplete,
|
||||
CloseComplete,
|
||||
CommandComplete { tag: String },
|
||||
CopyData { data: Vec<u8> },
|
||||
CopyDone,
|
||||
CopyInResponse {
|
||||
format: u8,
|
||||
column_formats: Vec<u16>,
|
||||
},
|
||||
CopyOutResponse {
|
||||
format: u8,
|
||||
column_formats: Vec<u16>,
|
||||
},
|
||||
DataRow { row: Vec<Option<Vec<u8>>> },
|
||||
EmptyQueryResponse,
|
||||
ErrorResponse { fields: Vec<(u8, String)> },
|
||||
NoData,
|
||||
NoticeResponse { fields: Vec<(u8, String)> },
|
||||
NotificationResponse {
|
||||
process_id: i32,
|
||||
channel: String,
|
||||
payload: String,
|
||||
},
|
||||
ParameterDescription { types: Vec<Oid> },
|
||||
ParameterStatus { parameter: String, value: String },
|
||||
ParseComplete,
|
||||
PortalSuspended,
|
||||
ReadyForQuery { _state: u8 },
|
||||
RowDescription { descriptions: Vec<RowDescriptionEntry>, },
|
||||
}
|
||||
|
||||
impl Backend {
|
||||
pub fn convert(message: Message) -> io::Result<Backend> {
|
||||
let ret = match message {
|
||||
Message::AuthenticationCleartextPassword => Backend::AuthenticationCleartextPassword,
|
||||
Message::AuthenticationGss => Backend::AuthenticationGSS,
|
||||
Message::AuthenticationKerberosV5 => Backend::AuthenticationKerberosV5,
|
||||
Message::AuthenticationMd55Password(body) => {
|
||||
Backend::AuthenticationMD5Password { salt: body.salt() }
|
||||
}
|
||||
Message::AuthenticationOk => Backend::AuthenticationOk,
|
||||
Message::AuthenticationScmCredential => Backend::AuthenticationSCMCredential,
|
||||
Message::AuthenticationSspi => Backend::AuthenticationSSPI,
|
||||
Message::BackendKeyData(body) => {
|
||||
Backend::BackendKeyData {
|
||||
process_id: body.process_id(),
|
||||
secret_key: body.secret_key(),
|
||||
}
|
||||
}
|
||||
Message::BindComplete => Backend::BindComplete,
|
||||
Message::CloseComplete => Backend::CloseComplete,
|
||||
Message::CommandComplete(body) => {
|
||||
Backend::CommandComplete { tag: body.tag().to_owned() }
|
||||
}
|
||||
Message::CopyData(body) => Backend::CopyData { data: body.data().to_owned() },
|
||||
Message::CopyDone => Backend::CopyDone,
|
||||
Message::CopyInResponse(body) => {
|
||||
Backend::CopyInResponse {
|
||||
format: body.format(),
|
||||
column_formats: try!(body.column_formats().collect()),
|
||||
}
|
||||
}
|
||||
Message::CopyOutResponse(body) => {
|
||||
Backend::CopyOutResponse {
|
||||
format: body.format(),
|
||||
column_formats: try!(body.column_formats().collect()),
|
||||
}
|
||||
}
|
||||
Message::DataRow(body) => {
|
||||
Backend::DataRow {
|
||||
row: try!(body.values().map(|r| r.map(|d| d.to_owned())).collect()),
|
||||
}
|
||||
}
|
||||
Message::EmptyQueryResponse => Backend::EmptyQueryResponse,
|
||||
Message::ErrorResponse(body) => {
|
||||
Backend::ErrorResponse {
|
||||
fields: try!(body.fields()
|
||||
.map(|f| (f.type_(), f.value().to_owned()))
|
||||
.collect()),
|
||||
}
|
||||
}
|
||||
Message::NoData => Backend::NoData,
|
||||
Message::NoticeResponse(body) => {
|
||||
Backend::NoticeResponse {
|
||||
fields: try!(body.fields()
|
||||
.map(|f| (f.type_(), f.value().to_owned()))
|
||||
.collect()),
|
||||
}
|
||||
}
|
||||
Message::NotificationResponse(body) => {
|
||||
Backend::NotificationResponse {
|
||||
process_id: body.process_id(),
|
||||
channel: body.channel().to_owned(),
|
||||
payload: body.message().to_owned(),
|
||||
}
|
||||
}
|
||||
Message::ParameterDescription(body) => {
|
||||
Backend::ParameterDescription { types: try!(body.parameters().collect()) }
|
||||
}
|
||||
Message::ParameterStatus(body) => {
|
||||
Backend::ParameterStatus {
|
||||
parameter: body.name().to_owned(),
|
||||
value: body.value().to_owned(),
|
||||
}
|
||||
}
|
||||
Message::ParseComplete => Backend::ParseComplete,
|
||||
Message::PortalSuspended => Backend::PortalSuspended,
|
||||
Message::ReadyForQuery(body) => Backend::ReadyForQuery { _state: body.status() },
|
||||
Message::RowDescription(body) => {
|
||||
let fields = body.fields()
|
||||
.map(|f| {
|
||||
RowDescriptionEntry {
|
||||
name: f.name().to_owned(),
|
||||
table_oid: f.table_oid(),
|
||||
column_id: f.column_id(),
|
||||
type_oid: f.type_oid(),
|
||||
type_size: f.type_size(),
|
||||
type_modifier: f.type_modifier(),
|
||||
format: f.format(),
|
||||
}
|
||||
});
|
||||
Backend::RowDescription { descriptions: try!(fields.collect()) }
|
||||
}
|
||||
_ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "unknown message type")),
|
||||
};
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RowDescriptionEntry {
|
||||
pub name: String,
|
||||
pub table_oid: Oid,
|
||||
pub column_id: i16,
|
||||
pub type_oid: Oid,
|
||||
pub type_size: i16,
|
||||
pub type_modifier: i32,
|
||||
pub format: i16,
|
||||
}
|
@ -3,9 +3,9 @@
|
||||
use fallible_iterator::{FallibleIterator, IntoFallibleIterator};
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
use postgres_protocol::message::backend;
|
||||
|
||||
use {desynchronized, Result, Connection, NotificationsNew};
|
||||
use message::Backend;
|
||||
use error::Error;
|
||||
|
||||
/// An asynchronous notification.
|
||||
@ -113,7 +113,7 @@ impl<'a> FallibleIterator for Iter<'a> {
|
||||
}
|
||||
|
||||
match conn.read_message_with_notification_nonblocking() {
|
||||
Ok(Some(Backend::NotificationResponse { process_id, channel, payload })) => {
|
||||
Ok(Some(backend::Message::NotificationResponse { process_id, channel, payload })) => {
|
||||
Ok(Some(Notification {
|
||||
process_id: process_id,
|
||||
channel: channel,
|
||||
@ -152,7 +152,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> {
|
||||
}
|
||||
|
||||
match conn.read_message_with_notification() {
|
||||
Ok(Backend::NotificationResponse { process_id, channel, payload }) => {
|
||||
Ok(backend::Message::NotificationResponse { process_id, channel, payload }) => {
|
||||
Ok(Some(Notification {
|
||||
process_id: process_id,
|
||||
channel: channel,
|
||||
@ -188,7 +188,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
|
||||
}
|
||||
|
||||
match conn.read_message_with_notification_timeout(self.timeout) {
|
||||
Ok(Some(Backend::NotificationResponse { process_id, channel, payload })) => {
|
||||
Ok(Some(backend::Message::NotificationResponse { process_id, channel, payload })) => {
|
||||
Ok(Some(Notification {
|
||||
process_id: process_id,
|
||||
channel: channel,
|
||||
|
@ -17,7 +17,6 @@ use postgres_protocol::message::backend::{self, ParseResult};
|
||||
use TlsMode;
|
||||
use error::ConnectError;
|
||||
use io::TlsStream;
|
||||
use message::Backend;
|
||||
use params::{ConnectParams, ConnectTarget};
|
||||
|
||||
const DEFAULT_PORT: u16 = 5432;
|
||||
@ -46,14 +45,13 @@ impl MessageStream {
|
||||
self.stream.write_all(&self.buf)
|
||||
}
|
||||
|
||||
fn raw_read_message<'a>(&'a mut self, b: u8) -> io::Result<backend::Message<'a>> {
|
||||
fn inner_read_message(&mut self, b: u8) -> io::Result<backend::Message> {
|
||||
self.buf.resize(MESSAGE_HEADER_SIZE, 0);
|
||||
self.buf[0] = b;
|
||||
try!(self.stream.read_exact(&mut self.buf[1..]));
|
||||
|
||||
let len = match try!(backend::Message::parse(&self.buf)) {
|
||||
// FIXME this is dumb but an explicit return runs into borrowck issues :(
|
||||
ParseResult::Complete { .. } => None,
|
||||
ParseResult::Complete { message, .. } => return Ok(message),
|
||||
ParseResult::Incomplete { required_size } => Some(required_size.unwrap()),
|
||||
};
|
||||
|
||||
@ -68,17 +66,14 @@ impl MessageStream {
|
||||
}
|
||||
}
|
||||
|
||||
fn inner_read_message(&mut self, b: u8) -> io::Result<Backend> {
|
||||
let message = try!(self.raw_read_message(b));
|
||||
Backend::convert(message)
|
||||
}
|
||||
|
||||
pub fn read_message(&mut self) -> io::Result<Backend> {
|
||||
pub fn read_message(&mut self) -> io::Result<backend::Message> {
|
||||
let b = try!(self.stream.read_u8());
|
||||
self.inner_read_message(b)
|
||||
}
|
||||
|
||||
pub fn read_message_timeout(&mut self, timeout: Duration) -> io::Result<Option<Backend>> {
|
||||
pub fn read_message_timeout(&mut self,
|
||||
timeout: Duration)
|
||||
-> io::Result<Option<backend::Message>> {
|
||||
try!(self.set_read_timeout(Some(timeout)));
|
||||
let b = self.stream.read_u8();
|
||||
try!(self.set_read_timeout(None));
|
||||
@ -91,7 +86,7 @@ impl MessageStream {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_message_nonblocking(&mut self) -> io::Result<Option<Backend>> {
|
||||
pub fn read_message_nonblocking(&mut self) -> io::Result<Option<backend::Message>> {
|
||||
try!(self.set_nonblocking(true));
|
||||
let b = self.stream.read_u8();
|
||||
try!(self.set_nonblocking(false));
|
||||
|
55
src/stmt.rs
55
src/stmt.rs
@ -5,11 +5,10 @@ use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::sync::Arc;
|
||||
use postgres_protocol::message::frontend;
|
||||
use postgres_protocol::message::{backend, frontend};
|
||||
|
||||
use error::{Error, DbError};
|
||||
use types::{SessionInfo, Type, ToSql};
|
||||
use message::Backend;
|
||||
use rows::{Rows, LazyRows};
|
||||
use transaction::Transaction;
|
||||
use {bad_response, Connection, StatementInternals, Result, RowsNew, InnerConnection,
|
||||
@ -133,31 +132,31 @@ impl<'conn> Statement<'conn> {
|
||||
let num;
|
||||
loop {
|
||||
match try!(conn.read_message()) {
|
||||
Backend::DataRow { .. } => {}
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::DataRow { .. } => {}
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(conn.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
Backend::CommandComplete { tag } => {
|
||||
backend::Message::CommandComplete { tag } => {
|
||||
num = parse_update_count(tag);
|
||||
break;
|
||||
}
|
||||
Backend::EmptyQueryResponse => {
|
||||
backend::Message::EmptyQueryResponse => {
|
||||
num = 0;
|
||||
break;
|
||||
}
|
||||
Backend::CopyInResponse { .. } => {
|
||||
backend::Message::CopyInResponse { .. } => {
|
||||
try!(conn.stream.write_message(&frontend::CopyFail {
|
||||
message: "COPY queries cannot be directly executed",
|
||||
}));
|
||||
try!(conn.stream.write_message(&frontend::Sync));
|
||||
try!(conn.stream.flush());
|
||||
}
|
||||
Backend::CopyOutResponse { .. } => {
|
||||
backend::Message::CopyOutResponse { .. } => {
|
||||
loop {
|
||||
match try!(conn.read_message()) {
|
||||
Backend::CopyDone => break,
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::CopyDone => break,
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(conn.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
@ -269,14 +268,14 @@ impl<'conn> Statement<'conn> {
|
||||
try!(conn.raw_execute(&self.info.name, "", 0, self.param_types(), params));
|
||||
|
||||
let (format, column_formats) = match try!(conn.read_message()) {
|
||||
Backend::CopyInResponse { format, column_formats } => (format, column_formats),
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::CopyInResponse { format, column_formats } => (format, column_formats),
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(conn.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
_ => {
|
||||
loop {
|
||||
if let Backend::ReadyForQuery { .. } = try!(conn.read_message()) {
|
||||
if let backend::Message::ReadyForQuery { .. } = try!(conn.read_message()) {
|
||||
return Err(Error::Io(io::Error::new(io::ErrorKind::InvalidInput,
|
||||
"called `copy_in` on a \
|
||||
non-`COPY FROM STDIN` \
|
||||
@ -305,7 +304,7 @@ impl<'conn> Statement<'conn> {
|
||||
try!(info.conn.stream.write_message(&frontend::Sync));
|
||||
try!(info.conn.stream.flush());
|
||||
match try!(info.conn.read_message()) {
|
||||
Backend::ErrorResponse { .. } => {
|
||||
backend::Message::ErrorResponse { .. } => {
|
||||
// expected from the CopyFail
|
||||
}
|
||||
_ => {
|
||||
@ -324,8 +323,8 @@ impl<'conn> Statement<'conn> {
|
||||
try!(info.conn.stream.flush());
|
||||
|
||||
let num = match try!(info.conn.read_message()) {
|
||||
Backend::CommandComplete { tag } => parse_update_count(tag),
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::CommandComplete { tag } => parse_update_count(tag),
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(info.conn.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
@ -366,14 +365,14 @@ impl<'conn> Statement<'conn> {
|
||||
try!(conn.raw_execute(&self.info.name, "", 0, self.param_types(), params));
|
||||
|
||||
let (format, column_formats) = match try!(conn.read_message()) {
|
||||
Backend::CopyOutResponse { format, column_formats } => (format, column_formats),
|
||||
Backend::CopyInResponse { .. } => {
|
||||
backend::Message::CopyOutResponse { format, column_formats } => (format, column_formats),
|
||||
backend::Message::CopyInResponse { .. } => {
|
||||
try!(conn.stream.write_message(&frontend::CopyFail { message: "" }));
|
||||
try!(conn.stream.write_message(&frontend::CopyDone));
|
||||
try!(conn.stream.write_message(&frontend::Sync));
|
||||
try!(conn.stream.flush());
|
||||
match try!(conn.read_message()) {
|
||||
Backend::ErrorResponse { .. } => {
|
||||
backend::Message::ErrorResponse { .. } => {
|
||||
// expected from the CopyFail
|
||||
}
|
||||
_ => {
|
||||
@ -386,13 +385,13 @@ impl<'conn> Statement<'conn> {
|
||||
"called `copy_out` on a non-`COPY TO \
|
||||
STDOUT` statement")));
|
||||
}
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
try!(conn.wait_for_ready());
|
||||
return DbError::new(fields);
|
||||
}
|
||||
_ => {
|
||||
loop {
|
||||
if let Backend::ReadyForQuery { .. } = try!(conn.read_message()) {
|
||||
if let backend::Message::ReadyForQuery { .. } = try!(conn.read_message()) {
|
||||
return Err(Error::Io(io::Error::new(io::ErrorKind::InvalidInput,
|
||||
"called `copy_out` on a \
|
||||
non-`COPY TO STDOUT` statement")));
|
||||
@ -410,14 +409,14 @@ impl<'conn> Statement<'conn> {
|
||||
let count;
|
||||
loop {
|
||||
match try!(info.conn.read_message()) {
|
||||
Backend::CopyData { data } => {
|
||||
backend::Message::CopyData { data } => {
|
||||
let mut data = &data[..];
|
||||
while !data.is_empty() {
|
||||
match w.write_with_info(data, &info) {
|
||||
Ok(n) => data = &data[n..],
|
||||
Err(e) => {
|
||||
loop {
|
||||
if let Backend::ReadyForQuery { .. } =
|
||||
if let backend::Message::ReadyForQuery { .. } =
|
||||
try!(info.conn.read_message()) {
|
||||
return Err(Error::Io(e));
|
||||
}
|
||||
@ -426,21 +425,21 @@ impl<'conn> Statement<'conn> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Backend::CopyDone => {}
|
||||
Backend::CommandComplete { tag } => {
|
||||
backend::Message::CopyDone => {}
|
||||
backend::Message::CommandComplete { tag } => {
|
||||
count = parse_update_count(tag);
|
||||
break;
|
||||
}
|
||||
Backend::ErrorResponse { fields } => {
|
||||
backend::Message::ErrorResponse { fields } => {
|
||||
loop {
|
||||
if let Backend::ReadyForQuery { .. } = try!(info.conn.read_message()) {
|
||||
if let backend::Message::ReadyForQuery { .. } = try!(info.conn.read_message()) {
|
||||
return DbError::new(fields);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
loop {
|
||||
if let Backend::ReadyForQuery { .. } = try!(info.conn.read_message()) {
|
||||
if let backend::Message::ReadyForQuery { .. } = try!(info.conn.read_message()) {
|
||||
return Err(Error::Io(bad_response()));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user