Stop using deprecated stuff

This commit is contained in:
Steven Fackler 2017-09-30 15:14:02 -07:00
parent 9373d2fa55
commit 6780d41225
4 changed files with 130 additions and 86 deletions

View File

@ -71,7 +71,7 @@ extern crate futures;
extern crate tokio_uds;
use fallible_iterator::FallibleIterator;
use futures::{Future, IntoFuture, BoxFuture, Stream, Sink, Poll, StartSend, Async};
use futures::{Future, IntoFuture, Stream, Sink, Poll, StartSend, Async};
use futures::future::Either;
use futures_state_stream::{StreamEvent, StateStream, FutureExt};
use postgres_protocol::authentication;
@ -145,7 +145,7 @@ pub fn cancel_query<T>(
tls_mode: TlsMode,
cancel_data: CancelData,
handle: &Handle,
) -> BoxFuture<(), Error>
) -> Box<Future<Item = (), Error = Error> + Send>
where
T: IntoConnectParams,
{
@ -186,7 +186,15 @@ struct InnerConnection {
}
impl InnerConnection {
fn read(self) -> BoxFuture<(backend::Message, InnerConnection), (io::Error, InnerConnection)> {
fn read(
self,
) -> Box<
Future<
Item = (backend::Message, InnerConnection),
Error = (io::Error, InnerConnection),
>
+ Send,
> {
if self.desynchronized {
let e = io::Error::new(
io::ErrorKind::Other,
@ -291,7 +299,11 @@ impl Connection {
/// path contains non-UTF 8 characters, a `ConnectParams` struct should be
/// created manually and passed in. Note that Postgres does not support TLS
/// over Unix sockets.
pub fn connect<T>(params: T, tls_mode: TlsMode, handle: &Handle) -> BoxFuture<Connection, Error>
pub fn connect<T>(
params: T,
tls_mode: TlsMode,
handle: &Handle,
) -> Box<Future<Item = Connection, Error = Error> + Send>
where
T: IntoConnectParams,
{
@ -332,7 +344,10 @@ impl Connection {
.boxed2()
}
fn startup(self, params: ConnectParams) -> BoxFuture<(Connection, ConnectParams), Error> {
fn startup(
self,
params: ConnectParams,
) -> Box<Future<Item = (Connection, ConnectParams), Error = Error> + Send> {
let mut buf = vec![];
let result = {
let options = [("client_encoding", "UTF8"), ("timezone", "GMT")];
@ -352,7 +367,10 @@ impl Connection {
.boxed2()
}
fn handle_auth(self, params: ConnectParams) -> BoxFuture<Connection, Error> {
fn handle_auth(
self,
params: ConnectParams,
) -> Box<Future<Item = Connection, Error = Error> + Send> {
self.0
.read()
.map_err(|(e, _)| error::io(e))
@ -409,7 +427,10 @@ impl Connection {
.boxed2()
}
fn handle_auth_response(self, message: Vec<u8>) -> BoxFuture<Connection, Error> {
fn handle_auth_response(
self,
message: Vec<u8>,
) -> Box<Future<Item = Connection, Error = Error> + Send> {
self.0
.send(message)
.and_then(|s| s.read().map_err(|(e, _)| e))
@ -422,7 +443,7 @@ impl Connection {
.boxed2()
}
fn finish_startup(self) -> BoxFuture<Connection, Error> {
fn finish_startup(self) -> Box<Future<Item = Connection, Error = Error> + Send> {
self.0
.read()
.map_err(|(e, _)| error::io(e))
@ -444,7 +465,7 @@ impl Connection {
fn simple_query(
self,
query: &str,
) -> BoxFuture<(Vec<RowData>, Connection), (Error, Connection)> {
) -> Box<Future<Item = (Vec<RowData>, Connection), Error = (Error, Connection)> + Send> {
let mut buf = vec![];
if let Err(e) = frontend::query(query, &mut buf) {
return Err((error::io(e), self)).into_future().boxed2();
@ -461,7 +482,7 @@ impl Connection {
fn simple_read_rows(
self,
mut rows: Vec<RowData>,
) -> BoxFuture<(Vec<RowData>, Connection), (Error, Connection)> {
) -> Box<Future<Item = (Vec<RowData>, Connection), Error = (Error, Connection)> + Send> {
self.0
.read()
.map_err(|(e, s)| (error::io(e), Connection(s)))
@ -487,7 +508,10 @@ impl Connection {
.boxed2()
}
fn ready<T>(self, t: T) -> BoxFuture<(T, Connection), (Error, Connection)>
fn ready<T>(
self,
t: T,
) -> Box<Future<Item = (T, Connection), Error = (Error, Connection)> + Send>
where
T: 'static + Send,
{
@ -502,7 +526,7 @@ impl Connection {
.boxed2()
}
fn close_gc(self) -> BoxFuture<Connection, (Error, Connection)> {
fn close_gc(self) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
let mut messages = vec![];
while let Ok((type_, name)) = self.0.close_receiver.try_recv() {
let mut buf = vec![];
@ -517,15 +541,13 @@ impl Connection {
frontend::sync(&mut buf);
messages.push(buf);
self.0
.send_all2(futures::stream::iter(
messages.into_iter().map(Ok::<_, io::Error>),
))
.send_all2(futures::stream::iter_ok::<_, Error>(messages))
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
.and_then(|s| Connection(s.0).finish_close_gc())
.boxed2()
}
fn finish_close_gc(self) -> BoxFuture<Connection, (Error, Connection)> {
fn finish_close_gc(self) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
self.0
.read()
.map_err(|(e, s)| (error::io(e), Connection(s)))
@ -538,7 +560,10 @@ impl Connection {
.boxed2()
}
fn ready_err<T>(self, body: ErrorResponseBody) -> BoxFuture<T, (Error, Connection)>
fn ready_err<T>(
self,
body: ErrorResponseBody,
) -> Box<Future<Item = T, Error = (Error, Connection)> + Send>
where
T: 'static + Send,
{
@ -564,7 +589,10 @@ impl Connection {
/// user-specified data, as it provides functionality to safely embed that
/// data in the statement. Do not form statements via string concatenation
/// and feed them into this method.
pub fn batch_execute(self, query: &str) -> BoxFuture<Connection, (Error, Connection)> {
pub fn batch_execute(
self,
query: &str,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
self.simple_query(query).map(|r| r.1).boxed2()
}
@ -572,7 +600,7 @@ impl Connection {
self,
name: &str,
query: &str,
) -> BoxFuture<(Vec<Type>, Vec<Column>, Connection), (Error, Connection)> {
) -> Box<Future<Item = (Vec<Type>, Vec<Column>, Connection), Error = (Error, Connection)> + Send> {
let mut parse = vec![];
let mut describe = vec![];
let mut sync = vec![];
@ -581,12 +609,10 @@ impl Connection {
return Err((error::io(e), self)).into_future().boxed2();
}
let it = Some(parse)
.into_iter()
.chain(Some(describe))
.chain(Some(sync))
.map(Ok::<_, io::Error>);
self.0.send_all2(futures::stream::iter(it))
let it = Some(parse).into_iter().chain(Some(describe)).chain(
Some(sync),
);
self.0.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
.map_err(|(e, s, _)| (e, s))
.and_then(|s| s.0.read())
.map_err(|(e, s)| (error::io(e), Connection(s)))
@ -649,7 +675,7 @@ impl Connection {
mut out: Vec<U>,
mut get_oid: F,
mut build: G,
) -> BoxFuture<(Vec<U>, Connection), (Error, Connection)>
) -> Box<Future<Item = (Vec<U>, Connection), Error = (Error, Connection)> + Send>
where
T: 'static + Send,
U: 'static + Send,
@ -671,7 +697,10 @@ impl Connection {
}
}
fn get_type(self, oid: Oid) -> BoxFuture<(Type, Connection), (Error, Connection)> {
fn get_type(
self,
oid: Oid,
) -> Box<Future<Item = (Type, Connection), Error = (Error, Connection)> + Send> {
if let Some(type_) = Type::from_oid(oid) {
return Ok((type_, self)).into_future().boxed2();
};
@ -689,7 +718,10 @@ impl Connection {
.boxed2()
}
fn get_unknown_type(self, oid: Oid) -> BoxFuture<(Type, Connection), (Error, Connection)> {
fn get_unknown_type(
self,
oid: Oid,
) -> Box<Future<Item = (Type, Connection), Error = (Error, Connection)> + Send> {
self.setup_typeinfo_query()
.and_then(move |c| c.raw_execute(TYPEINFO_QUERY, "", &[OID], &[&oid]))
.and_then(|c| c.read_rows().collect())
@ -768,7 +800,9 @@ impl Connection {
.boxed2()
}
fn setup_typeinfo_query(self) -> BoxFuture<Connection, (Error, Connection)> {
fn setup_typeinfo_query(
self,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
if self.0.has_typeinfo_query {
return Ok(self).into_future().boxed2();
}
@ -810,7 +844,7 @@ impl Connection {
fn get_enum_variants(
self,
oid: Oid,
) -> BoxFuture<(Vec<String>, Connection), (Error, Connection)> {
) -> Box<Future<Item = (Vec<String>, Connection), Error = (Error, Connection)> + Send> {
self.setup_typeinfo_enum_query()
.and_then(move |c| {
c.raw_execute(TYPEINFO_ENUM_QUERY, "", &[OID], &[&oid])
@ -830,7 +864,9 @@ impl Connection {
.boxed2()
}
fn setup_typeinfo_enum_query(self) -> BoxFuture<Connection, (Error, Connection)> {
fn setup_typeinfo_enum_query(
self,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
if self.0.has_typeinfo_enum_query {
return Ok(self).into_future().boxed2();
}
@ -860,39 +896,34 @@ impl Connection {
fn get_composite_fields(
self,
oid: Oid,
) -> BoxFuture<(Vec<Field>, Connection), (Error, Connection)> {
) -> Box<Future<Item = (Vec<Field>, Connection), Error = (Error, Connection)> + Send> {
self.setup_typeinfo_composite_query()
.and_then(move |c| {
c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[OID], &[&oid])
})
.and_then(|c| c.read_rows().collect())
.and_then(|(r, c)| {
futures::stream::iter(r.into_iter().map(Ok)).fold(
(vec![], c),
|(mut fields, c), row| {
let name = match String::from_sql_nullable(&NAME, row.get(0)) {
Ok(name) => name,
Err(e) => {
return Either::A(Err((error::conversion(e), c)).into_future())
}
};
let oid = match Oid::from_sql_nullable(&OID, row.get(1)) {
Ok(oid) => oid,
Err(e) => {
return Either::A(Err((error::conversion(e), c)).into_future())
}
};
Either::B(c.get_type(oid).map(move |(ty, c)| {
fields.push(Field::new(name, ty));
(fields, c)
}))
},
)
futures::stream::iter_ok(r).fold((vec![], c), |(mut fields, c), row| {
let name = match String::from_sql_nullable(&NAME, row.get(0)) {
Ok(name) => name,
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
};
let oid = match Oid::from_sql_nullable(&OID, row.get(1)) {
Ok(oid) => oid,
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
};
Either::B(c.get_type(oid).map(move |(ty, c)| {
fields.push(Field::new(name, ty));
(fields, c)
}))
})
})
.boxed2()
}
fn setup_typeinfo_composite_query(self) -> BoxFuture<Connection, (Error, Connection)> {
fn setup_typeinfo_composite_query(
self,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
if self.0.has_typeinfo_composite_query {
return Ok(self).into_future().boxed2();
}
@ -918,7 +949,7 @@ impl Connection {
portal: &str,
param_types: &[Type],
params: &[&ToSql],
) -> BoxFuture<Connection, (Error, Connection)> {
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
assert!(
param_types.len() == params.len(),
"expected {} parameters but got {}",
@ -954,16 +985,12 @@ impl Connection {
Err(e) => Err((error::io(e), s)),
}).into_future()
.and_then(|s| {
let it = Some(bind)
.into_iter()
.chain(Some(execute))
.chain(Some(sync))
.map(Ok::<_, io::Error>);
s.0.send_all2(futures::stream::iter(it)).map_err(
|(e, s, _)| {
(error::io(e), Connection(s))
},
)
let it = Some(bind).into_iter().chain(Some(execute)).chain(
Some(sync),
);
s.0
.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
})
.and_then(|s| {
s.0.read().map_err(|(e, s)| (error::io(e), Connection(s)))
@ -976,7 +1003,9 @@ impl Connection {
.boxed2()
}
fn finish_execute(self) -> BoxFuture<(u64, Connection), (Error, Connection)> {
fn finish_execute(
self,
) -> Box<Future<Item = (u64, Connection), Error = (Error, Connection)> + Send> {
self.0
.read()
.map_err(|(e, s)| (error::io(e), Connection(s)))
@ -1013,7 +1042,9 @@ impl Connection {
}).boxed2()
}
fn read_row(self) -> BoxFuture<(Option<RowData>, Connection), (Error, Connection)> {
fn read_row(
self,
) -> Box<Future<Item = (Option<RowData>, Connection), Error = (Error, Connection)> + Send> {
self.0
.read()
.map_err(|(e, s)| (error::io(e), Connection(s)))
@ -1037,7 +1068,10 @@ impl Connection {
}
/// Creates a new prepared statement.
pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Connection), (Error, Connection)> {
pub fn prepare(
self,
query: &str,
) -> Box<Future<Item = (Statement, Connection), Error = (Error, Connection)> + Send> {
let id = NEXT_STMT_ID.fetch_add(1, Ordering::SeqCst);
let name = format!("s{}", id);
self.raw_prepare(&name, query)
@ -1059,7 +1093,7 @@ impl Connection {
self,
statement: &Statement,
params: &[&ToSql],
) -> BoxFuture<(u64, Connection), (Error, Connection)> {
) -> Box<Future<Item = (u64, Connection), Error = (Error, Connection)> + Send> {
self.raw_execute(statement.name(), "", statement.parameters(), params)
.and_then(|conn| conn.finish_execute())
.boxed2()
@ -1084,7 +1118,9 @@ impl Connection {
}
/// Starts a new transaction.
pub fn transaction(self) -> BoxFuture<Transaction, (Error, Connection)> {
pub fn transaction(
self,
) -> Box<Future<Item = Transaction, Error = (Error, Connection)> + Send> {
self.simple_query("BEGIN")
.map(|(_, c)| Transaction::new(c))
.boxed2()

View File

@ -1,5 +1,5 @@
use bytes::{BytesMut, BufMut};
use futures::{BoxFuture, Future, IntoFuture, Sink, Stream as FuturesStream, Poll};
use futures::{Future, IntoFuture, Sink, Stream as FuturesStream, Poll};
use futures::future::Either;
use postgres_shared::params::Host;
use postgres_protocol::message::backend;
@ -14,7 +14,7 @@ use tokio_dns;
#[cfg(unix)]
use tokio_uds::UnixStream;
use {TlsMode, Error};
use {TlsMode, Error, BoxedFuture};
use error;
use tls::TlsStream;
@ -25,7 +25,7 @@ pub fn connect(
port: u16,
tls_mode: TlsMode,
handle: &Handle,
) -> BoxFuture<PostgresStream, Error> {
) -> Box<Future<Item = PostgresStream, Error = Error> + Send> {
let inner = match host {
Host::Tcp(ref host) => {
Either::A(
@ -65,7 +65,7 @@ pub fn connect(
let s: Box<TlsStream> = Box::new(s);
s.framed(PostgresCodec)
})
.boxed()
.boxed2()
}
};
@ -82,8 +82,7 @@ pub fn connect(
match (m, required) {
(Some(b'N'), true) => {
Either::A(
Err(error::tls("the server does not support TLS".into()))
.into_future(),
Err(error::tls("the server does not support TLS".into())).into_future(),
)
}
(Some(b'N'), false) => {
@ -108,7 +107,7 @@ pub fn connect(
}
})
.map(|s| s.framed(PostgresCodec))
.boxed()
.boxed2()
}
/// A raw connection to the database.

View File

@ -1,6 +1,6 @@
//! TLS support.
use futures::BoxFuture;
use futures::Future;
use std::error::Error;
use tokio_io::{AsyncRead, AsyncWrite};
@ -35,5 +35,5 @@ pub trait Handshake: 'static + Sync + Send {
self: Box<Self>,
host: &str,
stream: Stream,
) -> BoxFuture<Box<TlsStream>, Box<Error + Sync + Send>>;
) -> Box<Future<Item = Box<TlsStream>, Error = Box<Error + Sync + Send>> + Send>;
}

View File

@ -1,6 +1,6 @@
//! Transactions.
use futures::{Future, BoxFuture};
use futures::Future;
use futures_state_stream::StateStream;
use {Connection, BoxedFuture, BoxedStateStream};
@ -19,7 +19,10 @@ impl Transaction {
}
/// Like `Connection::batch_execute`.
pub fn batch_execute(self, query: &str) -> BoxFuture<Transaction, (Error, Transaction)> {
pub fn batch_execute(
self,
query: &str,
) -> Box<Future<Item = Transaction, Error = (Error, Transaction)> + Send> {
self.0
.batch_execute(query)
.map(Transaction)
@ -28,7 +31,10 @@ impl Transaction {
}
/// Like `Connection::prepare`.
pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Transaction), (Error, Transaction)> {
pub fn prepare(
self,
query: &str,
) -> Box<Future<Item = (Statement, Transaction), Error = (Error, Transaction)> + Send> {
self.0
.prepare(query)
.map(|(s, c)| (s, Transaction(c)))
@ -41,7 +47,7 @@ impl Transaction {
self,
statement: &Statement,
params: &[&ToSql],
) -> BoxFuture<(u64, Transaction), (Error, Transaction)> {
) -> Box<Future<Item = (u64, Transaction), Error = (Error, Transaction)> + Send> {
self.0
.execute(statement, params)
.map(|(n, c)| (n, Transaction(c)))
@ -62,16 +68,19 @@ impl Transaction {
}
/// Commits the transaction.
pub fn commit(self) -> BoxFuture<Connection, (Error, Connection)> {
pub fn commit(self) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
self.finish("COMMIT")
}
/// Rolls back the transaction.
pub fn rollback(self) -> BoxFuture<Connection, (Error, Connection)> {
pub fn rollback(self) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
self.finish("ROLLBACK")
}
fn finish(self, query: &str) -> BoxFuture<Connection, (Error, Connection)> {
fn finish(
self,
query: &str,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
self.0.simple_query(query).map(|(_, c)| c).boxed2()
}
}