Upgrade to tokio alpha.4

This commit is contained in:
Steven Fackler 2019-08-29 18:10:58 -07:00
parent c026644820
commit 2a2b76d1b8
9 changed files with 29 additions and 83 deletions

View File

@ -2,7 +2,7 @@
members = [ members = [
"codegen", "codegen",
"postgres", "postgres",
"postgres-native-tls", # "postgres-native-tls",
"postgres-openssl", "postgres-openssl",
"postgres-protocol", "postgres-protocol",
"tokio-postgres", "tokio-postgres",

View File

@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies] [dependencies]
futures-preview = "=0.3.0-alpha.18" futures-preview = "=0.3.0-alpha.18"
native-tls = "0.2" native-tls = "0.2"
tokio-io = "=0.2.0-alpha.2" tokio-io = "=0.2.0-alpha.4"
tokio-tls = "=0.3.0-alpha.2" tokio-tls = "=0.3.0-alpha.4"
tokio-postgres = { version = "0.4.0-rc.1", path = "../tokio-postgres", default-features = false } tokio-postgres = { version = "0.4.0-rc.1", path = "../tokio-postgres", default-features = false }
[dev-dependencies] [dev-dependencies]
tokio = "=0.2.0-alpha.2" tokio = "=0.2.0-alpha.4"
postgres = { version = "0.16.0-rc.1", path = "../postgres" } postgres = { version = "0.16.0-rc.1", path = "../postgres" }

View File

@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies] [dependencies]
futures-preview = "=0.3.0-alpha.18" futures-preview = "=0.3.0-alpha.18"
openssl = "0.10" openssl = "0.10"
tokio-io = "=0.2.0-alpha.2" tokio-io = "=0.2.0-alpha.4"
tokio-openssl = "=0.4.0-alpha.2" tokio-openssl = "=0.4.0-alpha.4"
tokio-postgres = { version = "0.4.0-rc.1", path = "../tokio-postgres", default-features = false } tokio-postgres = { version = "0.4.0-rc.1", path = "../tokio-postgres", default-features = false }
[dev-dependencies] [dev-dependencies]
tokio = "=0.2.0-alpha.2" tokio = "=0.2.0-alpha.4"
postgres = { version = "0.16.0-rc.1", path = "../postgres" } postgres = { version = "0.16.0-rc.1", path = "../postgres" }

View File

@ -10,9 +10,7 @@ where
T: TlsConnect<TcpStream>, T: TlsConnect<TcpStream>,
T::Stream: 'static + Send, T::Stream: 'static + Send,
{ {
let stream = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap()) let stream = TcpStream::connect("127.0.0.1:5433").await.unwrap();
.await
.unwrap();
let builder = s.parse::<tokio_postgres::Config>().unwrap(); let builder = s.parse::<tokio_postgres::Config>().unwrap();
let (mut client, connection) = builder.connect_raw(stream, tls).await.unwrap(); let (mut client, connection) = builder.connect_raw(stream, tls).await.unwrap();

View File

@ -33,11 +33,11 @@ fallible-iterator = "0.2"
futures-preview = "=0.3.0-alpha.18" futures-preview = "=0.3.0-alpha.18"
pin-utils = "=0.1.0-alpha.4" pin-utils = "=0.1.0-alpha.4"
tokio-postgres = { version = "0.4.0-rc.2", path = "../tokio-postgres", default-features = false } tokio-postgres = { version = "0.4.0-rc.2", path = "../tokio-postgres", default-features = false }
tokio-executor = "=0.2.0-alpha.2" tokio-executor = "=0.2.0-alpha.4"
tokio = { version = "=0.2.0-alpha.2", optional = true } tokio = { version = "=0.2.0-alpha.4", optional = true }
lazy_static = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true }
log = { version = "0.4", optional = true } log = { version = "0.4", optional = true }
[dev-dependencies] [dev-dependencies]
tokio = "=0.2.0-alpha.2" tokio = "=0.2.0-alpha.4"

View File

@ -40,9 +40,9 @@ percent-encoding = "1.0"
pin-utils = "=0.1.0-alpha.4" pin-utils = "=0.1.0-alpha.4"
phf = "0.7.23" phf = "0.7.23"
postgres-protocol = { version = "0.4.1", path = "../postgres-protocol" } postgres-protocol = { version = "0.4.1", path = "../postgres-protocol" }
tokio = { version = "=0.2.0-alpha.2", default-features = false, features = ["io", "codec"] } tokio = { version = "=0.2.0-alpha.4", default-features = false, features = ["io", "codec"] }
tokio-executor = { version = "=0.2.0-alpha.2", optional = true } tokio-executor = { version = "=0.2.0-alpha.4", optional = true }
lazy_static = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true }
bit-vec-05 = { version = "0.5", package = "bit-vec", optional = true } bit-vec-05 = { version = "0.5", package = "bit-vec", optional = true }
@ -54,5 +54,5 @@ serde_json-1 = { version = "1.0", package = "serde_json", optional = true }
uuid-07 = { version = "0.7", package = "uuid", optional = true } uuid-07 = { version = "0.7", package = "uuid", optional = true }
[dev-dependencies] [dev-dependencies]
tokio = "=0.2.0-alpha.2" tokio = "=0.2.0-alpha.4"
env_logger = "0.5" env_logger = "0.5"

View File

@ -1,17 +1,12 @@
use crate::config::Host; use crate::config::Host;
use crate::{Error, Socket}; use crate::{Error, Socket};
use futures::channel::oneshot;
use futures::future;
use std::future::Future; use std::future::Future;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; use std::io;
use std::time::Duration; use std::time::Duration;
use std::vec;
use std::{io, thread};
use tokio::net::TcpStream; use tokio::net::TcpStream;
#[cfg(unix)] #[cfg(unix)]
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::timer::Timeout; use tokio::timer::Timeout;
use tokio_executor::threadpool;
pub(crate) async fn connect_socket( pub(crate) async fn connect_socket(
host: &Host, host: &Host,
@ -22,40 +17,16 @@ pub(crate) async fn connect_socket(
) -> Result<Socket, Error> { ) -> Result<Socket, Error> {
match host { match host {
Host::Tcp(host) => { Host::Tcp(host) => {
let addrs = match host.parse::<IpAddr>() { let socket =
Ok(ip) => { connect_with_timeout(TcpStream::connect((&**host, port)), connect_timeout).await?;
// avoid dealing with blocking DNS entirely if possible socket.set_nodelay(true).map_err(Error::connect)?;
vec![SocketAddr::new(ip, port)].into_iter() if keepalives {
} socket
Err(_) => dns(host, port).await.map_err(Error::connect)?, .set_keepalive(Some(keepalives_idle))
}; .map_err(Error::connect)?;
let mut error = None;
for addr in addrs {
let new_error =
match connect_with_timeout(TcpStream::connect(&addr), connect_timeout).await {
Ok(socket) => {
socket.set_nodelay(true).map_err(Error::connect)?;
if keepalives {
socket
.set_keepalive(Some(keepalives_idle))
.map_err(Error::connect)?;
}
return Ok(Socket::new_tcp(socket));
}
Err(e) => e,
};
error = Some(new_error);
} }
let error = error.unwrap_or_else(|| { Ok(Socket::new_tcp(socket))
Error::connect(io::Error::new(
io::ErrorKind::InvalidData,
"resolved 0 addresses",
))
});
Err(error)
} }
#[cfg(unix)] #[cfg(unix)]
Host::Unix(path) => { Host::Unix(path) => {
@ -65,25 +36,6 @@ pub(crate) async fn connect_socket(
} }
} }
async fn dns(host: &str, port: u16) -> io::Result<vec::IntoIter<SocketAddr>> {
// if we're running on a threadpool, use its blocking support
if let Ok(r) =
future::poll_fn(|_| threadpool::blocking(|| (host, port).to_socket_addrs())).await
{
return r;
}
// FIXME what should we do here?
let (tx, rx) = oneshot::channel();
let host = host.to_string();
thread::spawn(move || {
let addrs = (&*host, port).to_socket_addrs();
let _ = tx.send(addrs);
});
rx.await.unwrap()
}
async fn connect_with_timeout<F, T>(connect: F, timeout: Option<Duration>) -> Result<T, Error> async fn connect_with_timeout<F, T>(connect: F, timeout: Option<Duration>) -> Result<T, Error>
where where
F: Future<Output = io::Result<T>>, F: Future<Output = io::Result<T>>,

View File

@ -6,7 +6,7 @@ use futures::{join, try_join, FutureExt, TryStreamExt};
use std::fmt::Write; use std::fmt::Write;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::timer::Delay; use tokio::timer;
use tokio_postgres::error::SqlState; use tokio_postgres::error::SqlState;
use tokio_postgres::tls::{NoTls, NoTlsStream}; use tokio_postgres::tls::{NoTls, NoTlsStream};
use tokio_postgres::types::{Kind, Type}; use tokio_postgres::types::{Kind, Type};
@ -18,9 +18,7 @@ mod runtime;
mod types; mod types;
async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> { async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> {
let socket = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap()) let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
.await
.unwrap();
let config = s.parse::<Config>().unwrap(); let config = s.parse::<Config>().unwrap();
config.connect_raw(socket, NoTls).await config.connect_raw(socket, NoTls).await
} }
@ -303,11 +301,9 @@ async fn simple_query() {
async fn cancel_query_raw() { async fn cancel_query_raw() {
let mut client = connect("user=postgres").await; let mut client = connect("user=postgres").await;
let socket = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap()) let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
.await
.unwrap();
let cancel = client.cancel_query_raw(socket, NoTls); let cancel = client.cancel_query_raw(socket, NoTls);
let cancel = Delay::new(Instant::now() + Duration::from_millis(100)).then(|()| cancel); let cancel = timer::delay(Instant::now() + Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)"); let sleep = client.batch_execute("SELECT pg_sleep(100)");

View File

@ -1,6 +1,6 @@
use futures::{join, FutureExt, TryStreamExt}; use futures::{join, FutureExt, TryStreamExt};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::timer::Delay; use tokio::timer;
use tokio_postgres::error::SqlState; use tokio_postgres::error::SqlState;
use tokio_postgres::{Client, NoTls}; use tokio_postgres::{Client, NoTls};
@ -75,7 +75,7 @@ async fn cancel_query() {
let mut client = connect("host=localhost port=5433 user=postgres").await; let mut client = connect("host=localhost port=5433 user=postgres").await;
let cancel = client.cancel_query(NoTls); let cancel = client.cancel_query(NoTls);
let cancel = Delay::new(Instant::now() + Duration::from_millis(100)).then(|()| cancel); let cancel = timer::delay(Instant::now() + Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)"); let sleep = client.batch_execute("SELECT pg_sleep(100)");