Update for postgres_protocol changes

This commit is contained in:
Steven Fackler 2016-09-19 23:46:25 -04:00
parent b559b98b00
commit 16ce48a510
5 changed files with 49 additions and 46 deletions

View File

@ -165,13 +165,8 @@ pub fn cancel_query<T>(params: T,
let params = try!(params.into_connect_params().map_err(ConnectError::ConnectParams));
let mut socket = try!(priv_io::initialize_stream(&params, tls));
let message = frontend::CancelRequest {
process_id: data.process_id,
secret_key: data.secret_key,
};
let mut buf = vec![];
try!(frontend::Message::write(&message, &mut buf));
frontend::cancel_request(data.process_id, data.secret_key, &mut buf);
try!(socket.write_all(&buf));
try!(socket.flush());
@ -516,23 +511,31 @@ impl InnerConnection {
debug!("executing statement {} with parameters: {:?}",
stmt_name,
params);
let mut values = vec![];
for (param, ty) in params.iter().zip(param_types) {
let mut buf = vec![];
match try!(param.to_sql_checked(ty, &mut buf, &SessionInfo::new(self))
.map_err(Error::Conversion)) {
IsNull::Yes => values.push(None),
IsNull::No => values.push(Some(buf)),
{
let info = SessionInfo::new(&self.parameters);
let r = self.stream.write_message2(|buf| {
frontend::bind(portal_name,
&stmt_name,
Some(1),
params.iter().zip(param_types),
|(param, ty), buf| {
match param.to_sql_checked(ty, buf, &info) {
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
Err(e) => Err(e),
}
},
Some(1),
buf)
});
match r {
Ok(()) => {}
Err(frontend::BindError::Conversion(e)) => return Err(Error::Conversion(e)),
Err(frontend::BindError::Serialization(e)) => return Err(Error::Io(e)),
}
}
try!(self.stream.write_message(&frontend::Bind {
portal: portal_name,
statement: &stmt_name,
formats: &[1],
values: &values,
result_formats: &[1],
}));
try!(self.stream.write_message(&frontend::Execute {
portal: portal_name,
max_rows: row_limit,
@ -592,10 +595,7 @@ impl InnerConnection {
}
fn close_statement(&mut self, name: &str, type_: u8) -> Result<()> {
try!(self.stream.write_message(&frontend::Close {
variant: type_,
name: name,
}));
try!(self.stream.write_message2(|buf| frontend::close(type_, name, buf)));
try!(self.stream.write_message(&frontend::Sync));
try!(self.stream.flush());
let resp = match try!(self.read_message()) {
@ -662,7 +662,7 @@ impl InnerConnection {
let row = rows.pop_front().unwrap();
let (name, type_, elem_oid, rngsubtype, basetype, schema, relid) = {
let ctx = SessionInfo::new(self);
let ctx = SessionInfo::new(&self.parameters);
let name = try!(String::from_sql(&Type::Name, &mut &**row[0].as_ref().unwrap(), &ctx)
.map_err(Error::Conversion));
let type_ = try!(i8::from_sql(&Type::Char, &mut &**row[1].as_ref().unwrap(), &ctx)
@ -740,7 +740,7 @@ impl InnerConnection {
let mut rows = VecDeque::new();
try!(self.read_rows(&mut rows));
let ctx = SessionInfo::new(self);
let ctx = SessionInfo::new(&self.parameters);
let mut variants = vec![];
for row in rows {
variants.push(try!(String::from_sql(&Type::Name, &mut &**row[0].as_ref().unwrap(), &ctx)
@ -776,7 +776,7 @@ impl InnerConnection {
let mut fields = vec![];
for row in rows {
let (name, type_) = {
let ctx = SessionInfo::new(self);
let ctx = SessionInfo::new(&self.parameters);
let name =
try!(String::from_sql(&Type::Name, &mut &**row[0].as_ref().unwrap(), &ctx)
.map_err(Error::Conversion));
@ -1316,7 +1316,7 @@ trait LazyRowsNew<'trans, 'stmt> {
}
trait SessionInfoNew<'a> {
fn new(conn: &'a InnerConnection) -> SessionInfo<'a>;
fn new(params: &'a HashMap<String, String>) -> SessionInfo<'a>;
}
trait StatementInternals<'conn> {

View File

@ -39,6 +39,15 @@ impl MessageStream {
self.stream.get_ref()
}
pub fn write_message2<F, E>(&mut self, f: F) -> Result<(), E>
where F: FnOnce(&mut Vec<u8>) -> Result<(), E>,
E: From<io::Error>
{
self.buf.clear();
try!(f(&mut self.buf));
self.stream.write_all(&self.buf).map_err(From::from)
}
pub fn write_message(&mut self, message: &frontend::Message) -> io::Result<()> {
self.buf.clear();
try!(frontend::Message::write(message, &mut self.buf));

View File

@ -237,8 +237,8 @@ impl<'a> Row<'a> {
}
let conn = self.stmt.conn().conn.borrow();
let value = match self.data[idx] {
Some(ref data) => FromSql::from_sql(ty, &mut &**data, &SessionInfo::new(&*conn)),
None => FromSql::from_sql_null(ty, &SessionInfo::new(&*conn)),
Some(ref data) => FromSql::from_sql(ty, &mut &**data, &SessionInfo::new(&conn.parameters)),
None => FromSql::from_sql_null(ty, &SessionInfo::new(&conn.parameters)),
};
Some(value.map_err(Error::Conversion))
}

View File

@ -528,7 +528,7 @@ impl<'a> CopyInfo<'a> {
/// Returns session info for the associated connection.
pub fn session_info<'b>(&'b self) -> SessionInfo<'b> {
SessionInfo::new(&*self.conn)
SessionInfo::new(&self.conn.parameters)
}
}

View File

@ -1,6 +1,7 @@
//! Traits dealing with Postgres data types
use fallible_iterator::FallibleIterator;
use postgres_protocol;
use postgres_protocol::types::{self, ArrayDimension};
use std::collections::HashMap;
use std::error::Error;
@ -12,7 +13,7 @@ pub use postgres_protocol::Oid;
pub use self::type_gen::Type;
pub use self::special::{Date, Timestamp};
use {SessionInfoNew, InnerConnection, OtherNew, WrongTypeNew, FieldNew};
use {SessionInfoNew, OtherNew, WrongTypeNew, FieldNew};
/// Generates a simple implementation of `ToSql::accepts` which accepts the
/// types passed to it.
@ -82,13 +83,14 @@ mod special;
mod type_gen;
/// A structure providing information for conversion methods.
#[derive(Debug)]
pub struct SessionInfo<'a> {
conn: &'a InnerConnection,
parameters: &'a HashMap<String, String>,
}
impl<'a> SessionInfoNew<'a> for SessionInfo<'a> {
fn new(conn: &'a InnerConnection) -> SessionInfo<'a> {
SessionInfo { conn: conn }
fn new(parameters: &'a HashMap<String, String>) -> SessionInfo<'a> {
SessionInfo { parameters: parameters }
}
}
@ -96,15 +98,7 @@ impl<'a> SessionInfo<'a> {
/// Returns the value of the specified Postgres backend parameter, such
/// as `timezone` or `server_version`.
pub fn parameter(&self, param: &str) -> Option<&'a str> {
self.conn.parameters.get(param).map(|s| &**s)
}
}
impl<'a> fmt::Debug for SessionInfo<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("SessionInfo")
.field("parameters", &self.conn.parameters)
.finish()
self.parameters.get(param).map(|s| &**s)
}
}
@ -607,8 +601,8 @@ impl<'a, T: ToSql> ToSql for &'a [T] {
self.iter(),
|e, w| {
match try!(e.to_sql(member_type, w, ctx)) {
IsNull::No => Ok(types::IsNull::No),
IsNull::Yes => Ok(types::IsNull::Yes),
IsNull::No => Ok(postgres_protocol::IsNull::No),
IsNull::Yes => Ok(postgres_protocol::IsNull::Yes),
}
},
w));