commit
3a26c926f2
@ -22,7 +22,7 @@ version: 2
|
|||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
docker:
|
docker:
|
||||||
- image: rust:1.41.0
|
- image: rust:1.45.0
|
||||||
environment:
|
environment:
|
||||||
RUSTFLAGS: -D warnings
|
RUSTFLAGS: -D warnings
|
||||||
- image: sfackler/rust-postgres-test:6
|
- image: sfackler/rust-postgres-test:6
|
||||||
|
@ -136,10 +136,7 @@ impl<'a> DatParser<'a> {
|
|||||||
fn peek(&mut self, target: char) -> bool {
|
fn peek(&mut self, target: char) -> bool {
|
||||||
self.skip_ws();
|
self.skip_ws();
|
||||||
|
|
||||||
match self.it.peek() {
|
matches!(self.it.peek(), Some((_, ch)) if *ch == target)
|
||||||
Some((_, ch)) if *ch == target => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn eof(&mut self) {
|
fn eof(&mut self) {
|
||||||
|
@ -16,13 +16,12 @@ default = ["runtime"]
|
|||||||
runtime = ["tokio-postgres/runtime"]
|
runtime = ["tokio-postgres/runtime"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.5"
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
native-tls = "0.2"
|
native-tls = "0.2"
|
||||||
tokio = "0.2"
|
tokio = "0.3"
|
||||||
tokio-tls = "0.3"
|
tokio-native-tls = "0.2"
|
||||||
tokio-postgres = { version = "0.5.0", path = "../tokio-postgres", default-features = false }
|
tokio-postgres = { version = "0.5.0", path = "../tokio-postgres", default-features = false }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.3", features = ["full"] }
|
||||||
postgres = { version = "0.17.0", path = "../postgres" }
|
postgres = { version = "0.17.0", path = "../postgres" }
|
||||||
|
@ -48,13 +48,11 @@
|
|||||||
#![doc(html_root_url = "https://docs.rs/postgres-native-tls/0.3")]
|
#![doc(html_root_url = "https://docs.rs/postgres-native-tls/0.3")]
|
||||||
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
|
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::MaybeUninit;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use tokio_postgres::tls;
|
use tokio_postgres::tls;
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
use tokio_postgres::tls::MakeTlsConnect;
|
use tokio_postgres::tls::MakeTlsConnect;
|
||||||
@ -94,7 +92,7 @@ where
|
|||||||
|
|
||||||
/// A `TlsConnect` implementation using the `native-tls` crate.
|
/// A `TlsConnect` implementation using the `native-tls` crate.
|
||||||
pub struct TlsConnector {
|
pub struct TlsConnector {
|
||||||
connector: tokio_tls::TlsConnector,
|
connector: tokio_native_tls::TlsConnector,
|
||||||
domain: String,
|
domain: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,7 +100,7 @@ impl TlsConnector {
|
|||||||
/// Creates a new connector configured to connect to the specified domain.
|
/// Creates a new connector configured to connect to the specified domain.
|
||||||
pub fn new(connector: native_tls::TlsConnector, domain: &str) -> TlsConnector {
|
pub fn new(connector: native_tls::TlsConnector, domain: &str) -> TlsConnector {
|
||||||
TlsConnector {
|
TlsConnector {
|
||||||
connector: tokio_tls::TlsConnector::from(connector),
|
connector: tokio_native_tls::TlsConnector::from(connector),
|
||||||
domain: domain.to_string(),
|
domain: domain.to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -129,34 +127,19 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The stream returned by `TlsConnector`.
|
/// The stream returned by `TlsConnector`.
|
||||||
pub struct TlsStream<S>(tokio_tls::TlsStream<S>);
|
pub struct TlsStream<S>(tokio_native_tls::TlsStream<S>);
|
||||||
|
|
||||||
impl<S> AsyncRead for TlsStream<S>
|
impl<S> AsyncRead for TlsStream<S>
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
|
||||||
self.0.prepare_uninitialized_buffer(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut [u8],
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
Pin::new(&mut self.0).poll_read(cx, buf)
|
Pin::new(&mut self.0).poll_read(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_read_buf<B: BufMut>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
Pin::new(&mut self.0).poll_read_buf(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> AsyncWrite for TlsStream<S>
|
impl<S> AsyncWrite for TlsStream<S>
|
||||||
@ -178,17 +161,6 @@ where
|
|||||||
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Pin::new(&mut self.0).poll_shutdown(cx)
|
Pin::new(&mut self.0).poll_shutdown(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_write_buf<B: Buf>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
Pin::new(&mut self.0).poll_write_buf(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> tls::TlsStream for TlsStream<S>
|
impl<S> tls::TlsStream for TlsStream<S>
|
||||||
@ -196,7 +168,9 @@ where
|
|||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
fn channel_binding(&self) -> ChannelBinding {
|
fn channel_binding(&self) -> ChannelBinding {
|
||||||
// FIXME https://github.com/tokio-rs/tokio/issues/1383
|
match self.0.get_ref().tls_server_end_point().ok().flatten() {
|
||||||
ChannelBinding::none()
|
Some(buf) => ChannelBinding::tls_server_end_point(buf),
|
||||||
|
None => ChannelBinding::none(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,13 +16,12 @@ default = ["runtime"]
|
|||||||
runtime = ["tokio-postgres/runtime"]
|
runtime = ["tokio-postgres/runtime"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.5"
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
openssl = "0.10"
|
openssl = "0.10"
|
||||||
tokio = "0.2"
|
tokio = "0.3"
|
||||||
tokio-openssl = "0.4"
|
tokio-openssl = "0.5"
|
||||||
tokio-postgres = { version = "0.5.0", path = "../tokio-postgres", default-features = false }
|
tokio-postgres = { version = "0.5.0", path = "../tokio-postgres", default-features = false }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.3", features = ["full"] }
|
||||||
postgres = { version = "0.17.0", path = "../postgres" }
|
postgres = { version = "0.17.0", path = "../postgres" }
|
||||||
|
@ -42,7 +42,6 @@
|
|||||||
#![doc(html_root_url = "https://docs.rs/postgres-openssl/0.3")]
|
#![doc(html_root_url = "https://docs.rs/postgres-openssl/0.3")]
|
||||||
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
|
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
use openssl::error::ErrorStack;
|
use openssl::error::ErrorStack;
|
||||||
use openssl::hash::MessageDigest;
|
use openssl::hash::MessageDigest;
|
||||||
@ -53,12 +52,11 @@ use openssl::ssl::{ConnectConfiguration, SslRef};
|
|||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::MaybeUninit;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use tokio_openssl::{HandshakeError, SslStream};
|
use tokio_openssl::{HandshakeError, SslStream};
|
||||||
use tokio_postgres::tls;
|
use tokio_postgres::tls;
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
@ -157,28 +155,13 @@ impl<S> AsyncRead for TlsStream<S>
|
|||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
|
||||||
self.0.prepare_uninitialized_buffer(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut [u8],
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
Pin::new(&mut self.0).poll_read(cx, buf)
|
Pin::new(&mut self.0).poll_read(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_read_buf<B: BufMut>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
Pin::new(&mut self.0).poll_read_buf(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> AsyncWrite for TlsStream<S>
|
impl<S> AsyncWrite for TlsStream<S>
|
||||||
@ -200,17 +183,6 @@ where
|
|||||||
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Pin::new(&mut self.0).poll_shutdown(cx)
|
Pin::new(&mut self.0).poll_shutdown(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_write_buf<B: Buf>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
{
|
|
||||||
Pin::new(&mut self.0).poll_write_buf(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> tls::TlsStream for TlsStream<S>
|
impl<S> tls::TlsStream for TlsStream<S>
|
||||||
|
@ -330,10 +330,7 @@ impl<'a> Parser<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn printable(&mut self) -> io::Result<&'a str> {
|
fn printable(&mut self) -> io::Result<&'a str> {
|
||||||
self.take_while(|c| match c {
|
self.take_while(|c| matches!(c, '\x21'..='\x2b' | '\x2d'..='\x7e'))
|
||||||
'\x21'..='\x2b' | '\x2d'..='\x7e' => true,
|
|
||||||
_ => false,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nonce(&mut self) -> io::Result<&'a str> {
|
fn nonce(&mut self) -> io::Result<&'a str> {
|
||||||
@ -343,10 +340,7 @@ impl<'a> Parser<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn base64(&mut self) -> io::Result<&'a str> {
|
fn base64(&mut self) -> io::Result<&'a str> {
|
||||||
self.take_while(|c| match c {
|
self.take_while(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '/' | '+' | '='))
|
||||||
'a'..='z' | 'A'..='Z' | '0'..='9' | '/' | '+' | '=' => true,
|
|
||||||
_ => false,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn salt(&mut self) -> io::Result<&'a str> {
|
fn salt(&mut self) -> io::Result<&'a str> {
|
||||||
@ -356,10 +350,7 @@ impl<'a> Parser<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn posit_number(&mut self) -> io::Result<u32> {
|
fn posit_number(&mut self) -> io::Result<u32> {
|
||||||
let n = self.take_while(|c| match c {
|
let n = self.take_while(|c| matches!(c, '0'..='9'))?;
|
||||||
'0'..='9' => true,
|
|
||||||
_ => false,
|
|
||||||
})?;
|
|
||||||
n.parse()
|
n.parse()
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
|
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
|
||||||
}
|
}
|
||||||
@ -396,10 +387,7 @@ impl<'a> Parser<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn value(&mut self) -> io::Result<&'a str> {
|
fn value(&mut self) -> io::Result<&'a str> {
|
||||||
self.take_while(|c| match c {
|
self.take_while(|c| matches!(c, '\0' | '=' | ','))
|
||||||
'\0' | '=' | ',' => false,
|
|
||||||
_ => true,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn server_error(&mut self) -> io::Result<Option<&'a str>> {
|
fn server_error(&mut self) -> io::Result<Option<&'a str>> {
|
||||||
|
@ -144,10 +144,7 @@ const NSEC_PER_USEC: u64 = 1_000;
|
|||||||
macro_rules! accepts {
|
macro_rules! accepts {
|
||||||
($($expected:ident),+) => (
|
($($expected:ident),+) => (
|
||||||
fn accepts(ty: &$crate::Type) -> bool {
|
fn accepts(ty: &$crate::Type) -> bool {
|
||||||
match *ty {
|
matches!(*ty, $($crate::Type::$expected)|+)
|
||||||
$($crate::Type::$expected)|+ => true,
|
|
||||||
_ => false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -75,10 +75,7 @@ impl<'a, T: FromSql<'a>> FromSql<'a> for Timestamp<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn accepts(ty: &Type) -> bool {
|
fn accepts(ty: &Type) -> bool {
|
||||||
match *ty {
|
matches!(*ty, Type::TIMESTAMP | Type::TIMESTAMPTZ if T::accepts(ty))
|
||||||
Type::TIMESTAMP | Type::TIMESTAMPTZ if T::accepts(ty) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,10 +96,7 @@ impl<T: ToSql> ToSql for Timestamp<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn accepts(ty: &Type) -> bool {
|
fn accepts(ty: &Type) -> bool {
|
||||||
match *ty {
|
matches!(*ty, Type::TIMESTAMP | Type::TIMESTAMPTZ if T::accepts(ty))
|
||||||
Type::TIMESTAMP | Type::TIMESTAMPTZ if T::accepts(ty) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
to_sql_checked!();
|
to_sql_checked!();
|
||||||
|
@ -36,7 +36,7 @@ fallible-iterator = "0.2"
|
|||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tokio-postgres = { version = "0.5.5", path = "../tokio-postgres" }
|
tokio-postgres = { version = "0.5.5", path = "../tokio-postgres" }
|
||||||
|
|
||||||
tokio = { version = "0.2", features = ["rt-core", "time"] }
|
tokio = { version = "0.3", features = ["rt", "time"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -26,9 +26,8 @@ impl CancelToken {
|
|||||||
where
|
where
|
||||||
T: MakeTlsConnect<Socket>,
|
T: MakeTlsConnect<Socket>,
|
||||||
{
|
{
|
||||||
runtime::Builder::new()
|
runtime::Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.basic_scheduler()
|
|
||||||
.build()
|
.build()
|
||||||
.unwrap() // FIXME don't unwrap
|
.unwrap() // FIXME don't unwrap
|
||||||
.block_on(self.0.cancel_query(tls))
|
.block_on(self.0.cancel_query(tls))
|
||||||
|
@ -336,9 +336,8 @@ impl Config {
|
|||||||
T::Stream: Send,
|
T::Stream: Send,
|
||||||
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
{
|
{
|
||||||
let mut runtime = runtime::Builder::new()
|
let runtime = runtime::Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.basic_scheduler()
|
|
||||||
.build()
|
.build()
|
||||||
.unwrap(); // FIXME don't unwrap
|
.unwrap(); // FIXME don't unwrap
|
||||||
|
|
||||||
|
@ -45,7 +45,8 @@ impl Connection {
|
|||||||
where
|
where
|
||||||
F: FnOnce() -> T,
|
F: FnOnce() -> T,
|
||||||
{
|
{
|
||||||
self.runtime.enter(f)
|
let _guard = self.runtime.enter();
|
||||||
|
f()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn block_on<F, T>(&mut self, future: F) -> Result<T, Error>
|
pub fn block_on<F, T>(&mut self, future: F) -> Result<T, Error>
|
||||||
|
@ -6,7 +6,7 @@ use fallible_iterator::FallibleIterator;
|
|||||||
use futures::{ready, FutureExt};
|
use futures::{ready, FutureExt};
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::time::{self, Delay, Instant};
|
use tokio::time::{self, Instant, Sleep};
|
||||||
|
|
||||||
/// Notifications from a PostgreSQL backend.
|
/// Notifications from a PostgreSQL backend.
|
||||||
pub struct Notifications<'a> {
|
pub struct Notifications<'a> {
|
||||||
@ -64,7 +64,7 @@ impl<'a> Notifications<'a> {
|
|||||||
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
|
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
|
||||||
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
|
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
|
||||||
TimeoutIter {
|
TimeoutIter {
|
||||||
delay: self.connection.enter(|| time::delay_for(timeout)),
|
delay: self.connection.enter(|| time::sleep(timeout)),
|
||||||
timeout,
|
timeout,
|
||||||
connection: self.connection.as_ref(),
|
connection: self.connection.as_ref(),
|
||||||
}
|
}
|
||||||
@ -124,7 +124,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> {
|
|||||||
/// A time-limited blocking iterator over pending notifications.
|
/// A time-limited blocking iterator over pending notifications.
|
||||||
pub struct TimeoutIter<'a> {
|
pub struct TimeoutIter<'a> {
|
||||||
connection: ConnectionRef<'a>,
|
connection: ConnectionRef<'a>,
|
||||||
delay: Delay,
|
delay: Sleep,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ circle-ci = { repository = "sfackler/rust-postgres" }
|
|||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["runtime"]
|
default = ["runtime"]
|
||||||
runtime = ["tokio/dns", "tokio/net", "tokio/time"]
|
runtime = ["tokio/net", "tokio/time"]
|
||||||
|
|
||||||
with-bit-vec-0_6 = ["postgres-types/with-bit-vec-0_6"]
|
with-bit-vec-0_6 = ["postgres-types/with-bit-vec-0_6"]
|
||||||
with-chrono-0_4 = ["postgres-types/with-chrono-0_4"]
|
with-chrono-0_4 = ["postgres-types/with-chrono-0_4"]
|
||||||
@ -49,11 +49,11 @@ pin-project-lite = "0.1"
|
|||||||
phf = "0.8"
|
phf = "0.8"
|
||||||
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
|
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
|
||||||
postgres-types = { version = "0.1.2", path = "../postgres-types" }
|
postgres-types = { version = "0.1.2", path = "../postgres-types" }
|
||||||
tokio = { version = "0.2", features = ["io-util"] }
|
tokio = { version = "0.3", features = ["io-util"] }
|
||||||
tokio-util = { version = "0.3", features = ["codec"] }
|
tokio-util = { version = "0.4", features = ["codec"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio = { version = "0.3", features = ["full"] }
|
||||||
env_logger = "0.7"
|
env_logger = "0.7"
|
||||||
criterion = "0.3"
|
criterion = "0.3"
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ use tokio::runtime::Runtime;
|
|||||||
use tokio_postgres::{Client, NoTls};
|
use tokio_postgres::{Client, NoTls};
|
||||||
|
|
||||||
fn setup() -> (Client, Runtime) {
|
fn setup() -> (Client, Runtime) {
|
||||||
let mut runtime = Runtime::new().unwrap();
|
let runtime = Runtime::new().unwrap();
|
||||||
let (client, conn) = runtime
|
let (client, conn) = runtime
|
||||||
.block_on(tokio_postgres::connect(
|
.block_on(tokio_postgres::connect(
|
||||||
"host=localhost port=5433 user=postgres",
|
"host=localhost port=5433 user=postgres",
|
||||||
@ -19,7 +19,7 @@ fn setup() -> (Client, Runtime) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn query_prepared(c: &mut Criterion) {
|
fn query_prepared(c: &mut Criterion) {
|
||||||
let (client, mut runtime) = setup();
|
let (client, runtime) = setup();
|
||||||
let statement = runtime.block_on(client.prepare("SELECT $1::INT8")).unwrap();
|
let statement = runtime.block_on(client.prepare("SELECT $1::INT8")).unwrap();
|
||||||
c.bench_function("runtime_block_on", move |b| {
|
c.bench_function("runtime_block_on", move |b| {
|
||||||
b.iter(|| {
|
b.iter(|| {
|
||||||
@ -29,13 +29,13 @@ fn query_prepared(c: &mut Criterion) {
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
let (client, mut runtime) = setup();
|
let (client, runtime) = setup();
|
||||||
let statement = runtime.block_on(client.prepare("SELECT $1::INT8")).unwrap();
|
let statement = runtime.block_on(client.prepare("SELECT $1::INT8")).unwrap();
|
||||||
c.bench_function("executor_block_on", move |b| {
|
c.bench_function("executor_block_on", move |b| {
|
||||||
b.iter(|| executor::block_on(client.query(&statement, &[&1i64])).unwrap())
|
b.iter(|| executor::block_on(client.query(&statement, &[&1i64])).unwrap())
|
||||||
});
|
});
|
||||||
|
|
||||||
let (client, mut runtime) = setup();
|
let (client, runtime) = setup();
|
||||||
let client = Arc::new(client);
|
let client = Arc::new(client);
|
||||||
let statement = runtime.block_on(client.prepare("SELECT $1::INT8")).unwrap();
|
let statement = runtime.block_on(client.prepare("SELECT $1::INT8")).unwrap();
|
||||||
c.bench_function("spawned", move |b| {
|
c.bench_function("spawned", move |b| {
|
||||||
|
@ -12,19 +12,15 @@ pub(crate) async fn connect_socket(
|
|||||||
host: &Host,
|
host: &Host,
|
||||||
port: u16,
|
port: u16,
|
||||||
connect_timeout: Option<Duration>,
|
connect_timeout: Option<Duration>,
|
||||||
keepalives: bool,
|
_keepalives: bool,
|
||||||
keepalives_idle: Duration,
|
_keepalives_idle: Duration,
|
||||||
) -> Result<Socket, Error> {
|
) -> Result<Socket, Error> {
|
||||||
match host {
|
match host {
|
||||||
Host::Tcp(host) => {
|
Host::Tcp(host) => {
|
||||||
let socket =
|
let socket =
|
||||||
connect_with_timeout(TcpStream::connect((&**host, port)), connect_timeout).await?;
|
connect_with_timeout(TcpStream::connect((&**host, port)), connect_timeout).await?;
|
||||||
socket.set_nodelay(true).map_err(Error::connect)?;
|
socket.set_nodelay(true).map_err(Error::connect)?;
|
||||||
if keepalives {
|
// FIXME support keepalives?
|
||||||
socket
|
|
||||||
.set_keepalive(Some(keepalives_idle))
|
|
||||||
.map_err(Error::connect)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Socket::new_tcp(socket))
|
Ok(Socket::new_tcp(socket))
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,8 @@
|
|||||||
use crate::tls::{ChannelBinding, TlsStream};
|
use crate::tls::{ChannelBinding, TlsStream};
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::MaybeUninit;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
|
||||||
pub enum MaybeTlsStream<S, T> {
|
pub enum MaybeTlsStream<S, T> {
|
||||||
Raw(S),
|
Raw(S),
|
||||||
@ -16,38 +14,16 @@ where
|
|||||||
S: AsyncRead + Unpin,
|
S: AsyncRead + Unpin,
|
||||||
T: AsyncRead + Unpin,
|
T: AsyncRead + Unpin,
|
||||||
{
|
{
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
|
||||||
match self {
|
|
||||||
MaybeTlsStream::Raw(s) => s.prepare_uninitialized_buffer(buf),
|
|
||||||
MaybeTlsStream::Tls(s) => s.prepare_uninitialized_buffer(buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut [u8],
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
match &mut *self {
|
match &mut *self {
|
||||||
MaybeTlsStream::Raw(s) => Pin::new(s).poll_read(cx, buf),
|
MaybeTlsStream::Raw(s) => Pin::new(s).poll_read(cx, buf),
|
||||||
MaybeTlsStream::Tls(s) => Pin::new(s).poll_read(cx, buf),
|
MaybeTlsStream::Tls(s) => Pin::new(s).poll_read(cx, buf),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_read_buf<B>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
B: BufMut,
|
|
||||||
{
|
|
||||||
match &mut *self {
|
|
||||||
MaybeTlsStream::Raw(s) => Pin::new(s).poll_read_buf(cx, buf),
|
|
||||||
MaybeTlsStream::Tls(s) => Pin::new(s).poll_read_buf(cx, buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, T> AsyncWrite for MaybeTlsStream<S, T>
|
impl<S, T> AsyncWrite for MaybeTlsStream<S, T>
|
||||||
@ -79,21 +55,6 @@ where
|
|||||||
MaybeTlsStream::Tls(s) => Pin::new(s).poll_shutdown(cx),
|
MaybeTlsStream::Tls(s) => Pin::new(s).poll_shutdown(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_write_buf<B>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
B: Buf,
|
|
||||||
{
|
|
||||||
match &mut *self {
|
|
||||||
MaybeTlsStream::Raw(s) => Pin::new(s).poll_write_buf(cx, buf),
|
|
||||||
MaybeTlsStream::Tls(s) => Pin::new(s).poll_write_buf(cx, buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, T> TlsStream for MaybeTlsStream<S, T>
|
impl<S, T> TlsStream for MaybeTlsStream<S, T>
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
use bytes::{Buf, BufMut};
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::MaybeUninit;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
@ -33,41 +31,17 @@ impl Socket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncRead for Socket {
|
impl AsyncRead for Socket {
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
|
||||||
match &self.0 {
|
|
||||||
Inner::Tcp(s) => s.prepare_uninitialized_buffer(buf),
|
|
||||||
#[cfg(unix)]
|
|
||||||
Inner::Unix(s) => s.prepare_uninitialized_buffer(buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut [u8],
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
match &mut self.0 {
|
match &mut self.0 {
|
||||||
Inner::Tcp(s) => Pin::new(s).poll_read(cx, buf),
|
Inner::Tcp(s) => Pin::new(s).poll_read(cx, buf),
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
Inner::Unix(s) => Pin::new(s).poll_read(cx, buf),
|
Inner::Unix(s) => Pin::new(s).poll_read(cx, buf),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_read_buf<B>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
B: BufMut,
|
|
||||||
{
|
|
||||||
match &mut self.0 {
|
|
||||||
Inner::Tcp(s) => Pin::new(s).poll_read_buf(cx, buf),
|
|
||||||
#[cfg(unix)]
|
|
||||||
Inner::Unix(s) => Pin::new(s).poll_read_buf(cx, buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncWrite for Socket {
|
impl AsyncWrite for Socket {
|
||||||
@ -98,20 +72,4 @@ impl AsyncWrite for Socket {
|
|||||||
Inner::Unix(s) => Pin::new(s).poll_shutdown(cx),
|
Inner::Unix(s) => Pin::new(s).poll_shutdown(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_write_buf<B>(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<io::Result<usize>>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
B: Buf,
|
|
||||||
{
|
|
||||||
match &mut self.0 {
|
|
||||||
Inner::Tcp(s) => Pin::new(s).poll_write_buf(cx, buf),
|
|
||||||
#[cfg(unix)]
|
|
||||||
Inner::Unix(s) => Pin::new(s).poll_write_buf(cx, buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ use std::future::Future;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::{fmt, io};
|
use std::{fmt, io};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
|
||||||
pub(crate) mod private {
|
pub(crate) mod private {
|
||||||
pub struct ForcePrivateApi;
|
pub struct ForcePrivateApi;
|
||||||
@ -125,8 +125,8 @@ impl AsyncRead for NoTlsStream {
|
|||||||
fn poll_read(
|
fn poll_read(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_: &mut Context<'_>,
|
_: &mut Context<'_>,
|
||||||
_: &mut [u8],
|
_: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<()>> {
|
||||||
match *self {}
|
match *self {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -308,7 +308,7 @@ async fn cancel_query_raw() {
|
|||||||
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
|
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
|
||||||
let cancel_token = client.cancel_token();
|
let cancel_token = client.cancel_token();
|
||||||
let cancel = cancel_token.cancel_query_raw(socket, NoTls);
|
let cancel = cancel_token.cancel_query_raw(socket, NoTls);
|
||||||
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
|
let cancel = time::sleep(Duration::from_millis(100)).then(|()| cancel);
|
||||||
|
|
||||||
let sleep = client.batch_execute("SELECT pg_sleep(100)");
|
let sleep = client.batch_execute("SELECT pg_sleep(100)");
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ async fn cancel_query() {
|
|||||||
|
|
||||||
let cancel_token = client.cancel_token();
|
let cancel_token = client.cancel_token();
|
||||||
let cancel = cancel_token.cancel_query(NoTls);
|
let cancel = cancel_token.cancel_query(NoTls);
|
||||||
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
|
let cancel = time::sleep(Duration::from_millis(100)).then(|()| cancel);
|
||||||
|
|
||||||
let sleep = client.batch_execute("SELECT pg_sleep(100)");
|
let sleep = client.batch_execute("SELECT pg_sleep(100)");
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user