Fix custom spawn interface in sync API

This commit is contained in:
Steven Fackler 2019-11-27 20:58:02 -05:00
parent 47d97f0d2e
commit 223514fcd5
2 changed files with 21 additions and 31 deletions

View File

@ -32,9 +32,7 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
bytes = "0.5"
fallible-iterator = "0.2"
futures = "0.3"
pin-utils = "=0.1.0-alpha.4"
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres", default-features = false }
tokio-executor = "=0.2.0-alpha.6"
tokio = { version = "0.2", optional = true, features = ["rt-threaded"] }
lazy_static = { version = "1.0", optional = true }

View File

@ -2,21 +2,20 @@
//!
//! Requires the `runtime` Cargo feature (enabled by default).
use futures::{FutureExt, executor};
use crate::{Client, RUNTIME};
use futures::{executor, FutureExt};
use log::error;
use std::fmt;
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{mpsc, Arc, Mutex};
use std::sync::{mpsc, Arc};
use std::time::Duration;
use tokio_executor::Executor;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::{Error, Socket};
#[doc(inline)]
pub use tokio_postgres::config::{ChannelBinding, SslMode, TargetSessionAttrs};
use crate::{Client, RUNTIME};
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::{Error, Socket};
/// Connection configuration.
///
@ -94,8 +93,7 @@ use crate::{Client, RUNTIME};
#[derive(Clone)]
pub struct Config {
config: tokio_postgres::Config,
// this is an option since we don't want to boot up our default runtime unless we're actually going to use it.
executor: Option<Arc<Mutex<dyn Executor + Send>>>,
spawner: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Sync + Send>>,
}
impl fmt::Debug for Config {
@ -117,7 +115,7 @@ impl Config {
pub fn new() -> Config {
Config {
config: tokio_postgres::Config::new(),
executor: None,
spawner: None,
}
}
@ -242,14 +240,14 @@ impl Config {
self
}
/// Sets the executor used to run the connection futures.
/// Sets the spawner used to run the connection futures.
///
/// Defaults to a postgres-specific tokio `Runtime`.
pub fn executor<E>(&mut self, executor: E) -> &mut Config
pub fn spawner<F>(&mut self, spawn: F) -> &mut Config
where
E: Executor + 'static + Send,
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + 'static + Sync + Send,
{
self.executor = Some(Arc::new(Mutex::new(executor)));
self.spawner = Some(Arc::new(spawn));
self
}
@ -261,22 +259,20 @@ impl Config {
T::Stream: Send,
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
let (client, connection) = match &self.executor {
Some(executor) => {
let (client, connection) = match &self.spawner {
Some(spawn) => {
let (tx, rx) = mpsc::channel();
let config = self.config.clone();
let connect = async move {
let r = config.connect(tls).await;
let _ = tx.send(r);
};
executor.lock().unwrap().spawn(Box::pin(connect)).unwrap();
spawn(Box::pin(connect));
rx.recv().unwrap()?
}
None => {
let connect = self.config.connect(tls);
RUNTIME.handle().enter(|| {
executor::block_on(connect)
})?
RUNTIME.handle().enter(|| executor::block_on(connect))?
}
};
@ -285,13 +281,9 @@ impl Config {
error!("postgres connection error: {}", e)
}
});
match &self.executor {
Some(executor) => {
executor
.lock()
.unwrap()
.spawn(Box::pin(connection))
.unwrap();
match &self.spawner {
Some(spawn) => {
spawn(Box::pin(connection));
}
None => {
RUNTIME.spawn(connection);
@ -314,7 +306,7 @@ impl From<tokio_postgres::Config> for Config {
fn from(config: tokio_postgres::Config) -> Config {
Config {
config,
executor: None,
spawner: None,
}
}
}