deps: upgrade to tokio v1.0 ecosystem

This commit is contained in:
Nikhil Benesch 2020-12-23 21:03:15 -05:00
parent 07aa69febf
commit f1729e4636
12 changed files with 63 additions and 30 deletions

View File

@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies]
futures = "0.3"
native-tls = "0.2"
tokio = "0.3"
tokio-native-tls = "0.2"
tokio = "1.0"
tokio-native-tls = "0.3"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false }
[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
postgres = { version = "0.18.0", path = "../postgres" }

View File

@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies]
futures = "0.3"
openssl = "0.10"
tokio = "0.3"
tokio-openssl = "0.5"
tokio = "1.0"
tokio-openssl = "0.6"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false }
[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
postgres = { version = "0.18.0", path = "../postgres" }

View File

@ -48,8 +48,10 @@ use openssl::hash::MessageDigest;
use openssl::nid::Nid;
#[cfg(feature = "runtime")]
use openssl::ssl::SslConnector;
use openssl::ssl::{ConnectConfiguration, SslRef};
use std::fmt::Debug;
use openssl::ssl::{self, ConnectConfiguration, SslRef};
use openssl::x509::X509VerifyResult;
use std::error::Error;
use std::fmt::{self, Debug};
use std::future::Future;
use std::io;
use std::pin::Pin;
@ -57,7 +59,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_openssl::{HandshakeError, SslStream};
use tokio_openssl::SslStream;
use tokio_postgres::tls;
#[cfg(feature = "runtime")]
use tokio_postgres::tls::MakeTlsConnect;
@ -131,23 +133,55 @@ impl TlsConnector {
impl<S> TlsConnect<S> for TlsConnector
where
S: AsyncRead + AsyncWrite + Unpin + Debug + 'static + Sync + Send,
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Stream = TlsStream<S>;
type Error = HandshakeError<S>;
type Error = Box<dyn Error + Send + Sync>;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, HandshakeError<S>>> + Send>>;
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, Self::Error>> + Send>>;
fn connect(self, stream: S) -> Self::Future {
let future = async move {
let stream = tokio_openssl::connect(self.ssl, &self.domain, stream).await?;
Ok(TlsStream(stream))
let ssl = self.ssl.into_ssl(&self.domain)?;
let mut stream = SslStream::new(ssl, stream)?;
match Pin::new(&mut stream).connect().await {
Ok(()) => Ok(TlsStream(stream)),
Err(error) => Err(Box::new(ConnectError {
error,
verify_result: stream.ssl().verify_result(),
}) as _),
}
};
Box::pin(future)
}
}
#[derive(Debug)]
struct ConnectError {
error: ssl::Error,
verify_result: X509VerifyResult,
}
impl fmt::Display for ConnectError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.error, fmt)?;
if self.verify_result != X509VerifyResult::OK {
fmt.write_str(": ")?;
fmt::Display::fmt(&self.verify_result, fmt)?;
}
Ok(())
}
}
impl Error for ConnectError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.error)
}
}
/// The stream returned by `TlsConnector`.
pub struct TlsStream<S>(SslStream<S>);

View File

@ -11,7 +11,7 @@ readme = "../README.md"
[dependencies]
base64 = "0.13"
byteorder = "1.0"
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
hmac = "0.10"
md5 = "0.7"

View File

@ -22,7 +22,7 @@ with-uuid-0_8 = ["uuid-08"]
with-time-0_2 = ["time-02"]
[dependencies]
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-derive = { version = "0.4.0", optional = true, path = "../postgres-derive" }

View File

@ -1,5 +1,4 @@
use crate::{FromSql, IsNull, ToSql, Type};
use bytes::buf::BufMutExt;
use bytes::{BufMut, BytesMut};
use serde_1::{Deserialize, Serialize};
use serde_json_1::Value;

View File

@ -31,12 +31,12 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
with-time-0_2 = ["tokio-postgres/with-time-0_2"]
[dependencies]
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres" }
tokio = { version = "0.3", features = ["rt", "time"] }
tokio = { version = "1.0", features = ["rt", "time"] }
log = "0.4"
[dev-dependencies]

View File

@ -46,7 +46,7 @@ impl BufRead for CopyOutReader<'_> {
};
}
Ok(self.cur.bytes())
Ok(&self.cur)
}
fn consume(&mut self, amt: usize) {

View File

@ -4,6 +4,7 @@ use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures::{ready, FutureExt};
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};
@ -64,7 +65,7 @@ impl<'a> Notifications<'a> {
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
TimeoutIter {
delay: self.connection.enter(|| time::sleep(timeout)),
delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
timeout,
connection: self.connection.as_ref(),
}
@ -124,7 +125,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> {
/// A time-limited blocking iterator over pending notifications.
pub struct TimeoutIter<'a> {
connection: ConnectionRef<'a>,
delay: Sleep,
delay: Pin<Box<Sleep>>,
timeout: Duration,
}
@ -134,7 +135,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
if let Some(notification) = self.connection.notifications_mut().pop_front() {
self.delay.reset(Instant::now() + self.timeout);
self.delay.as_mut().reset(Instant::now() + self.timeout);
return Ok(Some(notification));
}
@ -143,7 +144,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
self.connection.poll_block_on(|cx, notifications, done| {
match notifications.pop_front() {
Some(notification) => {
delay.reset(Instant::now() + timeout);
delay.as_mut().reset(Instant::now() + timeout);
return Poll::Ready(Ok(Some(notification)));
}
None if done => return Poll::Ready(Ok(None)),

View File

@ -38,7 +38,7 @@ with-time-0_2 = ["postgres-types/with-time-0_2"]
[dependencies]
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
byteorder = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
@ -50,11 +50,11 @@ phf = "0.8"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-types = { version = "0.1.2", path = "../postgres-types" }
socket2 = "0.3"
tokio = { version = "0.3", features = ["io-util"] }
tokio-util = { version = "0.4", features = ["codec"] }
tokio = { version = "1.0", features = ["io-util"] }
tokio-util = { version = "0.6", features = ["codec"] }
[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
env_logger = "0.8"
criterion = "0.3"

View File

@ -153,7 +153,7 @@ impl Stream for BinaryCopyOutStream {
Some(header) => header.has_oids,
None => {
check_remaining(&chunk, HEADER_LEN)?;
if &chunk.bytes()[..MAGIC.len()] != MAGIC {
if !chunk.chunk().starts_with(MAGIC) {
return Poll::Ready(Some(Err(Error::parse(io::Error::new(
io::ErrorKind::InvalidData,
"invalid magic value",

View File

@ -2,7 +2,6 @@ use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use bytes::buf::BufExt;
use bytes::{Buf, BufMut, BytesMut};
use futures::channel::mpsc;
use futures::future;