Start on runtime API

This commit is contained in:
Steven Fackler 2018-12-16 21:30:52 -08:00
parent 707b87a18e
commit 7df7fc715b
12 changed files with 127 additions and 18 deletions

View File

@ -37,5 +37,6 @@ jobs:
- run: cargo fmt --all -- --check - run: cargo fmt --all -- --check
- run: cargo clippy --all - run: cargo clippy --all
- run: cargo test --all - run: cargo test --all
- run: cargo test --manifest-path tokio-postgres/Cargo.toml --no-default-features
- run: cargo test --manifest-path tokio-postgres/Cargo.toml --all-features - run: cargo test --manifest-path tokio-postgres/Cargo.toml --all-features
- *SAVE_DEPS - *SAVE_DEPS

View File

@ -9,7 +9,7 @@ futures = "0.1"
native-tls = "0.2" native-tls = "0.2"
tokio-io = "0.1" tokio-io = "0.1"
tokio-tls = "0.2" tokio-tls = "0.2"
tokio-postgres = { version = "0.3", path = "../tokio-postgres" } tokio-postgres = { version = "0.3", path = "../tokio-postgres", default-features = false }
[dev-dependencies] [dev-dependencies]
tokio = "0.1.7" tokio = "0.1.7"

View File

@ -15,7 +15,7 @@ where
let handshake = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap()) let handshake = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap())
.map_err(|e| panic!("{}", e)) .map_err(|e| panic!("{}", e))
.and_then(|s| builder.connect(s, tls)); .and_then(|s| builder.handshake(s, tls));
let (mut client, connection) = runtime.block_on(handshake).unwrap(); let (mut client, connection) = runtime.block_on(handshake).unwrap();
let connection = connection.map_err(|e| panic!("{}", e)); let connection = connection.map_err(|e| panic!("{}", e));
runtime.spawn(connection); runtime.spawn(connection);

View File

@ -9,7 +9,7 @@ futures = "0.1"
openssl = "0.10" openssl = "0.10"
tokio-io = "0.1" tokio-io = "0.1"
tokio-openssl = "0.3" tokio-openssl = "0.3"
tokio-postgres = { version = "0.3", path = "../tokio-postgres" } tokio-postgres = { version = "0.3", path = "../tokio-postgres", default-features = false }
[dev-dependencies] [dev-dependencies]
tokio = "0.1.7" tokio = "0.1.7"

View File

@ -15,7 +15,7 @@ where
let handshake = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap()) let handshake = TcpStream::connect(&"127.0.0.1:5433".parse().unwrap())
.map_err(|e| panic!("{}", e)) .map_err(|e| panic!("{}", e))
.and_then(|s| builder.connect(s, tls)); .and_then(|s| builder.handshake(s, tls));
let (mut client, connection) = runtime.block_on(handshake).unwrap(); let (mut client, connection) = runtime.block_on(handshake).unwrap();
let connection = connection.map_err(|e| panic!("{}", e)); let connection = connection.map_err(|e| panic!("{}", e));
runtime.spawn(connection); runtime.spawn(connection);

View File

@ -27,6 +27,9 @@ features = [
circle-ci = { repository = "sfackler/rust-postgres" } circle-ci = { repository = "sfackler/rust-postgres" }
[features] [features]
default = ["runtime"]
runtime = ["tokio-tcp", "tokio-uds"]
"with-bit-vec-0.5" = ["bit-vec-05"] "with-bit-vec-0.5" = ["bit-vec-05"]
"with-chrono-0.4" = ["chrono-04"] "with-chrono-0.4" = ["chrono-04"]
"with-eui48-0.4" = ["eui48-04"] "with-eui48-0.4" = ["eui48-04"]
@ -48,6 +51,8 @@ tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
void = "1.0" void = "1.0"
tokio-tcp = { version = "0.1", optional = true }
bit-vec-05 = { version = "0.5", package = "bit-vec", optional = true } bit-vec-05 = { version = "0.5", package = "bit-vec", optional = true }
chrono-04 = { version = "0.4", package = "chrono", optional = true } chrono-04 = { version = "0.4", package = "chrono", optional = true }
eui48-04 = { version = "0.4", package = "eui48", optional = true } eui48-04 = { version = "0.4", package = "eui48", optional = true }
@ -56,6 +61,9 @@ serde-1 = { version = "1.0", package = "serde", optional = true }
serde_json-1 = { version = "1.0", package = "serde_json", optional = true } serde_json-1 = { version = "1.0", package = "serde_json", optional = true }
uuid-07 = { version = "0.7", package = "uuid", optional = true } uuid-07 = { version = "0.7", package = "uuid", optional = true }
[target.'cfg(unix)'.dependencies]
tokio-uds = { version = "0.2", optional = true }
[dev-dependencies] [dev-dependencies]
tokio = "0.1.7" tokio = "0.1.7"
env_logger = "0.5" env_logger = "0.5"

View File

@ -3,8 +3,8 @@ use std::iter;
use std::str::{self, FromStr}; use std::str::{self, FromStr};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use crate::proto::ConnectFuture; use crate::proto::HandshakeFuture;
use crate::{Connect, Error, TlsMode}; use crate::{Error, Handshake, TlsMode};
#[derive(Clone)] #[derive(Clone)]
pub struct Builder { pub struct Builder {
@ -48,12 +48,12 @@ impl Builder {
Iter(self.params.iter()) Iter(self.params.iter())
} }
pub fn connect<S, T>(&self, stream: S, tls_mode: T) -> Connect<S, T> pub fn handshake<S, T>(&self, stream: S, tls_mode: T) -> Handshake<S, T>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
T: TlsMode<S>, T: TlsMode<S>,
{ {
Connect(ConnectFuture::new(stream, tls_mode, self.params.clone())) Handshake(HandshakeFuture::new(stream, tls_mode, self.params.clone()))
} }
} }

View File

@ -10,6 +10,8 @@ pub use crate::builder::*;
pub use crate::error::*; pub use crate::error::*;
use crate::proto::CancelFuture; use crate::proto::CancelFuture;
pub use crate::row::{Row, RowIndex}; pub use crate::row::{Row, RowIndex};
#[cfg(feature = "runtime")]
pub use crate::socket::Socket;
pub use crate::stmt::Column; pub use crate::stmt::Column;
pub use crate::tls::*; pub use crate::tls::*;
use crate::types::{ToSql, Type}; use crate::types::{ToSql, Type};
@ -18,6 +20,8 @@ mod builder;
pub mod error; pub mod error;
mod proto; mod proto;
mod row; mod row;
#[cfg(feature = "runtime")]
mod socket;
mod stmt; mod stmt;
mod tls; mod tls;
pub mod types; pub mod types;
@ -156,12 +160,12 @@ where
} }
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct Connect<S, T>(proto::ConnectFuture<S, T>) pub struct Handshake<S, T>(proto::HandshakeFuture<S, T>)
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
T: TlsMode<S>; T: TlsMode<S>;
impl<S, T> Future for Connect<S, T> impl<S, T> Future for Handshake<S, T>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
T: TlsMode<S>, T: TlsMode<S>,

View File

@ -16,7 +16,7 @@ use crate::proto::{Client, Connection, PostgresCodec, TlsFuture};
use crate::{CancelData, ChannelBinding, Error, TlsMode}; use crate::{CancelData, ChannelBinding, Error, TlsMode};
#[derive(StateMachineFuture)] #[derive(StateMachineFuture)]
pub enum Connect<S, T> pub enum Handshake<S, T>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
T: TlsMode<S>, T: TlsMode<S>,
@ -70,7 +70,7 @@ where
Failed(Error), Failed(Error),
} }
impl<S, T> PollConnect<S, T> for Connect<S, T> impl<S, T> PollHandshake<S, T> for Handshake<S, T>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
T: TlsMode<S>, T: TlsMode<S>,
@ -319,12 +319,12 @@ where
} }
} }
impl<S, T> ConnectFuture<S, T> impl<S, T> HandshakeFuture<S, T>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite,
T: TlsMode<S>, T: TlsMode<S>,
{ {
pub fn new(stream: S, tls_mode: T, params: HashMap<String, String>) -> ConnectFuture<S, T> { pub fn new(stream: S, tls_mode: T, params: HashMap<String, String>) -> HandshakeFuture<S, T> {
Connect::start(TlsFuture::new(stream, tls_mode), params) Handshake::start(TlsFuture::new(stream, tls_mode), params)
} }
} }

View File

@ -22,11 +22,11 @@ mod bind;
mod cancel; mod cancel;
mod client; mod client;
mod codec; mod codec;
mod connect;
mod connection; mod connection;
mod copy_in; mod copy_in;
mod copy_out; mod copy_out;
mod execute; mod execute;
mod handshake;
mod portal; mod portal;
mod prepare; mod prepare;
mod query; mod query;
@ -42,11 +42,11 @@ pub use crate::proto::bind::BindFuture;
pub use crate::proto::cancel::CancelFuture; pub use crate::proto::cancel::CancelFuture;
pub use crate::proto::client::Client; pub use crate::proto::client::Client;
pub use crate::proto::codec::PostgresCodec; pub use crate::proto::codec::PostgresCodec;
pub use crate::proto::connect::ConnectFuture;
pub use crate::proto::connection::Connection; pub use crate::proto::connection::Connection;
pub use crate::proto::copy_in::CopyInFuture; pub use crate::proto::copy_in::CopyInFuture;
pub use crate::proto::copy_out::CopyOutStream; pub use crate::proto::copy_out::CopyOutStream;
pub use crate::proto::execute::ExecuteFuture; pub use crate::proto::execute::ExecuteFuture;
pub use crate::proto::handshake::HandshakeFuture;
pub use crate::proto::portal::Portal; pub use crate::proto::portal::Portal;
pub use crate::proto::prepare::PrepareFuture; pub use crate::proto::prepare::PrepareFuture;
pub use crate::proto::query::QueryStream; pub use crate::proto::query::QueryStream;

View File

@ -0,0 +1,96 @@
use bytes::{Buf, BufMut};
use futures::Poll;
use std::io::{self, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream;
#[cfg(unix)]
use tokio_uds::UnixStream;
enum Inner {
Tcp(TcpStream),
#[cfg(unix)]
Unix(UnixStream),
}
pub struct Socket(Inner);
impl Socket {
pub(crate) fn new_tcp(stream: TcpStream) -> Socket {
Socket(Inner::Tcp(stream))
}
#[cfg(unix)]
pub(crate) fn new_unix(stream: UnixStream) -> Socket {
Socket(Inner::Unix(stream))
}
}
impl Read for Socket {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match &mut self.0 {
Inner::Tcp(s) => s.read(buf),
#[cfg(unix)]
Inner::Unix(s) => s.read(buf),
}
}
}
impl AsyncRead for Socket {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match &self.0 {
Inner::Tcp(s) => s.prepare_uninitialized_buffer(buf),
#[cfg(unix)]
Inner::Unix(s) => s.prepare_uninitialized_buffer(buf),
}
}
fn read_buf<B>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
where
B: BufMut,
{
match &mut self.0 {
Inner::Tcp(s) => s.read_buf(buf),
#[cfg(unix)]
Inner::Unix(s) => s.read_buf(buf),
}
}
}
impl Write for Socket {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match &mut self.0 {
Inner::Tcp(s) => s.write(buf),
#[cfg(unix)]
Inner::Unix(s) => s.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match &mut self.0 {
Inner::Tcp(s) => s.flush(),
#[cfg(unix)]
Inner::Unix(s) => s.flush(),
}
}
}
impl AsyncWrite for Socket {
fn shutdown(&mut self) -> Poll<(), io::Error> {
match &mut self.0 {
Inner::Tcp(s) => s.shutdown(),
#[cfg(unix)]
Inner::Unix(s) => s.shutdown(),
}
}
fn write_buf<B>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
where
B: Buf,
{
match &mut self.0 {
Inner::Tcp(s) => s.write_buf(buf),
#[cfg(unix)]
Inner::Unix(s) => s.write_buf(buf),
}
}
}

View File

@ -22,7 +22,7 @@ fn connect(
let builder = s.parse::<tokio_postgres::Builder>().unwrap(); let builder = s.parse::<tokio_postgres::Builder>().unwrap();
TcpStream::connect(&"127.0.0.1:5433".parse().unwrap()) TcpStream::connect(&"127.0.0.1:5433".parse().unwrap())
.map_err(|e| panic!("{}", e)) .map_err(|e| panic!("{}", e))
.and_then(move |s| builder.connect(s, NoTls)) .and_then(move |s| builder.handshake(s, NoTls))
} }
fn smoke_test(s: &str) { fn smoke_test(s: &str) {