Update futures-state-stream

This commit is contained in:
Steven Fackler 2017-09-30 14:56:15 -07:00
parent acb424afa4
commit 9373d2fa55
3 changed files with 125 additions and 86 deletions

View File

@ -42,7 +42,7 @@ with-openssl = ["tokio-openssl", "openssl"]
bytes = "0.4"
fallible-iterator = "0.1.3"
futures = "0.1.7"
futures-state-stream = "0.1"
futures-state-stream = "0.2"
postgres-protocol = { version = "0.3.0", path = "../postgres-protocol" }
postgres-shared = { version = "0.4.0", path = "../postgres-shared" }
tokio-core = "0.1"

View File

@ -73,7 +73,7 @@ extern crate tokio_uds;
use fallible_iterator::FallibleIterator;
use futures::{Future, IntoFuture, BoxFuture, Stream, Sink, Poll, StartSend, Async};
use futures::future::Either;
use futures_state_stream::{StreamEvent, StateStream, BoxStateStream, FutureExt};
use futures_state_stream::{StreamEvent, StateStream, FutureExt};
use postgres_protocol::authentication;
use postgres_protocol::message::{backend, frontend};
use postgres_protocol::message::backend::{ErrorResponseBody, ErrorFields};
@ -168,7 +168,7 @@ where
c.send(buf).map_err(error::io)
})
.map(|_| ())
.boxed()
.boxed2()
}
struct InnerConnection {
@ -192,7 +192,7 @@ impl InnerConnection {
io::ErrorKind::Other,
"connection desynchronized due to earlier IO error",
);
return Err((e, self)).into_future().boxed();
return Err((e, self)).into_future().boxed2();
}
self.into_future()
@ -225,7 +225,7 @@ impl InnerConnection {
s.desynchronized = true;
(e, s)
})
.boxed()
.boxed2()
}
}
@ -329,7 +329,7 @@ impl Connection {
}).and_then(|(s, params)| s.startup(params))
.and_then(|(s, params)| s.handle_auth(params))
.and_then(|s| s.finish_startup())
.boxed()
.boxed2()
}
fn startup(self, params: ConnectParams) -> BoxFuture<(Connection, ConnectParams), Error> {
@ -349,7 +349,7 @@ impl Connection {
.and_then(move |()| self.0.send(buf))
.map_err(error::io)
.map(move |s| (Connection(s), params))
.boxed()
.boxed2()
}
fn handle_auth(self, params: ConnectParams) -> BoxFuture<Connection, Error> {
@ -406,7 +406,7 @@ impl Connection {
Some(m) => Either::A(s.handle_auth_response(m)),
None => Either::B(Ok(s).into_future()),
})
.boxed()
.boxed2()
}
fn handle_auth_response(self, message: Vec<u8>) -> BoxFuture<Connection, Error> {
@ -419,7 +419,7 @@ impl Connection {
backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())),
_ => Err(bad_message()),
})
.boxed()
.boxed2()
}
fn finish_startup(self) -> BoxFuture<Connection, Error> {
@ -438,7 +438,7 @@ impl Connection {
}
_ => Either::B(Err(bad_message()).into_future()),
})
.boxed()
.boxed2()
}
fn simple_query(
@ -447,14 +447,14 @@ impl Connection {
) -> BoxFuture<(Vec<RowData>, Connection), (Error, Connection)> {
let mut buf = vec![];
if let Err(e) = frontend::query(query, &mut buf) {
return Err((error::io(e), self)).into_future().boxed();
return Err((error::io(e), self)).into_future().boxed2();
}
self.0
.send2(buf)
.map_err(|(e, s)| (error::io(e), Connection(s)))
.and_then(|s| Connection(s).simple_read_rows(vec![]))
.boxed()
.boxed2()
}
// This has its own read_rows since it will need to handle multiple query completions
@ -467,7 +467,7 @@ impl Connection {
.map_err(|(e, s)| (error::io(e), Connection(s)))
.and_then(|(m, s)| match m {
backend::Message::ReadyForQuery(_) => {
Ok((rows, Connection(s))).into_future().boxed()
Ok((rows, Connection(s))).into_future().boxed2()
}
backend::Message::DataRow(body) => {
match RowData::new(body) {
@ -475,16 +475,16 @@ impl Connection {
rows.push(row);
Connection(s).simple_read_rows(rows)
}
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed(),
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(),
}
}
backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) |
backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
_ => Err((bad_message(), Connection(s))).into_future().boxed(),
_ => Err((bad_message(), Connection(s))).into_future().boxed2(),
})
.boxed()
.boxed2()
}
fn ready<T>(self, t: T) -> BoxFuture<(T, Connection), (Error, Connection)>
@ -499,7 +499,7 @@ impl Connection {
_ => Err((bad_message(), Connection(s))),
})
.and_then(|s| Connection(s).close_gc().map(|s| (t, s)))
.boxed()
.boxed2()
}
fn close_gc(self) -> BoxFuture<Connection, (Error, Connection)> {
@ -510,7 +510,7 @@ impl Connection {
messages.push(buf);
}
if messages.is_empty() {
return Ok(self).into_future().boxed();
return Ok(self).into_future().boxed2();
}
let mut buf = vec![];
@ -522,7 +522,7 @@ impl Connection {
))
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
.and_then(|s| Connection(s.0).finish_close_gc())
.boxed()
.boxed2()
}
fn finish_close_gc(self) -> BoxFuture<Connection, (Error, Connection)> {
@ -535,7 +535,7 @@ impl Connection {
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
_ => Either::A(Err((bad_message(), Connection(s))).into_future()),
})
.boxed()
.boxed2()
}
fn ready_err<T>(self, body: ErrorResponseBody) -> BoxFuture<T, (Error, Connection)>
@ -544,11 +544,11 @@ impl Connection {
{
let e = match DbError::new(&mut body.fields()) {
Ok(e) => e,
Err(e) => return Err((error::io(e), self)).into_future().boxed(),
Err(e) => return Err((error::io(e), self)).into_future().boxed2(),
};
self.ready(e)
.and_then(|(e, s)| Err((error::db(e), s)))
.boxed()
.boxed2()
}
/// Execute a sequence of SQL statements.
@ -565,7 +565,7 @@ impl Connection {
/// data in the statement. Do not form statements via string concatenation
/// and feed them into this method.
pub fn batch_execute(self, query: &str) -> BoxFuture<Connection, (Error, Connection)> {
self.simple_query(query).map(|r| r.1).boxed()
self.simple_query(query).map(|r| r.1).boxed2()
}
fn raw_prepare(
@ -578,7 +578,7 @@ impl Connection {
let mut sync = vec![];
frontend::sync(&mut sync);
if let Err(e) = frontend::parse(name, query, None, &mut parse).and_then(|()| frontend::describe(b'S', name, &mut describe)) {
return Err((error::io(e), self)).into_future().boxed();
return Err((error::io(e), self)).into_future().boxed2();
}
let it = Some(parse)
@ -590,7 +590,7 @@ impl Connection {
.map_err(|(e, s, _)| (e, s))
.and_then(|s| s.0.read())
.map_err(|(e, s)| (error::io(e), Connection(s)))
.boxed() // work around nonlinear trans blowup
.boxed2() // work around nonlinear trans blowup
.and_then(|(m, s)| {
match m {
backend::Message::ParseComplete => Either::A(Ok(s).into_future()),
@ -613,7 +613,7 @@ impl Connection {
}
})
.and_then(|(p, s)| s.read().map(|(m, s)| (p, m, s)).map_err(|(e, s)| (error::io(e), Connection(s))))
.boxed() // work around nonlinear trans blowup
.boxed2() // work around nonlinear trans blowup
.and_then(|(p, m, s)| {
match m {
backend::Message::RowDescription(body) => {
@ -640,7 +640,7 @@ impl Connection {
|f, t| Column::new(f.0, t))
.map(|(r, s)| (p, r, s))
})
.boxed()
.boxed2()
}
fn get_types<T, U, I, F, G>(
@ -665,20 +665,20 @@ impl Connection {
out.push(build(v, ty));
s.get_types(raw, out, get_oid, build)
})
.boxed()
.boxed2()
}
None => Ok((out, self)).into_future().boxed(),
None => Ok((out, self)).into_future().boxed2(),
}
}
fn get_type(self, oid: Oid) -> BoxFuture<(Type, Connection), (Error, Connection)> {
if let Some(type_) = Type::from_oid(oid) {
return Ok((type_, self)).into_future().boxed();
return Ok((type_, self)).into_future().boxed2();
};
let ty = self.0.types.get(&oid).map(Clone::clone);
if let Some(ty) = ty {
return Ok((ty, self)).into_future().boxed();
return Ok((ty, self)).into_future().boxed2();
}
self.get_unknown_type(oid)
@ -686,7 +686,7 @@ impl Connection {
c.0.types.insert(oid, ty.clone());
(ty, c)
})
.boxed()
.boxed2()
}
fn get_unknown_type(self, oid: Oid) -> BoxFuture<(Type, Connection), (Error, Connection)> {
@ -731,31 +731,31 @@ impl Connection {
Either::B(
c.get_enum_variants(oid)
.map(|(v, c)| (Kind::Enum(v), c))
.boxed(),
.boxed2(),
)
} else if basetype != 0 {
Either::B(
c.get_type(basetype)
.map(|(t, c)| (Kind::Domain(t), c))
.boxed(),
.boxed2(),
)
} else if elem_oid != 0 {
Either::B(
c.get_type(elem_oid)
.map(|(t, c)| (Kind::Array(t), c))
.boxed(),
.boxed2(),
)
} else if relid != 0 {
Either::B(
c.get_composite_fields(relid)
.map(|(f, c)| (Kind::Composite(f), c))
.boxed(),
.boxed2(),
)
} else if let Some(rngsubtype) = rngsubtype {
Either::B(
c.get_type(rngsubtype)
.map(|(t, c)| (Kind::Range(t), c))
.boxed(),
.boxed2(),
)
} else {
Either::A(Ok((Kind::Simple, c)).into_future())
@ -765,12 +765,12 @@ impl Connection {
move |(k, c)| (Type::_new(name, oid, k, schema), c),
))
})
.boxed()
.boxed2()
}
fn setup_typeinfo_query(self) -> BoxFuture<Connection, (Error, Connection)> {
if self.0.has_typeinfo_query {
return Ok(self).into_future().boxed();
return Ok(self).into_future().boxed2();
}
self.raw_prepare(
@ -804,7 +804,7 @@ impl Connection {
c.0.has_typeinfo_query = true;
c
})
.boxed()
.boxed2()
}
fn get_enum_variants(
@ -827,12 +827,12 @@ impl Connection {
}
Ok((variants, c))
})
.boxed()
.boxed2()
}
fn setup_typeinfo_enum_query(self) -> BoxFuture<Connection, (Error, Connection)> {
if self.0.has_typeinfo_enum_query {
return Ok(self).into_future().boxed();
return Ok(self).into_future().boxed2();
}
self.raw_prepare(
@ -854,10 +854,13 @@ impl Connection {
c.0.has_typeinfo_enum_query = true;
c
})
.boxed()
.boxed2()
}
fn get_composite_fields(self, oid: Oid) -> BoxFuture<(Vec<Field>, Connection), (Error, Connection)> {
fn get_composite_fields(
self,
oid: Oid,
) -> BoxFuture<(Vec<Field>, Connection), (Error, Connection)> {
self.setup_typeinfo_composite_query()
.and_then(move |c| {
c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[OID], &[&oid])
@ -869,11 +872,15 @@ impl Connection {
|(mut fields, c), row| {
let name = match String::from_sql_nullable(&NAME, row.get(0)) {
Ok(name) => name,
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
Err(e) => {
return Either::A(Err((error::conversion(e), c)).into_future())
}
};
let oid = match Oid::from_sql_nullable(&OID, row.get(1)) {
Ok(oid) => oid,
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
Err(e) => {
return Either::A(Err((error::conversion(e), c)).into_future())
}
};
Either::B(c.get_type(oid).map(move |(ty, c)| {
fields.push(Field::new(name, ty));
@ -882,12 +889,12 @@ impl Connection {
},
)
})
.boxed()
.boxed2()
}
fn setup_typeinfo_composite_query(self) -> BoxFuture<Connection, (Error, Connection)> {
if self.0.has_typeinfo_composite_query {
return Ok(self).into_future().boxed();
return Ok(self).into_future().boxed2();
}
self.raw_prepare(
@ -902,7 +909,7 @@ impl Connection {
c.0.has_typeinfo_composite_query = true;
c
})
.boxed()
.boxed2()
}
fn raw_execute(
@ -942,11 +949,9 @@ impl Connection {
Err(frontend::BindError::Serialization(e)) => Err((error::io(e), self)),
};
r.and_then(|s| {
match frontend::execute(portal, 0, &mut execute) {
Ok(()) => Ok(s),
Err(e) => Err((error::io(e), s)),
}
r.and_then(|s| match frontend::execute(portal, 0, &mut execute) {
Ok(()) => Ok(s),
Err(e) => Err((error::io(e), s)),
}).into_future()
.and_then(|s| {
let it = Some(bind)
@ -954,15 +959,21 @@ impl Connection {
.chain(Some(execute))
.chain(Some(sync))
.map(Ok::<_, io::Error>);
s.0.send_all2(futures::stream::iter(it)).map_err(|(e, s, _)| (error::io(e), Connection(s)))
s.0.send_all2(futures::stream::iter(it)).map_err(
|(e, s, _)| {
(error::io(e), Connection(s))
},
)
})
.and_then(|s| {
s.0.read().map_err(|(e, s)| (error::io(e), Connection(s)))
})
.and_then(|s| s.0.read().map_err(|(e, s)| (error::io(e), Connection(s))))
.and_then(|(m, s)| match m {
backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
_ => Either::A(Err((bad_message(), Connection(s))).into_future()),
})
.boxed()
.boxed2()
}
fn finish_execute(self) -> BoxFuture<(u64, Connection), (Error, Connection)> {
@ -970,26 +981,27 @@ impl Connection {
.read()
.map_err(|(e, s)| (error::io(e), Connection(s)))
.and_then(|(m, s)| match m {
backend::Message::DataRow(_) => Connection(s).finish_execute().boxed(),
backend::Message::DataRow(_) => Connection(s).finish_execute().boxed2(),
backend::Message::CommandComplete(body) => {
let r = body.tag()
.map(|tag| {
tag.split_whitespace().last().unwrap().parse().unwrap_or(0)
});
let r = body.tag().map(|tag| {
tag.split_whitespace().last().unwrap().parse().unwrap_or(0)
});
match r {
Ok(n) => Connection(s).ready(n).boxed(),
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed(),
Ok(n) => Connection(s).ready(n).boxed2(),
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(),
}
}
backend::Message::EmptyQueryResponse => Connection(s).ready(0).boxed(),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body).boxed(),
_ => Err((bad_message(), Connection(s))).into_future().boxed(),
backend::Message::EmptyQueryResponse => Connection(s).ready(0).boxed2(),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body).boxed2(),
_ => Err((bad_message(), Connection(s))).into_future().boxed2(),
})
.boxed()
.boxed2()
}
fn read_rows(self) -> BoxStateStream<RowData, Connection, (Error, Connection)> {
fn read_rows(
self,
) -> Box<StateStream<Item = RowData, State = Connection, Error = Error> + Send> {
futures_state_stream::unfold(self, |c| {
c.read_row().and_then(|(r, c)| match r {
Some(data) => {
@ -998,7 +1010,7 @@ impl Connection {
}
None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))),
})
}).boxed()
}).boxed2()
}
fn read_row(self) -> BoxFuture<(Option<RowData>, Connection), (Error, Connection)> {
@ -1011,7 +1023,7 @@ impl Connection {
backend::Message::DataRow(body) => {
let r = match RowData::new(body) {
Ok(r) => Ok((Some(r), c)),
Err(e) => Err((error::io(e), c))
Err(e) => Err((error::io(e), c)),
};
Either::A(r.into_future())
}
@ -1021,7 +1033,7 @@ impl Connection {
_ => Either::A(Err((bad_message(), c)).into_future()),
}
})
.boxed()
.boxed2()
}
/// Creates a new prepared statement.
@ -1034,7 +1046,7 @@ impl Connection {
Statement::new(conn.0.close_sender.clone(), name, params, Arc::new(columns));
(stmt, conn)
})
.boxed()
.boxed2()
}
/// Executes a statement, returning the number of rows modified.
@ -1050,7 +1062,7 @@ impl Connection {
) -> BoxFuture<(u64, Connection), (Error, Connection)> {
self.raw_execute(statement.name(), "", statement.parameters(), params)
.and_then(|conn| conn.finish_execute())
.boxed()
.boxed2()
}
/// Executes a statement, returning a stream over the resulting rows.
@ -1063,19 +1075,19 @@ impl Connection {
self,
statement: &Statement,
params: &[&ToSql],
) -> BoxStateStream<Row, Connection, (Error, Connection)> {
) -> Box<StateStream<Item = Row, State = Connection, Error = Error> + Send> {
let columns = statement.columns_arc().clone();
self.raw_execute(statement.name(), "", statement.parameters(), params)
.map(|c| c.read_rows().map(move |r| Row::new(columns.clone(), r)))
.flatten_state_stream()
.boxed()
.boxed2()
}
/// Starts a new transaction.
pub fn transaction(self) -> BoxFuture<Transaction, (Error, Connection)> {
self.simple_query("BEGIN")
.map(|(_, c)| Transaction::new(c))
.boxed()
.boxed2()
}
/// Returns a stream of asynchronus notifications receieved from the server.
@ -1146,3 +1158,31 @@ where
{
io::Error::new(io::ErrorKind::InvalidInput, "unexpected message").into()
}
trait BoxedFuture<I, E> {
fn boxed2(self) -> Box<Future<Item = I, Error = E> + Send>;
}
impl<F, I, E> BoxedFuture<I, E> for F
where
F: Future<Item = I, Error = E> + Send + 'static,
{
fn boxed2(self) -> Box<Future<Item = I, Error = E> + Send> {
Box::new(self)
}
}
trait BoxedStateStream<I, S, E> {
fn boxed2(self) -> Box<StateStream<Item = I, State = S, Error = E> + Send>;
}
impl<T, I, S, E> BoxedStateStream<I, S, E> for T
where
T: StateStream<Item = I, State = S, Error = E>
+ Send
+ 'static,
{
fn boxed2(self) -> Box<StateStream<Item = I, State = S, Error = E> + Send> {
Box::new(self)
}
}

View File

@ -1,9 +1,9 @@
//! Transactions.
use futures::{Future, BoxFuture};
use futures_state_stream::{StateStream, BoxStateStream};
use futures_state_stream::StateStream;
use Connection;
use {Connection, BoxedFuture, BoxedStateStream};
use error::Error;
use stmt::Statement;
use types::ToSql;
@ -24,7 +24,7 @@ impl Transaction {
.batch_execute(query)
.map(Transaction)
.map_err(transaction_err)
.boxed()
.boxed2()
}
/// Like `Connection::prepare`.
@ -33,7 +33,7 @@ impl Transaction {
.prepare(query)
.map(|(s, c)| (s, Transaction(c)))
.map_err(transaction_err)
.boxed()
.boxed2()
}
/// Like `Connection::execute`.
@ -46,7 +46,7 @@ impl Transaction {
.execute(statement, params)
.map(|(n, c)| (n, Transaction(c)))
.map_err(transaction_err)
.boxed()
.boxed2()
}
/// Like `Connection::query`.
@ -54,12 +54,11 @@ impl Transaction {
self,
statement: &Statement,
params: &[&ToSql],
) -> BoxStateStream<Row, Transaction, (Error, Transaction)> {
) -> Box<StateStream<Item = Row, State = Transaction, Error = Error> + Send> {
self.0
.query(statement, params)
.map_state(Transaction)
.map_err(transaction_err)
.boxed()
.boxed2()
}
/// Commits the transaction.
@ -73,7 +72,7 @@ impl Transaction {
}
fn finish(self, query: &str) -> BoxFuture<Connection, (Error, Connection)> {
self.0.simple_query(query).map(|(_, c)| c).boxed()
self.0.simple_query(query).map(|(_, c)| c).boxed2()
}
}