This commit is contained in:
Steven Fackler 2017-02-23 20:27:29 -08:00
parent 68ba14d679
commit 6e2927bf7e
6 changed files with 208 additions and 199 deletions

View File

@ -21,7 +21,9 @@
//!
//! fn main() {
//! let mut l = Core::new().unwrap();
//! let done = Connection::connect("postgresql://postgres@localhost", TlsMode::None, &l.handle())
//! let done = Connection::connect("postgresql://postgres@localhost",
//! TlsMode::None,
//! &l.handle())
//! .then(|c| {
//! c.unwrap()
//! .batch_execute("CREATE TABLE person (
@ -144,12 +146,9 @@ pub fn cancel_query<T>(params: T,
{
let params = match params.into_connect_params() {
Ok(params) => {
Either::A(stream::connect(params.host().clone(),
params.port(),
tls_mode,
handle))
Either::A(stream::connect(params.host().clone(), params.port(), tls_mode, handle))
}
Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future())
Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future()),
};
params.and_then(move |c| {
@ -266,31 +265,29 @@ impl Connection {
{
let fut = match params.into_connect_params() {
Ok(params) => {
Either::A(stream::connect(params.host().clone(),
params.port(),
tls_mode,
handle)
Either::A(stream::connect(params.host().clone(), params.port(), tls_mode, handle)
.map(|s| (s, params)))
}
Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future())
Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future()),
};
fut.map(|(s, params)| {
let (sender, receiver) = mpsc::channel();
(Connection(InnerConnection {
stream: s,
close_sender: sender,
close_receiver: receiver,
parameters: HashMap::new(),
types: HashMap::new(),
cancel_data: CancelData {
process_id: 0,
secret_key: 0,
},
has_typeinfo_query: false,
has_typeinfo_enum_query: false,
has_typeinfo_composite_query: false,
}), params)
stream: s,
close_sender: sender,
close_receiver: receiver,
parameters: HashMap::new(),
types: HashMap::new(),
cancel_data: CancelData {
process_id: 0,
secret_key: 0,
},
has_typeinfo_query: false,
has_typeinfo_enum_query: false,
has_typeinfo_composite_query: false,
}),
params)
})
.and_then(|(s, params)| s.startup(params))
.and_then(|(s, params)| s.handle_auth(params))
@ -298,7 +295,9 @@ impl Connection {
.boxed()
}
fn startup(self, params: ConnectParams) -> BoxFuture<(Connection, ConnectParams), ConnectError> {
fn startup(self,
params: ConnectParams)
-> BoxFuture<(Connection, ConnectParams), ConnectError> {
let mut buf = vec![];
let result = {
let options = [("client_encoding", "UTF8"), ("timezone", "GMT")];
@ -319,7 +318,8 @@ impl Connection {
}
fn handle_auth(self, params: ConnectParams) -> BoxFuture<Connection, ConnectError> {
self.0.read()
self.0
.read()
.map_err(ConnectError::Io)
.and_then(move |(m, s)| {
let response = match m {
@ -333,8 +333,9 @@ impl Connection {
.map_err(Into::into)
}
None => {
Err(ConnectError::ConnectParams(
"password was required but not provided".into()))
Err(ConnectError::ConnectParams("password was required but not \
provided"
.into()))
}
}
}
@ -350,8 +351,9 @@ impl Connection {
.map_err(Into::into)
}
None => {
Err(ConnectError::ConnectParams(
"password was required but not provided".into()))
Err(ConnectError::ConnectParams("password was required but not \
provided"
.into()))
}
}
}
@ -361,45 +363,41 @@ impl Connection {
response.map(|m| (m, Connection(s)))
})
.and_then(|(m, s)| {
match m {
Some(m) => Either::A(s.handle_auth_response(m)),
None => Either::B(Ok(s).into_future())
}
.and_then(|(m, s)| match m {
Some(m) => Either::A(s.handle_auth_response(m)),
None => Either::B(Ok(s).into_future()),
})
.boxed()
}
fn handle_auth_response(self, message: Vec<u8>) -> BoxFuture<Connection, ConnectError> {
self.0.send(message)
self.0
.send(message)
.and_then(|s| s.read())
.map_err(ConnectError::Io)
.and_then(|(m, s)| {
match m {
backend::Message::AuthenticationOk => Ok(Connection(s)),
backend::Message::ErrorResponse(body) => Err(connect_err(&mut body.fields())),
_ => Err(bad_message()),
}
.and_then(|(m, s)| match m {
backend::Message::AuthenticationOk => Ok(Connection(s)),
backend::Message::ErrorResponse(body) => Err(connect_err(&mut body.fields())),
_ => Err(bad_message()),
})
.boxed()
}
fn finish_startup(self) -> BoxFuture<Connection, ConnectError> {
self.0.read()
self.0
.read()
.map_err(ConnectError::Io)
.and_then(|(m, mut s)| {
match m {
backend::Message::BackendKeyData(body) => {
s.cancel_data.process_id = body.process_id();
s.cancel_data.secret_key = body.secret_key();
Either::A(Connection(s).finish_startup())
}
backend::Message::ReadyForQuery(_) => Either::B(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => {
Either::B(Err(connect_err(&mut body.fields())).into_future())
}
_ => Either::B(Err(bad_message()).into_future()),
.and_then(|(m, mut s)| match m {
backend::Message::BackendKeyData(body) => {
s.cancel_data.process_id = body.process_id();
s.cancel_data.secret_key = body.secret_key();
Either::A(Connection(s).finish_startup())
}
backend::Message::ReadyForQuery(_) => Either::B(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => {
Either::B(Err(connect_err(&mut body.fields())).into_future())
}
_ => Either::B(Err(bad_message()).into_future()),
})
.boxed()
}
@ -415,29 +413,30 @@ impl Connection {
}
// This has its own read_rows since it will need to handle multiple query completions
fn simple_read_rows(self, mut rows: Vec<RowData>) -> BoxFuture<(Vec<RowData>, Connection), Error> {
self.0.read()
fn simple_read_rows(self,
mut rows: Vec<RowData>)
-> BoxFuture<(Vec<RowData>, Connection), Error> {
self.0
.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => {
Ok((rows, Connection(s))).into_future().boxed()
}
backend::Message::DataRow(body) => {
match body.values().collect() {
Ok(row) => {
rows.push(row);
Connection(s).simple_read_rows(rows)
}
Err(e) => Err(Error::Io(e)).into_future().boxed(),
}
}
backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) |
backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
_ => Err(bad_message()).into_future().boxed(),
.and_then(|(m, s)| match m {
backend::Message::ReadyForQuery(_) => {
Ok((rows, Connection(s))).into_future().boxed()
}
backend::Message::DataRow(body) => {
match body.values().collect() {
Ok(row) => {
rows.push(row);
Connection(s).simple_read_rows(rows)
}
Err(e) => Err(Error::Io(e)).into_future().boxed(),
}
}
backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) |
backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
_ => Err(bad_message()).into_future().boxed(),
})
.boxed()
}
@ -445,13 +444,12 @@ impl Connection {
fn ready<T>(self, t: T) -> BoxFuture<(T, Connection), Error>
where T: 'static + Send
{
self.0.read()
self.0
.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => Ok((t, Connection(s))),
_ => Err(bad_message())
}
.and_then(|(m, s)| match m {
backend::Message::ReadyForQuery(_) => Ok((t, Connection(s))),
_ => Err(bad_message()),
})
.and_then(|(t, s)| s.close_gc().map(|s| (t, s)))
.boxed()
@ -471,26 +469,22 @@ impl Connection {
let mut buf = vec![];
frontend::sync(&mut buf);
messages.push(buf);
self.0.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>)))
self.0
.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>)))
.map_err(Error::Io)
.and_then(|s| Connection(s.0).finish_close_gc())
.boxed()
}
fn finish_close_gc(self) -> BoxFuture<Connection, Error> {
self.0.read()
self.0
.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => {
Either::A(Ok(Connection(s)).into_future())
}
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
}
.and_then(|(m, s)| match m {
backend::Message::ReadyForQuery(_) => Either::A(Ok(Connection(s)).into_future()),
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
_ => Either::A(Err(bad_message()).into_future()),
})
.boxed()
}
@ -685,7 +679,9 @@ impl Connection {
} else if elem_oid != 0 {
Either::B(c.get_type(elem_oid).map(|(t, c)| (Kind::Array(t), c)).boxed())
} else if relid != 0 {
Either::B(c.get_composite_fields(relid).map(|(f, c)| (Kind::Composite(f), c)).boxed())
Either::B(c.get_composite_fields(relid)
.map(|(f, c)| (Kind::Composite(f), c))
.boxed())
} else if let Some(rngsubtype) = rngsubtype {
Either::B(c.get_type(rngsubtype).map(|(t, c)| (Kind::Range(t), c)).boxed())
} else {
@ -766,21 +762,17 @@ impl Connection {
FROM pg_catalog.pg_enum \
WHERE enumtypid = $1 \
ORDER BY enumsortorder")
.or_else(|e| {
match e {
Error::Db(e, c) => {
if e.code != SqlState::UndefinedColumn {
return Either::B(Err(Error::Db(e, c)).into_future());
}
Either::A(c.raw_prepare(TYPEINFO_ENUM_QUERY,
"SELECT enumlabel \
FROM pg_catalog.pg_enum \
WHERE enumtypid = $1 \
ORDER BY oid"))
.or_else(|e| match e {
Error::Db(e, c) => {
if e.code != SqlState::UndefinedColumn {
return Either::B(Err(Error::Db(e, c)).into_future());
}
e => Either::B(Err(e).into_future()),
Either::A(c.raw_prepare(TYPEINFO_ENUM_QUERY,
"SELECT enumlabel FROM pg_catalog.pg_enum WHERE \
enumtypid = $1 ORDER BY oid"))
}
e => Either::B(Err(e).into_future()),
})
.map(|(_, _, mut c)| {
c.0.has_typeinfo_enum_query = true;
@ -791,7 +783,9 @@ impl Connection {
fn get_composite_fields(self, oid: Oid) -> BoxFuture<(Vec<Field>, Connection), Error> {
self.setup_typeinfo_composite_query()
.and_then(move |c| c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[Type::Oid], &[&oid]))
.and_then(move |c| {
c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[Type::Oid], &[&oid])
})
.and_then(|c| c.read_rows().collect())
.and_then(|(r, c)| {
futures::stream::iter(r.into_iter().map(Ok))
@ -852,12 +846,10 @@ impl Connection {
stmt,
Some(1),
params.iter().zip(param_types),
|(param, ty), buf| {
match param.to_sql_checked(ty, buf) {
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
Err(e) => Err(e),
}
|(param, ty), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
Err(e) => Err(e),
},
Some(1),
&mut bind);
@ -874,52 +866,46 @@ impl Connection {
})
.into_future()
.and_then(|s| {
let it = Some(bind).into_iter()
let it = Some(bind)
.into_iter()
.chain(Some(execute))
.chain(Some(sync))
.map(Ok::<_, io::Error>);
s.0.send_all(futures::stream::iter(it)).map_err(Error::Io)
})
.and_then(|s| s.0.read().map_err(Error::Io))
.and_then(|(m, s)| {
match m {
backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
}
.and_then(|(m, s)| match m {
backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
_ => Either::A(Err(bad_message()).into_future()),
})
.boxed()
}
fn finish_execute(self) -> BoxFuture<(u64, Connection), Error> {
self.0.read()
self.0
.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::DataRow(_) => Either::B(Connection(s).finish_execute()),
backend::Message::CommandComplete(body) => {
Either::A(body.tag()
.map(|tag| {
let num = tag.split_whitespace()
.last()
.unwrap()
.parse()
.unwrap_or(0);
(num, Connection(s))
})
.map_err(Error::Io)
.into_future())
}
backend::Message::EmptyQueryResponse => {
Either::A(Ok((0, Connection(s))).into_future())
}
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
.and_then(|(m, s)| match m {
backend::Message::DataRow(_) => Either::B(Connection(s).finish_execute()),
backend::Message::CommandComplete(body) => {
Either::A(body.tag()
.map(|tag| {
let num = tag.split_whitespace()
.last()
.unwrap()
.parse()
.unwrap_or(0);
(num, Connection(s))
})
.map_err(Error::Io)
.into_future())
}
backend::Message::EmptyQueryResponse => {
Either::A(Ok((0, Connection(s))).into_future())
}
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
_ => Either::A(Err(bad_message()).into_future()),
})
.and_then(|(n, s)| s.ready(n))
.boxed()
@ -927,21 +913,21 @@ impl Connection {
fn read_rows(self) -> BoxStateStream<RowData, Connection, Error> {
futures_state_stream::unfold(self, |c| {
c.read_row()
.and_then(|(r, c)| {
match r {
c.read_row()
.and_then(|(r, c)| match r {
Some(data) => {
let event = StreamEvent::Next((data, c));
Either::A(Ok(event).into_future())
},
}
None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))),
}
})
}).boxed()
})
})
.boxed()
}
fn read_row(self) -> BoxFuture<(Option<RowData>, Connection), Error> {
self.0.read()
self.0
.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
let c = Connection(s);
@ -955,9 +941,7 @@ impl Connection {
}
backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) => Either::A(Ok((None, c)).into_future()),
backend::Message::ErrorResponse(body) => {
Either::B(c.ready_err(body))
}
backend::Message::ErrorResponse(body) => Either::B(c.ready_err(body)),
_ => Either::A(Err(bad_message()).into_future()),
}
})
@ -970,10 +954,8 @@ impl Connection {
let name = format!("s{}", id);
self.raw_prepare(&name, query)
.map(|(params, columns, conn)| {
let stmt = Statement::new(conn.0.close_sender.clone(),
name,
params,
Arc::new(columns));
let stmt =
Statement::new(conn.0.close_sender.clone(), name, params, Arc::new(columns));
(stmt, conn)
})
.boxed()
@ -985,7 +967,10 @@ impl Connection {
///
/// Panics if the number of parameters provided does not match the number
/// expected.
pub fn execute(self, statement: &Statement, params: &[&ToSql]) -> BoxFuture<(u64, Connection), Error> {
pub fn execute(self,
statement: &Statement,
params: &[&ToSql])
-> BoxFuture<(u64, Connection), Error> {
self.raw_execute(statement.name(), "", statement.parameters(), params)
.and_then(|conn| conn.finish_execute())
.boxed()

View File

@ -39,7 +39,10 @@ pub fn connect(host: Host,
}
#[cfg(not(unix))]
Host::Unix(_) => {
Either::B(Err(ConnectError::ConnectParams("unix sockets are not supported on this platform".into())).into_future())
Either::B(Err(ConnectError::ConnectParams("unix sockets are not supported on this \
platform"
.into()))
.into_future())
}
};
@ -52,7 +55,7 @@ pub fn connect(host: Host,
s.framed(PostgresCodec)
})
.boxed()
},
}
};
inner.map(|s| s.framed(SslCodec))
@ -67,13 +70,18 @@ pub fn connect(host: Host,
let s = s.into_inner();
match (m, required) {
(Some(b'N'), true) => {
Either::A(Err(ConnectError::Tls("the server does not support TLS".into())).into_future())
Either::A(Err(ConnectError::Tls("the server does not support TLS".into()))
.into_future())
}
(Some(b'N'), false) => {
let s: Box<TlsStream> = Box::new(s);
Either::A(Ok(s).into_future())
},
(None, _) => Either::A(Err(ConnectError::Io(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF"))).into_future()),
}
(None, _) => {
Either::A(Err(ConnectError::Io(io::Error::new(io::ErrorKind::UnexpectedEof,
"unexpected EOF")))
.into_future())
}
_ => {
let host = match host {
Host::Tcp(ref host) => host,
@ -155,7 +163,7 @@ impl Codec for PostgresCodec {
buf.drain_to(consumed);
Ok(Some(message))
}
ParseResult::Incomplete { .. } => Ok(None)
ParseResult::Incomplete { .. } => Ok(None),
}
}

View File

@ -98,18 +98,19 @@ fn batch_execute_ok() {
fn batch_execute_err() {
let mut l = Core::new().unwrap();
let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &l.handle())
.then(|r| r.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL); \
INSERT INTO foo DEFAULT VALUES;"))
.and_then(|c| c.batch_execute("SELECT * FROM bogo"))
.then(|r| {
match r {
Err(Error::Db(e, s)) => {
assert!(e.code == SqlState::UndefinedTable);
s.batch_execute("SELECT * FROM foo")
}
Err(e) => panic!("unexpected error: {}", e),
Ok(_) => panic!("unexpected success"),
}
r.unwrap()
.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL); INSERT INTO foo DEFAULT \
VALUES;")
})
.and_then(|c| c.batch_execute("SELECT * FROM bogo"))
.then(|r| match r {
Err(Error::Db(e, s)) => {
assert!(e.code == SqlState::UndefinedTable);
s.batch_execute("SELECT * FROM foo")
}
Err(e) => panic!("unexpected error: {}", e),
Ok(_) => panic!("unexpected success"),
});
l.run(done).unwrap();
}
@ -157,7 +158,9 @@ fn query() {
fn transaction() {
let mut l = Core::new().unwrap();
let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &l.handle())
.then(|c| c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);"))
.then(|c| {
c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);")
})
.then(|c| c.unwrap().transaction())
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('joe');"))
.then(|t| t.unwrap().rollback())
@ -196,7 +199,9 @@ fn ssl_user_ssl_required() {
let mut l = Core::new().unwrap();
let handle = l.handle();
let done = Connection::connect("postgres://ssl_user@localhost/postgres", TlsMode::None, &handle);
let done = Connection::connect("postgres://ssl_user@localhost/postgres",
TlsMode::None,
&handle);
match l.run(done) {
Err(ConnectError::Db(e)) => assert!(e.code == SqlState::InvalidAuthorizationSpecification),
@ -231,7 +236,10 @@ fn domain() {
struct SessionId(Vec<u8>);
impl ToSql for SessionId {
fn to_sql(&self, ty: &Type, out: &mut Vec<u8>) -> Result<IsNull, Box<StdError + Sync + Send>> {
fn to_sql(&self,
ty: &Type,
out: &mut Vec<u8>)
-> Result<IsNull, Box<StdError + Sync + Send>> {
let inner = match *ty.kind() {
Kind::Domain(ref inner) => inner,
_ => unreachable!(),
@ -242,7 +250,7 @@ fn domain() {
fn accepts(ty: &Type) -> bool {
match *ty.kind() {
Kind::Domain(Type::Bytea) => ty.name() == "session_id",
_ => false
_ => false,
}
}
@ -264,9 +272,10 @@ fn domain() {
let handle = l.handle();
let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &handle)
.then(|c| {
c.unwrap().batch_execute(
"CREATE DOMAIN pg_temp.session_id AS bytea CHECK(octet_length(VALUE) = 16);
CREATE TABLE pg_temp.foo (id pg_temp.session_id);")
c.unwrap().batch_execute("CREATE DOMAIN pg_temp.session_id AS bytea \
CHECK(octet_length(VALUE) = 16);
CREATE \
TABLE pg_temp.foo (id pg_temp.session_id);")
})
.and_then(|c| c.prepare("INSERT INTO pg_temp.foo (id) VALUES ($1)"))
.and_then(|(s, c)| {
@ -330,7 +339,8 @@ fn enum_() {
assert_eq!(type_.name(), "mood");
match *type_.kind() {
Kind::Enum(ref variants) => {
assert_eq!(variants, &["sad".to_owned(), "ok".to_owned(), "happy".to_owned()]);
assert_eq!(variants,
&["sad".to_owned(), "ok".to_owned(), "happy".to_owned()]);
}
_ => panic!("bad type"),
}

View File

@ -40,7 +40,8 @@ impl Handshake for OpenSsl {
host: &str,
stream: Stream)
-> BoxFuture<Box<TlsStream>, Box<Error + Sync + Send>> {
self.0.connect_async(host, stream)
self.0
.connect_async(host, stream)
.map(|s| {
let s: Box<TlsStream> = Box::new(s);
s

View File

@ -22,7 +22,8 @@ impl TransactionNew for Transaction {
impl Transaction {
/// Like `Connection::batch_execute`.
pub fn batch_execute(self, query: &str) -> BoxFuture<Transaction, Error<Transaction>> {
self.0.batch_execute(query)
self.0
.batch_execute(query)
.map(Transaction)
.map_err(transaction_err)
.boxed()
@ -30,7 +31,8 @@ impl Transaction {
/// Like `Connection::prepare`.
pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Transaction), Error<Transaction>> {
self.0.prepare(query)
self.0
.prepare(query)
.map(|(s, c)| (s, Transaction(c)))
.map_err(transaction_err)
.boxed()
@ -41,7 +43,8 @@ impl Transaction {
statement: &Statement,
params: &[&ToSql])
-> BoxFuture<(u64, Transaction), Error<Transaction>> {
self.0.execute(statement, params)
self.0
.execute(statement, params)
.map(|(n, c)| (n, Transaction(c)))
.map_err(transaction_err)
.boxed()
@ -52,7 +55,8 @@ impl Transaction {
statement: &Statement,
params: &[&ToSql])
-> BoxStateStream<Row, Transaction, Error<Transaction>> {
self.0.query(statement, params)
self.0
.query(statement, params)
.map_state(Transaction)
.map_err(transaction_err)
.boxed()
@ -69,7 +73,8 @@ impl Transaction {
}
fn finish(self, query: &str) -> BoxFuture<Connection, Error> {
self.0.simple_query(query)
self.0
.simple_query(query)
.map(|(_, c)| c)
.boxed()
}
@ -79,6 +84,6 @@ fn transaction_err(e: Error) -> Error<Transaction> {
match e {
Error::Io(e) => Error::Io(e),
Error::Db(e, c) => Error::Db(e, Transaction(c)),
Error::Conversion(e, c) => Error::Conversion(e, Transaction(c))
Error::Conversion(e, c) => Error::Conversion(e, Transaction(c)),
}
}

View File

@ -1,8 +1,8 @@
//! Postgres types
#[doc(inline)]
pub use postgres_shared::types::{Oid, Type, Date, Timestamp, Kind, Field, Other,
WasNull, WrongType, FromSql, IsNull, ToSql};
pub use postgres_shared::types::{Oid, Type, Date, Timestamp, Kind, Field, Other, WasNull, WrongType,
FromSql, IsNull, ToSql};
#[doc(hidden)]
pub use postgres_shared::types::__to_sql_checked;