diff --git a/tokio-postgres/Cargo.toml b/tokio-postgres/Cargo.toml index 2699b416..4c9d069f 100644 --- a/tokio-postgres/Cargo.toml +++ b/tokio-postgres/Cargo.toml @@ -22,6 +22,7 @@ with-uuid = ["postgres-shared/with-uuid"] with-openssl = ["tokio-openssl", "openssl"] [dependencies] +bytes = "0.4" fallible-iterator = "0.1.3" futures = "0.1.7" futures-state-stream = "0.1" @@ -29,6 +30,7 @@ postgres-protocol = { version = "0.2", path = "../postgres-protocol" } postgres-shared = { version = "0.2", path = "../postgres-shared" } tokio-core = "0.1" tokio-dns-unofficial = "0.1" +tokio-io = "0.1" tokio-openssl = { version = "0.1", optional = true } openssl = { version = "0.9", optional = true } diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs index 5f827721..838d7959 100644 --- a/tokio-postgres/src/lib.rs +++ b/tokio-postgres/src/lib.rs @@ -54,12 +54,14 @@ #![doc(html_root_url="https://docs.rs/tokio-postgres/0.2.1")] #![warn(missing_docs)] +extern crate bytes; extern crate fallible_iterator; extern crate futures_state_stream; extern crate postgres_shared; extern crate postgres_protocol; extern crate tokio_core; extern crate tokio_dns; +extern crate tokio_io; #[macro_use] extern crate futures; @@ -86,7 +88,7 @@ use std::io; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use std::sync::mpsc::{self, Sender, Receiver}; -use tokio_core::io::IoFuture; +use tokio_io::IoFuture; use tokio_core::reactor::Handle; #[doc(inline)] diff --git a/tokio-postgres/src/stream.rs b/tokio-postgres/src/stream.rs index d7975c65..c926e9b4 100644 --- a/tokio-postgres/src/stream.rs +++ b/tokio-postgres/src/stream.rs @@ -1,10 +1,12 @@ -use futures::{BoxFuture, Future, IntoFuture, Async, Sink, Stream as FuturesStream}; +use bytes::{BytesMut, BufMut}; +use futures::{BoxFuture, Future, IntoFuture, Sink, Stream as FuturesStream, Poll}; use futures::future::Either; use postgres_shared::params::Host; use postgres_protocol::message::backend::{self, ParseResult}; use postgres_protocol::message::frontend; use std::io::{self, Read, Write}; -use tokio_core::io::{Io, Codec, EasyBuf, Framed}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::{Encoder, Decoder, Framed}; use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; use tokio_dns; @@ -132,65 +134,85 @@ impl Write for Stream { } } -impl Io for Stream { - fn poll_read(&mut self) -> Async<()> { +impl AsyncRead for Stream { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { match self.0 { - InnerStream::Tcp(ref mut s) => s.poll_read(), + InnerStream::Tcp(ref s) => s.prepare_uninitialized_buffer(buf), #[cfg(unix)] - InnerStream::Unix(ref mut s) => s.poll_read(), + InnerStream::Unix(ref s) => s.prepare_uninitialized_buffer(buf), } } - fn poll_write(&mut self) -> Async<()> { + fn read_buf(&mut self, buf: &mut B) -> Poll + where B: BufMut + { match self.0 { - InnerStream::Tcp(ref mut s) => s.poll_write(), + InnerStream::Tcp(ref mut s) => s.read_buf(buf), #[cfg(unix)] - InnerStream::Unix(ref mut s) => s.poll_write(), + InnerStream::Unix(ref mut s) => s.read_buf(buf), + } + } +} + +impl AsyncWrite for Stream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + match self.0 { + InnerStream::Tcp(ref mut s) => s.shutdown(), + #[cfg(unix)] + InnerStream::Unix(ref mut s) => s.shutdown(), } } } pub struct PostgresCodec; -impl Codec for PostgresCodec { - type In = backend::Message>; - type Out = Vec; +impl Decoder for PostgresCodec { + type Item = backend::Message>; + type Error = io::Error; // FIXME ideally we'd avoid re-copying the data - fn decode(&mut self, buf: &mut EasyBuf) -> io::Result> { + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { match backend::Message::parse_owned(buf.as_ref())? { ParseResult::Complete { message, consumed } => { - buf.drain_to(consumed); + buf.split_to(consumed); Ok(Some(message)) } ParseResult::Incomplete { .. } => Ok(None), } } +} - fn encode(&mut self, msg: Vec, buf: &mut Vec) -> io::Result<()> { - buf.extend_from_slice(&msg); +impl Encoder for PostgresCodec { + type Item = Vec; + type Error = io::Error; + + fn encode(&mut self, msg: Vec, buf: &mut BytesMut) -> io::Result<()> { + buf.extend(&msg); Ok(()) } } struct SslCodec; -impl Codec for SslCodec { - type In = u8; - type Out = Vec; +impl Decoder for SslCodec { + type Item = u8; + type Error = io::Error; - fn decode(&mut self, buf: &mut EasyBuf) -> io::Result> { - if buf.as_slice().is_empty() { + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { + if buf.is_empty() { Ok(None) } else { - let byte = buf.as_slice()[0]; - buf.drain_to(1); - Ok(Some(byte)) + Ok(Some(buf.split_to(1)[0])) } } +} - fn encode(&mut self, msg: Vec, buf: &mut Vec) -> io::Result<()> { - buf.extend_from_slice(&msg); +impl Encoder for SslCodec { + type Item = Vec; + type Error = io::Error; + + fn encode(&mut self, msg: Vec, buf: &mut BytesMut) -> io::Result<()> { + buf.extend(&msg); Ok(()) } } diff --git a/tokio-postgres/src/tls/mod.rs b/tokio-postgres/src/tls/mod.rs index 32053ab7..c94d64f6 100644 --- a/tokio-postgres/src/tls/mod.rs +++ b/tokio-postgres/src/tls/mod.rs @@ -2,7 +2,7 @@ use futures::BoxFuture; use std::error::Error; -use tokio_core::io::Io; +use tokio_io::{AsyncRead, AsyncWrite}; pub use stream::Stream; @@ -10,7 +10,7 @@ pub use stream::Stream; pub mod openssl; /// A trait implemented by streams returned from `Handshake` implementations. -pub trait TlsStream: Io + Send { +pub trait TlsStream: AsyncRead + AsyncWrite + Send { /// Returns a shared reference to the inner stream. fn get_ref(&self) -> &Stream; @@ -18,8 +18,6 @@ pub trait TlsStream: Io + Send { fn get_mut(&mut self) -> &mut Stream; } -impl Io for Box {} - impl TlsStream for Stream { fn get_ref(&self) -> &Stream { self