Upgrade to tokio 0.2/futures 0.3

This commit is contained in:
Steven Fackler 2019-11-26 16:32:36 -08:00
parent 5517719b07
commit 12c2ef79b6
39 changed files with 210 additions and 258 deletions

View File

@ -16,12 +16,13 @@ default = ["runtime"]
runtime = ["tokio-postgres/runtime"]
[dependencies]
bytes = "0.5"
futures-preview = "=0.3.0-alpha.19"
native-tls = "0.2"
tokio-io = "=0.2.0-alpha.6"
tokio-tls = "=0.3.0-alpha.6"
tokio = "0.2"
tokio-tls = "0.3"
tokio-postgres = { version = "=0.5.0-alpha.1", path = "../tokio-postgres", default-features = false }
[dev-dependencies]
tokio = "=0.2.0-alpha.6"
tokio = { version = "0.2", features = ["full"] }
postgres = { version = "=0.17.0-alpha.1", path = "../postgres" }

View File

@ -48,16 +48,17 @@
#![doc(html_root_url = "https://docs.rs/postgres-native-tls/0.3")]
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
use futures::task::Context;
use futures::Poll;
use std::task::{Context, Poll};
use std::future::Future;
use std::io;
use std::pin::Pin;
use tokio_io::{AsyncRead, AsyncWrite, Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
use bytes::{Buf, BufMut};
use tokio_postgres::tls;
#[cfg(feature = "runtime")]
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::tls::{ChannelBinding, TlsConnect};
use std::mem::MaybeUninit;
#[cfg(test)]
mod test;
@ -134,7 +135,7 @@ impl<S> AsyncRead for TlsStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
self.0.prepare_uninitialized_buffer(buf)
}

View File

@ -16,12 +16,13 @@ default = ["runtime"]
runtime = ["tokio-postgres/runtime"]
[dependencies]
futures-preview = "=0.3.0-alpha.19"
bytes = "0.5"
futures = "0.3"
openssl = "0.10"
tokio-io = "=0.2.0-alpha.6"
tokio-openssl = "=0.4.0-alpha.6"
tokio = "0.2"
tokio-openssl = "0.4"
tokio-postgres = { version = "=0.5.0-alpha.1", path = "../tokio-postgres", default-features = false }
[dev-dependencies]
tokio = "=0.2.0-alpha.6"
tokio = { version = "0.2", features = ["full"] }
postgres = { version = "=0.17.0-alpha.1", path = "../postgres" }

View File

@ -42,8 +42,7 @@
#![doc(html_root_url = "https://docs.rs/postgres-openssl/0.3")]
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
use futures::task::Context;
use futures::Poll;
use std::task::{Poll, Context};
#[cfg(feature = "runtime")]
use openssl::error::ErrorStack;
use openssl::hash::MessageDigest;
@ -57,12 +56,14 @@ use std::io;
use std::pin::Pin;
#[cfg(feature = "runtime")]
use std::sync::Arc;
use tokio_io::{AsyncRead, AsyncWrite, Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
use bytes::{Buf, BufMut};
use tokio_openssl::{HandshakeError, SslStream};
use tokio_postgres::tls;
#[cfg(feature = "runtime")]
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::tls::{ChannelBinding, TlsConnect};
use std::mem::MaybeUninit;
#[cfg(test)]
mod test;
@ -156,7 +157,7 @@ impl<S> AsyncRead for TlsStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
self.0.prepare_uninitialized_buffer(buf)
}

View File

@ -11,7 +11,7 @@ readme = "../README.md"
[dependencies]
base64 = "0.10"
byteorder = "1.0"
bytes = "0.4"
bytes = "0.5"
fallible-iterator = "0.2"
generic-array = "0.13"
hmac = "0.7"

View File

@ -31,37 +31,13 @@ pub enum IsNull {
No,
}
// https://github.com/tokio-rs/bytes/issues/170
struct B<'a>(&'a mut BytesMut);
impl<'a> BufMut for B<'a> {
#[inline]
fn remaining_mut(&self) -> usize {
usize::max_value() - self.0.len()
}
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
self.0.advance_mut(cnt);
}
#[inline]
unsafe fn bytes_mut(&mut self) -> &mut [u8] {
if !self.0.has_remaining_mut() {
self.0.reserve(64);
}
self.0.bytes_mut()
}
}
fn write_nullable<F, E>(serializer: F, buf: &mut BytesMut) -> Result<(), E>
where
F: FnOnce(&mut BytesMut) -> Result<IsNull, E>,
E: From<io::Error>,
{
let base = buf.len();
B(buf).put_i32_be(0);
buf.put_i32(0);
let size = match serializer(buf)? {
IsNull::No => i32::from_usize(buf.len() - base - 4)?,
IsNull::Yes => -1,

View File

@ -301,7 +301,7 @@ impl Buffer {
Some(pos) => {
let start = self.idx;
let end = start + pos;
let cstr = self.bytes.slice(start, end);
let cstr = self.bytes.slice(start..end);
self.idx = end + 1;
Ok(cstr)
}
@ -314,7 +314,7 @@ impl Buffer {
#[inline]
fn read_all(&mut self) -> Bytes {
let buf = self.bytes.slice_from(self.idx);
let buf = self.bytes.slice(self.idx..);
self.idx = self.bytes.len();
buf
}

View File

@ -2,13 +2,13 @@
#![allow(missing_docs)]
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut, IntoBuf};
use bytes::{Buf, BufMut, BytesMut};
use std::convert::TryFrom;
use std::error::Error;
use std::io;
use std::marker;
use crate::{write_nullable, FromUsize, IsNull, Oid, B};
use crate::{write_nullable, FromUsize, IsNull, Oid};
#[inline]
fn write_body<F, E>(buf: &mut BytesMut, f: F) -> Result<(), E>
@ -61,7 +61,7 @@ where
F: FnMut(T, &mut BytesMut) -> Result<IsNull, Box<dyn Error + marker::Sync + Send>>,
K: IntoIterator<Item = i16>,
{
B(buf).put_u8(b'B');
buf.put_u8(b'B');
write_body(buf, |buf| {
write_cstr(portal.as_bytes(), buf)?;
@ -69,7 +69,7 @@ where
write_counted(
formats,
|f, buf| {
B(buf).put_i16_be(f);
buf.put_i16(f);
Ok::<_, io::Error>(())
},
buf,
@ -82,7 +82,7 @@ where
write_counted(
result_formats,
|f, buf| {
B(buf).put_i16_be(f);
buf.put_i16(f);
Ok::<_, io::Error>(())
},
buf,
@ -115,9 +115,9 @@ where
#[inline]
pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut BytesMut) {
write_body(buf, |buf| {
B(buf).put_i32_be(80_877_102);
B(buf).put_i32_be(process_id);
B(buf).put_i32_be(secret_key);
buf.put_i32(80_877_102);
buf.put_i32(process_id);
buf.put_i32(secret_key);
Ok::<_, io::Error>(())
})
.unwrap();
@ -125,9 +125,9 @@ pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut BytesMut) {
#[inline]
pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'C');
buf.put_u8(b'C');
write_body(buf, |buf| {
B(buf).put_u8(variant);
buf.put_u8(variant);
write_cstr(name.as_bytes(), buf)
})
}
@ -141,12 +141,7 @@ impl<T> CopyData<T>
where
T: Buf,
{
pub fn new<U>(buf: U) -> io::Result<CopyData<T>>
where
U: IntoBuf<Buf = T>,
{
let buf = buf.into_buf();
pub fn new(buf: T) -> io::Result<CopyData<T>> {
let len = buf
.remaining()
.checked_add(4)
@ -159,39 +154,39 @@ where
}
pub fn write(self, out: &mut BytesMut) {
B(out).put_u8(b'd');
B(out).put_i32_be(self.len);
B(out).put(self.buf);
out.put_u8(b'd');
out.put_i32(self.len);
out.put(self.buf);
}
}
#[inline]
pub fn copy_done(buf: &mut BytesMut) {
B(buf).put_u8(b'c');
buf.put_u8(b'c');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
#[inline]
pub fn copy_fail(message: &str, buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'f');
buf.put_u8(b'f');
write_body(buf, |buf| write_cstr(message.as_bytes(), buf))
}
#[inline]
pub fn describe(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'D');
buf.put_u8(b'D');
write_body(buf, |buf| {
B(buf).put_u8(variant);
buf.put_u8(variant);
write_cstr(name.as_bytes(), buf)
})
}
#[inline]
pub fn execute(portal: &str, max_rows: i32, buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'E');
buf.put_u8(b'E');
write_body(buf, |buf| {
write_cstr(portal.as_bytes(), buf)?;
B(buf).put_i32_be(max_rows);
buf.put_i32(max_rows);
Ok(())
})
}
@ -201,14 +196,14 @@ pub fn parse<I>(name: &str, query: &str, param_types: I, buf: &mut BytesMut) ->
where
I: IntoIterator<Item = Oid>,
{
B(buf).put_u8(b'P');
buf.put_u8(b'P');
write_body(buf, |buf| {
write_cstr(name.as_bytes(), buf)?;
write_cstr(query.as_bytes(), buf)?;
write_counted(
param_types,
|t, buf| {
B(buf).put_u32_be(t);
buf.put_u32(t);
Ok::<_, io::Error>(())
},
buf,
@ -219,33 +214,33 @@ where
#[inline]
pub fn password_message(password: &[u8], buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'p');
buf.put_u8(b'p');
write_body(buf, |buf| write_cstr(password, buf))
}
#[inline]
pub fn query(query: &str, buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'Q');
buf.put_u8(b'Q');
write_body(buf, |buf| write_cstr(query.as_bytes(), buf))
}
#[inline]
pub fn sasl_initial_response(mechanism: &str, data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'p');
buf.put_u8(b'p');
write_body(buf, |buf| {
write_cstr(mechanism.as_bytes(), buf)?;
let len = i32::from_usize(data.len())?;
B(buf).put_i32_be(len);
B(buf).put_slice(data);
buf.put_i32(len);
buf.put_slice(data);
Ok(())
})
}
#[inline]
pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
B(buf).put_u8(b'p');
buf.put_u8(b'p');
write_body(buf, |buf| {
B(buf).put_slice(data);
buf.put_slice(data);
Ok(())
})
}
@ -253,7 +248,7 @@ pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
#[inline]
pub fn ssl_request(buf: &mut BytesMut) {
write_body(buf, |buf| {
B(buf).put_i32_be(80_877_103);
buf.put_i32(80_877_103);
Ok::<_, io::Error>(())
})
.unwrap();
@ -265,25 +260,25 @@ where
I: IntoIterator<Item = (&'a str, &'a str)>,
{
write_body(buf, |buf| {
B(buf).put_i32_be(196_608);
buf.put_i32(196_608);
for (key, value) in parameters {
write_cstr(key.as_bytes(), buf)?;
write_cstr(value.as_bytes(), buf)?;
}
B(buf).put_u8(0);
buf.put_u8(0);
Ok(())
})
}
#[inline]
pub fn sync(buf: &mut BytesMut) {
B(buf).put_u8(b'S');
buf.put_u8(b'S');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
#[inline]
pub fn terminate(buf: &mut BytesMut) {
B(buf).put_u8(b'X');
buf.put_u8(b'X');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
@ -295,7 +290,7 @@ fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
"string contains embedded null",
));
}
B(buf).put_slice(s);
B(buf).put_u8(0);
buf.put_slice(s);
buf.put_u8(0);
Ok(())
}

View File

@ -8,7 +8,7 @@ use std::io::Read;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str;
use crate::{write_nullable, FromUsize, IsNull, Oid, B};
use crate::{write_nullable, FromUsize, IsNull, Oid};
#[cfg(test)]
mod test;
@ -25,7 +25,7 @@ const PGSQL_AF_INET6: u8 = 3;
/// Serializes a `BOOL` value.
#[inline]
pub fn bool_to_sql(v: bool, buf: &mut BytesMut) {
B(buf).put_u8(v as u8);
buf.put_u8(v as u8);
}
/// Deserializes a `BOOL` value.
@ -41,7 +41,7 @@ pub fn bool_from_sql(buf: &[u8]) -> Result<bool, StdBox<dyn Error + Sync + Send>
/// Serializes a `BYTEA` value.
#[inline]
pub fn bytea_to_sql(v: &[u8], buf: &mut BytesMut) {
B(buf).put_slice(v);
buf.put_slice(v);
}
/// Deserializes a `BYTEA value.
@ -53,7 +53,7 @@ pub fn bytea_from_sql(buf: &[u8]) -> &[u8] {
/// Serializes a `TEXT`, `VARCHAR`, `CHAR(n)`, `NAME`, or `CITEXT` value.
#[inline]
pub fn text_to_sql(v: &str, buf: &mut BytesMut) {
B(buf).put_slice(v.as_bytes());
buf.put_slice(v.as_bytes());
}
/// Deserializes a `TEXT`, `VARCHAR`, `CHAR(n)`, `NAME`, or `CITEXT` value.
@ -65,7 +65,7 @@ pub fn text_from_sql(buf: &[u8]) -> Result<&str, StdBox<dyn Error + Sync + Send>
/// Serializes a `"char"` value.
#[inline]
pub fn char_to_sql(v: i8, buf: &mut BytesMut) {
B(buf).put_i8(v);
buf.put_i8(v);
}
/// Deserializes a `"char"` value.
@ -81,7 +81,7 @@ pub fn char_from_sql(mut buf: &[u8]) -> Result<i8, StdBox<dyn Error + Sync + Sen
/// Serializes an `INT2` value.
#[inline]
pub fn int2_to_sql(v: i16, buf: &mut BytesMut) {
B(buf).put_i16_be(v);
buf.put_i16(v);
}
/// Deserializes an `INT2` value.
@ -97,7 +97,7 @@ pub fn int2_from_sql(mut buf: &[u8]) -> Result<i16, StdBox<dyn Error + Sync + Se
/// Serializes an `INT4` value.
#[inline]
pub fn int4_to_sql(v: i32, buf: &mut BytesMut) {
B(buf).put_i32_be(v);
buf.put_i32(v);
}
/// Deserializes an `INT4` value.
@ -113,7 +113,7 @@ pub fn int4_from_sql(mut buf: &[u8]) -> Result<i32, StdBox<dyn Error + Sync + Se
/// Serializes an `OID` value.
#[inline]
pub fn oid_to_sql(v: Oid, buf: &mut BytesMut) {
B(buf).put_u32_be(v);
buf.put_u32(v);
}
/// Deserializes an `OID` value.
@ -129,7 +129,7 @@ pub fn oid_from_sql(mut buf: &[u8]) -> Result<Oid, StdBox<dyn Error + Sync + Sen
/// Serializes an `INT8` value.
#[inline]
pub fn int8_to_sql(v: i64, buf: &mut BytesMut) {
B(buf).put_i64_be(v);
buf.put_i64(v);
}
/// Deserializes an `INT8` value.
@ -145,7 +145,7 @@ pub fn int8_from_sql(mut buf: &[u8]) -> Result<i64, StdBox<dyn Error + Sync + Se
/// Serializes a `FLOAT4` value.
#[inline]
pub fn float4_to_sql(v: f32, buf: &mut BytesMut) {
B(buf).put_f32_be(v);
buf.put_f32(v);
}
/// Deserializes a `FLOAT4` value.
@ -161,7 +161,7 @@ pub fn float4_from_sql(mut buf: &[u8]) -> Result<f32, StdBox<dyn Error + Sync +
/// Serializes a `FLOAT8` value.
#[inline]
pub fn float8_to_sql(v: f64, buf: &mut BytesMut) {
B(buf).put_f64_be(v);
buf.put_f64(v);
}
/// Deserializes a `FLOAT8` value.
@ -184,7 +184,7 @@ where
I: IntoIterator<Item = (&'a str, Option<&'a str>)>,
{
let base = buf.len();
B(buf).put_i32_be(0);
buf.put_i32(0);
let mut count = 0;
for (key, value) in values {
@ -196,7 +196,7 @@ where
Some(value) => {
write_pascal_string(value, buf)?;
}
None => B(buf).put_i32_be(-1),
None => buf.put_i32(-1),
}
}
@ -208,8 +208,8 @@ where
fn write_pascal_string(s: &str, buf: &mut BytesMut) -> Result<(), StdBox<dyn Error + Sync + Send>> {
let size = i32::from_usize(s.len())?;
B(buf).put_i32_be(size);
B(buf).put_slice(s.as_bytes());
buf.put_i32(size);
buf.put_slice(s.as_bytes());
Ok(())
}
@ -292,10 +292,10 @@ where
I: Iterator<Item = u8>,
{
let len = i32::from_usize(len)?;
B(buf).put_i32_be(len);
buf.put_i32(len);
for byte in v {
B(buf).put_u8(byte);
buf.put_u8(byte);
}
Ok(())
@ -352,7 +352,7 @@ impl<'a> Varbit<'a> {
/// The value should represent the number of microseconds since midnight, January 1st, 2000.
#[inline]
pub fn timestamp_to_sql(v: i64, buf: &mut BytesMut) {
B(buf).put_i64_be(v);
buf.put_i64(v);
}
/// Deserializes a `TIMESTAMP` or `TIMESTAMPTZ` value.
@ -372,7 +372,7 @@ pub fn timestamp_from_sql(mut buf: &[u8]) -> Result<i64, StdBox<dyn Error + Sync
/// The value should represent the number of days since January 1st, 2000.
#[inline]
pub fn date_to_sql(v: i32, buf: &mut BytesMut) {
B(buf).put_i32_be(v);
buf.put_i32(v);
}
/// Deserializes a `DATE` value.
@ -392,7 +392,7 @@ pub fn date_from_sql(mut buf: &[u8]) -> Result<i32, StdBox<dyn Error + Sync + Se
/// The value should represent the number of microseconds since midnight.
#[inline]
pub fn time_to_sql(v: i64, buf: &mut BytesMut) {
B(buf).put_i64_be(v);
buf.put_i64(v);
}
/// Deserializes a `TIME` or `TIMETZ` value.
@ -410,7 +410,7 @@ pub fn time_from_sql(mut buf: &[u8]) -> Result<i64, StdBox<dyn Error + Sync + Se
/// Serializes a `MACADDR` value.
#[inline]
pub fn macaddr_to_sql(v: [u8; 6], buf: &mut BytesMut) {
B(buf).put_slice(&v);
buf.put_slice(&v);
}
/// Deserializes a `MACADDR` value.
@ -427,7 +427,7 @@ pub fn macaddr_from_sql(buf: &[u8]) -> Result<[u8; 6], StdBox<dyn Error + Sync +
/// Serializes a `UUID` value.
#[inline]
pub fn uuid_to_sql(v: [u8; 16], buf: &mut BytesMut) {
B(buf).put_slice(&v);
buf.put_slice(&v);
}
/// Deserializes a `UUID` value.
@ -456,16 +456,16 @@ where
F: FnMut(T, &mut BytesMut) -> Result<IsNull, StdBox<dyn Error + Sync + Send>>,
{
let dimensions_idx = buf.len();
B(buf).put_i32_be(0);
buf.put_i32(0);
let flags_idx = buf.len();
B(buf).put_i32_be(0);
B(buf).put_u32_be(element_type);
buf.put_i32(0);
buf.put_u32(element_type);
let mut num_dimensions = 0;
for dimension in dimensions {
num_dimensions += 1;
B(buf).put_i32_be(dimension.len);
B(buf).put_i32_be(dimension.lower_bound);
buf.put_i32(dimension.len);
buf.put_i32(dimension.lower_bound);
}
let num_dimensions = i32::from_usize(num_dimensions)?;
@ -646,7 +646,7 @@ impl<'a> FallibleIterator for ArrayValues<'a> {
/// Serializes an empty range.
#[inline]
pub fn empty_range_to_sql(buf: &mut BytesMut) {
B(buf).put_u8(RANGE_EMPTY);
buf.put_u8(RANGE_EMPTY);
}
/// Serializes a range value.
@ -660,7 +660,7 @@ where
G: FnOnce(&mut BytesMut) -> Result<RangeBound<IsNull>, StdBox<dyn Error + Sync + Send>>,
{
let tag_idx = buf.len();
B(buf).put_u8(0);
buf.put_u8(0);
let mut tag = 0;
match write_bound(lower, buf)? {
@ -688,7 +688,7 @@ where
F: FnOnce(&mut BytesMut) -> Result<RangeBound<IsNull>, StdBox<dyn Error + Sync + Send>>,
{
let base = buf.len();
B(buf).put_i32_be(0);
buf.put_i32(0);
let (null, ret) = match bound(buf)? {
RangeBound::Inclusive(null) => (Some(null), RangeBound::Inclusive(())),
@ -784,8 +784,8 @@ pub enum Range<'a> {
/// Serializes a point value.
#[inline]
pub fn point_to_sql(x: f64, y: f64, buf: &mut BytesMut) {
B(buf).put_f64_be(x);
B(buf).put_f64_be(y);
buf.put_f64(x);
buf.put_f64(y);
}
/// Deserializes a point value.
@ -823,10 +823,10 @@ impl Point {
/// Serializes a box value.
#[inline]
pub fn box_to_sql(x1: f64, y1: f64, x2: f64, y2: f64, buf: &mut BytesMut) {
B(buf).put_f64_be(x1);
B(buf).put_f64_be(y1);
B(buf).put_f64_be(x2);
B(buf).put_f64_be(y2);
buf.put_f64(x1);
buf.put_f64(y1);
buf.put_f64(x2);
buf.put_f64(y2);
}
/// Deserializes a box value.
@ -876,15 +876,15 @@ pub fn path_to_sql<I>(
where
I: IntoIterator<Item = (f64, f64)>,
{
B(buf).put_u8(closed as u8);
buf.put_u8(closed as u8);
let points_idx = buf.len();
B(buf).put_i32_be(0);
buf.put_i32(0);
let mut num_points = 0;
for (x, y) in points {
num_points += 1;
B(buf).put_f64_be(x);
B(buf).put_f64_be(y);
buf.put_f64(x);
buf.put_f64(y);
}
let num_points = i32::from_usize(num_points)?;
@ -970,17 +970,17 @@ pub fn inet_to_sql(addr: IpAddr, netmask: u8, buf: &mut BytesMut) {
IpAddr::V4(_) => PGSQL_AF_INET,
IpAddr::V6(_) => PGSQL_AF_INET6,
};
B(buf).put_u8(family);
B(buf).put_u8(netmask);
B(buf).put_u8(0); // is_cidr
buf.put_u8(family);
buf.put_u8(netmask);
buf.put_u8(0); // is_cidr
match addr {
IpAddr::V4(addr) => {
B(buf).put_u8(4);
B(buf).put_slice(&addr.octets());
buf.put_u8(4);
buf.put_slice(&addr.octets());
}
IpAddr::V6(addr) => {
B(buf).put_u8(16);
B(buf).put_slice(&addr.octets());
buf.put_u8(16);
buf.put_slice(&addr.octets());
}
}
}

View File

@ -21,7 +21,7 @@ with-uuid-0_7 = ["uuid-07"]
with-uuid-0_8 = ["uuid-08"]
[dependencies]
bytes = "0.4"
bytes = "0.5"
fallible-iterator = "0.2"
postgres-protocol = { version = "=0.5.0-alpha.1", path = "../postgres-protocol" }
postgres-derive = { version = "=0.4.0-alpha.1", optional = true, path = "../postgres-derive" }

View File

@ -132,7 +132,7 @@ use crate::type_gen::{Inner, Other};
pub use postgres_protocol::Oid;
pub use crate::special::{Date, Timestamp};
use bytes::{BufMut, BytesMut};
use bytes::BytesMut;
// Number of seconds from 1970-01-01 to 2000-01-01
const TIME_SEC_CONVERSION: u64 = 946_684_800;
@ -208,30 +208,6 @@ pub mod private;
mod special;
mod type_gen;
// https://github.com/tokio-rs/bytes/issues/170
struct B<'a>(&'a mut BytesMut);
impl<'a> BufMut for B<'a> {
#[inline]
fn remaining_mut(&self) -> usize {
usize::max_value() - self.0.len()
}
#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
self.0.advance_mut(cnt);
}
#[inline]
unsafe fn bytes_mut(&mut self) -> &mut [u8] {
if !self.0.has_remaining_mut() {
self.0.reserve(64);
}
self.0.bytes_mut()
}
}
/// A Postgres type.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Type(Inner);

View File

@ -29,16 +29,16 @@ with-uuid-0_7 = ["tokio-postgres/with-uuid-0_7"]
with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
[dependencies]
bytes = "0.4"
bytes = "0.5"
fallible-iterator = "0.2"
futures-preview = "=0.3.0-alpha.19"
futures = "0.3"
pin-utils = "=0.1.0-alpha.4"
tokio-postgres = { version = "=0.5.0-alpha.1", path = "../tokio-postgres", default-features = false }
tokio-executor = "=0.2.0-alpha.6"
tokio = { version = "=0.2.0-alpha.6", optional = true }
tokio = { version = "0.2", optional = true, features = ["rt-threaded"] }
lazy_static = { version = "1.0", optional = true }
log = { version = "0.4", optional = true }
[dev-dependencies]
tokio = "=0.2.0-alpha.6"
tokio = "0.2"

View File

@ -299,7 +299,7 @@ impl Client {
/// use postgres::{Client, NoTls};
/// use std::io::Read;
///
/// # fn main() -> Result<(), Box<std::error::Error>> {
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let mut reader = client.copy_out("COPY people TO stdout", &[])?;

View File

@ -2,7 +2,7 @@
//!
//! Requires the `runtime` Cargo feature (enabled by default).
use futures::FutureExt;
use futures::{FutureExt, executor};
use log::error;
use std::fmt;
use std::path::Path;
@ -274,7 +274,9 @@ impl Config {
}
None => {
let connect = self.config.connect(tls);
RUNTIME.block_on(connect)?
RUNTIME.handle().enter(|| {
executor::block_on(connect)
})?
}
};

View File

@ -1,6 +1,5 @@
use futures::Stream;
use std::io;
use std::io::Read;
use std::io::{self, Cursor, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
@ -10,16 +9,16 @@ impl<R> Stream for CopyInStream<R>
where
R: Read + Unpin,
{
type Item = io::Result<Vec<u8>>;
type Item = io::Result<Cursor<Vec<u8>>>;
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<io::Result<Vec<u8>>>> {
) -> Poll<Option<io::Result<Cursor<Vec<u8>>>>> {
let mut buf = vec![];
match self.0.by_ref().take(4096).read_to_end(&mut buf)? {
0 => Poll::Ready(None),
_ => Poll::Ready(Some(Ok(buf))),
_ => Poll::Ready(Some(Ok(Cursor::new(buf)))),
}
}
}

View File

@ -92,7 +92,9 @@ mod test;
#[cfg(feature = "runtime")]
lazy_static! {
static ref RUNTIME: Runtime = runtime::Builder::new()
.name_prefix("postgres-")
.thread_name("postgres")
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
}

View File

@ -1,5 +1,4 @@
use std::io::Read;
use tokio::runtime::Runtime;
use tokio_postgres::types::Type;
use tokio_postgres::NoTls;
@ -223,21 +222,3 @@ fn portal() {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get::<_, i32>(0), 3);
}
#[test]
fn custom_executor() {
let runtime = Runtime::new().unwrap();
let mut config = "host=localhost port=5433 user=postgres"
.parse::<crate::Config>()
.unwrap();
config.executor(runtime.executor());
let mut client = config.connect(NoTls).unwrap();
let rows = client.query("SELECT $1::TEXT", &[&"hello"]).unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get::<_, &str>(0), "hello");
drop(runtime);
assert!(client.is_closed());
}

View File

@ -5,12 +5,13 @@ authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018"
[dependencies]
bytes = "0.4"
futures-preview = "=0.3.0-alpha.19"
parking_lot = "0.9"
byteorder = "1.0"
bytes = "0.5"
futures = "0.3"
parking_lot = "0.10"
pin-project-lite = "0.1"
tokio-postgres = { version = "=0.5.0-alpha.1", default-features = false, path = "../tokio-postgres" }
[dev-dependencies]
tokio = "=0.2.0-alpha.6"
tokio = { version = "0.2", features = ["full"] }
tokio-postgres = { version = "=0.5.0-alpha.1", path = "../tokio-postgres" }

View File

@ -1,4 +1,4 @@
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut, Buf};
use bytes::{BufMut, Bytes, BytesMut, Buf};
use futures::{future, ready, Stream};
use parking_lot::Mutex;
use pin_project_lite::pin_project;
@ -12,6 +12,7 @@ use std::task::{Context, Poll};
use tokio_postgres::types::{IsNull, ToSql, Type, FromSql, WrongType};
use tokio_postgres::CopyStream;
use std::io::Cursor;
use byteorder::{ByteOrder, BigEndian};
#[cfg(test)]
mod test;
@ -40,8 +41,8 @@ where
let mut buf = BytesMut::new();
buf.reserve(HEADER_LEN);
buf.put_slice(MAGIC); // magic
buf.put_i32_be(0); // flags
buf.put_i32_be(0); // header extension
buf.put_i32(0); // flags
buf.put_i32(0); // header extension
let buf = Arc::new(Mutex::new(buf));
let writer = BinaryCopyInWriter {
@ -75,10 +76,10 @@ where
let mut buf = this.buf.lock();
if *this.done {
buf.reserve(2);
buf.put_i16_be(-1);
Poll::Ready(Some(Ok(buf.take().freeze())))
buf.put_i16(-1);
Poll::Ready(Some(Ok(buf.split().freeze())))
} else if buf.len() > BLOCK_SIZE {
Poll::Ready(Some(Ok(buf.take().freeze())))
Poll::Ready(Some(Ok(buf.split().freeze())))
} else {
Poll::Pending
}
@ -124,12 +125,12 @@ impl BinaryCopyInWriter {
let mut buf = self.buf.lock();
buf.reserve(2);
buf.put_u16_be(self.types.len() as u16);
buf.put_u16(self.types.len() as u16);
for (value, type_) in values.zip(&self.types) {
let idx = buf.len();
buf.reserve(4);
buf.put_i32_be(0);
buf.put_i32(0);
let len = match value.to_sql_checked(type_, &mut buf)? {
IsNull::Yes => -1,
IsNull::No => i32::try_from(buf.len() - idx - 4)?,
@ -186,10 +187,10 @@ impl Stream for BinaryCopyOutStream {
}
chunk.advance(MAGIC.len());
let flags = chunk.get_i32_be();
let flags = chunk.get_i32();
let has_oids = (flags & (1 << 16)) != 0;
let header_extension = chunk.get_u32_be() as usize;
let header_extension = chunk.get_u32() as usize;
check_remaining(&chunk, header_extension)?;
chunk.advance(header_extension);
@ -199,7 +200,7 @@ impl Stream for BinaryCopyOutStream {
};
check_remaining(&chunk, 2)?;
let mut len = chunk.get_i16_be();
let mut len = chunk.get_i16();
if len == -1 {
return Poll::Ready(None);
}
@ -214,7 +215,7 @@ impl Stream for BinaryCopyOutStream {
let mut ranges = vec![];
for _ in 0..len {
check_remaining(&chunk, 4)?;
let len = chunk.get_i32_be();
let len = chunk.get_i32();
if len == -1 {
ranges.push(None);
} else {

View File

@ -25,7 +25,7 @@ circle-ci = { repository = "sfackler/rust-postgres" }
[features]
default = ["runtime"]
runtime = ["tokio/rt-full", "tokio/tcp", "tokio/uds"]
runtime = ["tokio/dns", "tokio/net", "tokio/time"]
with-bit-vec-0_6 = ["postgres-types/with-bit-vec-0_6"]
with-chrono-0_4 = ["postgres-types/with-chrono-0_4"]
@ -36,9 +36,9 @@ with-uuid-0_7 = ["postgres-types/with-uuid-0_7"]
with-uuid-0_8 = ["postgres-types/with-uuid-0_8"]
[dependencies]
bytes = "0.4"
bytes = "0.5"
fallible-iterator = "0.2"
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
futures = "0.3"
log = "0.4"
parking_lot = "0.9"
percent-encoding = "2.0"
@ -46,10 +46,11 @@ pin-project-lite = "0.1"
phf = "0.8"
postgres-protocol = { version = "=0.5.0-alpha.1", path = "../postgres-protocol" }
postgres-types = { version = "=0.1.0-alpha.1", path = "../postgres-types" }
tokio = { version = "=0.2.0-alpha.6", default-features = false, features = ["io", "codec"] }
tokio = { version = "0.2", features = ["io-util"] }
tokio-util = { version = "0.2", features = ["codec"] }
[dev-dependencies]
tokio = "=0.2.0-alpha.6"
tokio = { version = "0.2", features = ["full"] }
env_logger = "0.7"
criterion = "0.3"

View File

@ -23,7 +23,7 @@ where
let buf = client.with_buf(|buf| {
query::encode_bind(&statement, params, &name, buf)?;
frontend::sync(buf);
Ok(buf.take().freeze())
Ok(buf.split().freeze())
})?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

View File

@ -18,7 +18,7 @@ use crate::{cancel_query_raw, copy_in, copy_out, query, Transaction};
use crate::{prepare, SimpleQueryMessage};
use crate::{simple_query, Row};
use crate::{Error, Statement};
use bytes::{BytesMut, IntoBuf};
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures::channel::mpsc;
use futures::{future, pin_mut, ready, StreamExt, TryStream, TryStreamExt};
@ -357,8 +357,7 @@ impl Client {
where
T: ?Sized + ToStatement,
S: TryStream,
S::Ok: IntoBuf,
<S::Ok as IntoBuf>::Buf: 'static + Send,
S::Ok: Buf + 'static + Send,
S::Error: Into<Box<dyn error::Error + Sync + Send>>,
{
let statement = statement.__convert().into_statement(self).await?;

View File

@ -3,7 +3,7 @@ use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend;
use postgres_protocol::message::frontend::CopyData;
use std::io;
use tokio::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
pub enum FrontendMessage {
Raw(Bytes),

View File

@ -17,8 +17,8 @@ use std::collections::HashMap;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
pub struct StartupStream<S, T> {
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,

View File

@ -6,7 +6,7 @@ use std::time::Duration;
use tokio::net::TcpStream;
#[cfg(unix)]
use tokio::net::UnixStream;
use tokio::timer::Timeout;
use tokio::time;
pub(crate) async fn connect_socket(
host: &Host,
@ -42,7 +42,7 @@ where
F: Future<Output = io::Result<T>>,
{
match timeout {
Some(timeout) => match Timeout::new(connect, timeout).await {
Some(timeout) => match time::timeout(timeout, connect).await {
Ok(Ok(socket)) => Ok(socket),
Ok(Err(e)) => Err(Error::connect(e)),
Err(_) => Err(Error::connect(io::Error::new(

View File

@ -15,8 +15,8 @@ use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
pub enum RequestMessages {
Single(FrontendMessage),

View File

@ -3,7 +3,8 @@ use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::ToSql;
use crate::{query, Error, Statement};
use bytes::{Buf, BufMut, BytesMut, IntoBuf};
use bytes::buf::BufExt;
use bytes::{Buf, BufMut, BytesMut};
use futures::channel::mpsc;
use futures::{pin_mut, ready, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
use postgres_protocol::message::backend::Message;
@ -70,8 +71,7 @@ where
I: IntoIterator<Item = &'a dyn ToSql>,
I::IntoIter: ExactSizeIterator,
S: TryStream,
S::Ok: IntoBuf,
<S::Ok as IntoBuf>::Buf: 'static + Send,
S::Ok: Buf + 'static + Send,
S::Error: Into<Box<dyn error::Error + Sync + Send>>,
{
let buf = query::encode(client, &statement, params)?;
@ -100,19 +100,17 @@ where
pin_mut!(stream);
while let Some(buf) = stream.try_next().await.map_err(Error::copy_in_stream)? {
let buf = buf.into_buf();
let data: Box<dyn Buf + Send> = if buf.remaining() > 4096 {
if bytes.is_empty() {
Box::new(buf)
} else {
Box::new(bytes.take().freeze().into_buf().chain(buf))
Box::new(bytes.split().freeze().chain(buf))
}
} else {
bytes.reserve(buf.remaining());
bytes.put(buf);
if bytes.len() > 4096 {
Box::new(bytes.take().freeze().into_buf())
Box::new(bytes.split().freeze())
} else {
continue;
}
@ -126,7 +124,7 @@ where
}
if !bytes.is_empty() {
let data: Box<dyn Buf + Send> = Box::new(bytes.freeze().into_buf());
let data: Box<dyn Buf + Send> = Box::new(bytes.freeze());
let data = CopyData::new(data).map_err(Error::encode)?;
sender
.send(CopyInMessage::Message(FrontendMessage::CopyData(data)))

View File

@ -1,15 +1,15 @@
use crate::client::{InnerClient, Responses};
use pin_project_lite::pin_project;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::ToSql;
use crate::{query, Error, Statement};
use bytes::Bytes;
use futures::{ready, Stream};
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::marker::PhantomPinned;
pub async fn copy_out<'a, I>(
client: &InnerClient,
@ -22,7 +22,10 @@ where
{
let buf = query::encode(client, &statement, params)?;
let responses = start(client, buf).await?;
Ok(CopyStream { responses, _p: PhantomPinned })
Ok(CopyStream {
responses,
_p: PhantomPinned,
})
}
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {

View File

@ -105,6 +105,7 @@
pub use crate::client::Client;
pub use crate::config::Config;
pub use crate::connection::Connection;
pub use crate::copy_out::CopyStream;
use crate::error::DbError;
pub use crate::error::Error;
pub use crate::portal::Portal;
@ -114,7 +115,6 @@ pub use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")]
pub use crate::socket::Socket;
pub use crate::statement::{Column, Statement};
pub use crate::copy_out::CopyStream;
#[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect;
pub use crate::tls::NoTls;

View File

@ -1,6 +1,7 @@
use crate::tls::{ChannelBinding, TlsStream};
use bytes::{Buf, BufMut};
use std::io;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
@ -15,7 +16,7 @@ where
S: AsyncRead + Unpin,
T: AsyncRead + Unpin,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
match self {
MaybeTlsStream::Raw(s) => s.prepare_uninitialized_buffer(buf),
MaybeTlsStream::Tls(s) => s.prepare_uninitialized_buffer(buf),

View File

@ -17,7 +17,7 @@ impl Drop for Inner {
let buf = client.with_buf(|buf| {
frontend::close(b'P', &self.name, buf).unwrap();
frontend::sync(buf);
buf.take().freeze()
buf.split().freeze()
});
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}

View File

@ -115,7 +115,7 @@ fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Resu
frontend::parse(name, query, types.iter().map(Type::oid), buf).map_err(Error::encode)?;
frontend::describe(b'S', &name, buf).map_err(Error::encode)?;
frontend::sync(buf);
Ok(buf.take().freeze())
Ok(buf.split().freeze())
})
}

View File

@ -38,7 +38,7 @@ pub async fn query_portal(
let buf = client.with_buf(|buf| {
frontend::execute(portal.name(), max_rows, buf).map_err(Error::encode)?;
frontend::sync(buf);
Ok(buf.take().freeze())
Ok(buf.split().freeze())
})?;
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
@ -102,7 +102,7 @@ where
encode_bind(statement, params, "", buf)?;
frontend::execute("", 0, buf).map_err(Error::encode)?;
frontend::sync(buf);
Ok(buf.take().freeze())
Ok(buf.split().freeze())
})
}

View File

@ -43,7 +43,7 @@ pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Erro
fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
Ok(buf.take().freeze())
Ok(buf.split().freeze())
})
}

View File

@ -1,5 +1,6 @@
use bytes::{Buf, BufMut};
use std::io;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
@ -32,7 +33,7 @@ impl Socket {
}
impl AsyncRead for Socket {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
match &self.0 {
Inner::Tcp(s) => s.prepare_uninitialized_buffer(buf),
#[cfg(unix)]

View File

@ -18,7 +18,7 @@ impl Drop for StatementInner {
let buf = client.with_buf(|buf| {
frontend::close(b'S', &self.name, buf).unwrap();
frontend::sync(buf);
buf.take().freeze()
buf.split().freeze()
});
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}

View File

@ -11,7 +11,7 @@ use crate::Socket;
use crate::{
bind, query, slice_iter, Client, Error, Portal, Row, SimpleQueryMessage, Statement, ToStatement,
};
use bytes::IntoBuf;
use bytes::Buf;
use futures::{TryStream, TryStreamExt};
use postgres_protocol::message::frontend;
use std::error;
@ -40,7 +40,7 @@ impl<'a> Drop for Transaction<'a> {
};
let buf = self.client.inner().with_buf(|buf| {
frontend::query(&query, buf).unwrap();
buf.take().freeze()
buf.split().freeze()
});
let _ = self
.client
@ -218,8 +218,7 @@ impl<'a> Transaction<'a> {
where
T: ?Sized + ToStatement,
S: TryStream,
S::Ok: IntoBuf,
<S::Ok as IntoBuf>::Buf: 'static + Send,
S::Ok: Buf + 'static + Send,
S::Error: Into<Box<dyn error::Error + Sync + Send>>,
{
self.client.copy_in(statement, params, stream).await

View File

@ -1,12 +1,13 @@
#![warn(rust_2018_idioms)]
use bytes::{Bytes, BytesMut};
use futures::channel::mpsc;
use futures::{future, stream, StreamExt};
use futures::{join, try_join, FutureExt, TryStreamExt};
use std::fmt::Write;
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::timer;
use tokio::time;
use tokio_postgres::error::SqlState;
use tokio_postgres::tls::{NoTls, NoTlsStream};
use tokio_postgres::types::{Kind, Type};
@ -302,7 +303,7 @@ async fn cancel_query_raw() {
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
let cancel = client.cancel_query_raw(socket, NoTls);
let cancel = timer::delay(Instant::now() + Duration::from_millis(100)).then(|()| cancel);
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)");
@ -410,9 +411,12 @@ async fn copy_in() {
let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap();
let stream = stream::iter(
vec![b"1\tjim\n".to_vec(), b"2\tjoe\n".to_vec()]
.into_iter()
.map(Ok::<_, String>),
vec![
Bytes::from_static(b"1\tjim\n"),
Bytes::from_static(b"2\tjoe\n"),
]
.into_iter()
.map(Ok::<_, String>),
);
let rows = client.copy_in(&stmt, &[], stream).await.unwrap();
assert_eq!(rows, 2);
@ -446,16 +450,20 @@ async fn copy_in_large() {
let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap();
let a = "0\tname0\n".to_string();
let mut b = String::new();
let a = Bytes::from_static(b"0\tname0\n");
let mut b = BytesMut::new();
for i in 1..5_000 {
writeln!(b, "{0}\tname{0}", i).unwrap();
}
let mut c = String::new();
let mut c = BytesMut::new();
for i in 5_000..10_000 {
writeln!(c, "{0}\tname{0}", i).unwrap();
}
let stream = stream::iter(vec![a, b, c].into_iter().map(Ok::<_, String>));
let stream = stream::iter(
vec![a, b.freeze(), c.freeze()]
.into_iter()
.map(Ok::<_, String>),
);
let rows = client.copy_in(&stmt, &[], stream).await.unwrap();
assert_eq!(rows, 10_000);
@ -476,7 +484,7 @@ async fn copy_in_error() {
.unwrap();
let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap();
let stream = stream::iter(vec![Ok(b"1\tjim\n".to_vec()), Err("asdf")]);
let stream = stream::iter(vec![Ok(Bytes::from_static(b"1\tjim\n")), Err("asdf")]);
let error = client.copy_in(&stmt, &[], stream).await.unwrap_err();
assert!(error.to_string().contains("asdf"));
@ -509,7 +517,12 @@ async fn copy_out() {
.copy_out(&stmt, &[])
.await
.unwrap()
.try_concat()
.try_fold(BytesMut::new(), |mut buf, chunk| {
async move {
buf.extend_from_slice(&chunk);
Ok(buf)
}
})
.await
.unwrap();
assert_eq!(&data[..], b"1\tjim\n2\tjoe\n");

View File

@ -1,6 +1,6 @@
use futures::{join, FutureExt};
use std::time::{Duration, Instant};
use tokio::timer;
use std::time::Duration;
use tokio::time;
use tokio_postgres::error::SqlState;
use tokio_postgres::{Client, NoTls};
@ -71,7 +71,7 @@ async fn cancel_query() {
let client = connect("host=localhost port=5433 user=postgres").await;
let cancel = client.cancel_query(NoTls);
let cancel = timer::delay(Instant::now() + Duration::from_millis(100)).then(|()| cancel);
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)");