From 09a63d62553da1625c0f8385ab15e815b78d587d Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Tue, 3 Dec 2019 18:25:29 -0800 Subject: [PATCH] Move to local runtimes per connection This avoids a bunch of context switches and cross-thread synchronization, which ends up improving the performance of a simple query by ~20%, from 252us to 216us. --- postgres/Cargo.toml | 15 +++--- postgres/benches/bench.rs | 17 +++++++ postgres/src/client.rs | 82 +++++++++++++-------------------- postgres/src/config.rs | 63 ++++++------------------- postgres/src/copy_in_writer.rs | 15 +++--- postgres/src/copy_out_reader.rs | 23 +++++---- postgres/src/lib.rs | 22 +-------- postgres/src/row_iter.rs | 16 +++---- postgres/src/transaction.rs | 77 ++++++++++++++++++++----------- 9 files changed, 151 insertions(+), 179 deletions(-) create mode 100644 postgres/benches/bench.rs diff --git a/postgres/Cargo.toml b/postgres/Cargo.toml index cab09cea..e766aa8e 100644 --- a/postgres/Cargo.toml +++ b/postgres/Cargo.toml @@ -10,6 +10,10 @@ readme = "../README.md" keywords = ["database", "postgres", "postgresql", "sql"] categories = ["database"] +[[bench]] +name = "bench" +harness = false + [package.metadata.docs.rs] all-features = true @@ -17,9 +21,6 @@ all-features = true circle-ci = { repository = "sfackler/rust-postgres" } [features] -default = ["runtime"] -runtime = ["tokio-postgres/runtime", "tokio", "lazy_static", "log"] - with-bit-vec-0_6 = ["tokio-postgres/with-bit-vec-0_6"] with-chrono-0_4 = ["tokio-postgres/with-chrono-0_4"] with-eui48-0_4 = ["tokio-postgres/with-eui48-0_4"] @@ -32,11 +33,11 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"] bytes = "0.5" fallible-iterator = "0.2" futures = "0.3" -tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres", default-features = false } +tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres" } -tokio = { version = "0.2", optional = true, features = ["rt-threaded"] } -lazy_static = { version = "1.0", optional = true } -log = { version = "0.4", optional = true } +tokio = { version = "0.2", features = ["rt-core"] } +log = "0.4" [dev-dependencies] +criterion = "0.3" tokio = "0.2" diff --git a/postgres/benches/bench.rs b/postgres/benches/bench.rs new file mode 100644 index 00000000..474d8359 --- /dev/null +++ b/postgres/benches/bench.rs @@ -0,0 +1,17 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use postgres::{Client, NoTls}; + +// spawned: 249us 252us 255us +// local: 214us 216us 219us +fn query_prepared(c: &mut Criterion) { + let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap(); + + let stmt = client.prepare("SELECT $1::INT8").unwrap(); + + c.bench_function("query_prepared", move |b| { + b.iter(|| client.query(&stmt, &[&1i64]).unwrap()) + }); +} + +criterion_group!(group, query_prepared); +criterion_main!(group); diff --git a/postgres/src/client.rs b/postgres/src/client.rs index 0fcaa85e..6db4cfa4 100644 --- a/postgres/src/client.rs +++ b/postgres/src/client.rs @@ -1,27 +1,25 @@ -#[cfg(feature = "runtime")] -use crate::Config; -use crate::{CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction}; -use futures::executor; +use crate::{Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction}; +use tokio::runtime::Runtime; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; use tokio_postgres::types::{ToSql, Type}; -#[cfg(feature = "runtime")] -use tokio_postgres::Socket; -use tokio_postgres::{Error, Row, SimpleQueryMessage}; +use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket}; /// A synchronous PostgreSQL client. -/// -/// This is a lightweight wrapper over the asynchronous tokio_postgres `Client`. -pub struct Client(tokio_postgres::Client); +pub struct Client { + runtime: Runtime, + client: tokio_postgres::Client, +} impl Client { + pub(crate) fn new(runtime: Runtime, client: tokio_postgres::Client) -> Client { + Client { runtime, client } + } + /// A convenience function which parses a configuration string into a `Config` and then connects to the database. /// /// See the documentation for [`Config`] for information about the connection syntax. /// - /// Requires the `runtime` Cargo feature (enabled by default). - /// /// [`Config`]: config/struct.Config.html - #[cfg(feature = "runtime")] pub fn connect(params: &str, tls_mode: T) -> Result where T: MakeTlsConnect + 'static + Send, @@ -78,7 +76,7 @@ impl Client { where T: ?Sized + ToStatement, { - executor::block_on(self.0.execute(query, params)) + self.runtime.block_on(self.client.execute(query, params)) } /// Executes a statement, returning the resulting rows. @@ -114,7 +112,7 @@ impl Client { where T: ?Sized + ToStatement, { - executor::block_on(self.0.query(query, params)) + self.runtime.block_on(self.client.query(query, params)) } /// Executes a statement which returns a single row, returning it. @@ -151,7 +149,7 @@ impl Client { where T: ?Sized + ToStatement, { - executor::block_on(self.0.query_one(query, params)) + self.runtime.block_on(self.client.query_one(query, params)) } /// Executes a statement which returns zero or one rows, returning it. @@ -197,7 +195,7 @@ impl Client { where T: ?Sized + ToStatement, { - executor::block_on(self.0.query_opt(query, params)) + self.runtime.block_on(self.client.query_opt(query, params)) } /// A maximally-flexible version of `query`. @@ -235,8 +233,10 @@ impl Client { I: IntoIterator, I::IntoIter: ExactSizeIterator, { - let stream = executor::block_on(self.0.query_raw(query, params))?; - Ok(RowIter::new(stream)) + let stream = self + .runtime + .block_on(self.client.query_raw(query, params))?; + Ok(RowIter::new(&mut self.runtime, stream)) } /// Creates a new prepared statement. @@ -263,7 +263,7 @@ impl Client { /// # } /// ``` pub fn prepare(&mut self, query: &str) -> Result { - executor::block_on(self.0.prepare(query)) + self.runtime.block_on(self.client.prepare(query)) } /// Like `prepare`, but allows the types of query parameters to be explicitly specified. @@ -294,7 +294,8 @@ impl Client { /// # } /// ``` pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result { - executor::block_on(self.0.prepare_typed(query, types)) + self.runtime + .block_on(self.client.prepare_typed(query, types)) } /// Executes a `COPY FROM STDIN` statement, returning the number of rows created. @@ -327,8 +328,8 @@ impl Client { where T: ?Sized + ToStatement, { - let sink = executor::block_on(self.0.copy_in(query, params))?; - Ok(CopyInWriter::new(sink)) + let sink = self.runtime.block_on(self.client.copy_in(query, params))?; + Ok(CopyInWriter::new(&mut self.runtime, sink)) } /// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data. @@ -358,8 +359,8 @@ impl Client { where T: ?Sized + ToStatement, { - let stream = executor::block_on(self.0.copy_out(query, params))?; - CopyOutReader::new(stream) + let stream = self.runtime.block_on(self.client.copy_out(query, params))?; + CopyOutReader::new(&mut self.runtime, stream) } /// Executes a sequence of SQL statements using the simple query protocol. @@ -378,7 +379,7 @@ impl Client { /// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass /// them to this method! pub fn simple_query(&mut self, query: &str) -> Result, Error> { - executor::block_on(self.0.simple_query(query)) + self.runtime.block_on(self.client.simple_query(query)) } /// Executes a sequence of SQL statements using the simple query protocol. @@ -392,7 +393,7 @@ impl Client { /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass /// them to this method! pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> { - executor::block_on(self.0.batch_execute(query)) + self.runtime.block_on(self.client.batch_execute(query)) } /// Begins a new database transaction. @@ -416,35 +417,14 @@ impl Client { /// # } /// ``` pub fn transaction(&mut self) -> Result, Error> { - let transaction = executor::block_on(self.0.transaction())?; - Ok(Transaction::new(transaction)) + let transaction = self.runtime.block_on(self.client.transaction())?; + Ok(Transaction::new(&mut self.runtime, transaction)) } /// Determines if the client's connection has already closed. /// /// If this returns `true`, the client is no longer usable. pub fn is_closed(&self) -> bool { - self.0.is_closed() - } - - /// Returns a shared reference to the inner nonblocking client. - pub fn get_ref(&self) -> &tokio_postgres::Client { - &self.0 - } - - /// Returns a mutable reference to the inner nonblocking client. - pub fn get_mut(&mut self) -> &mut tokio_postgres::Client { - &mut self.0 - } - - /// Consumes the client, returning the inner nonblocking client. - pub fn into_inner(self) -> tokio_postgres::Client { - self.0 - } -} - -impl From for Client { - fn from(c: tokio_postgres::Client) -> Client { - Client(c) + self.client.is_closed() } } diff --git a/postgres/src/config.rs b/postgres/src/config.rs index 34a29fe6..d50bd024 100644 --- a/postgres/src/config.rs +++ b/postgres/src/config.rs @@ -2,23 +2,19 @@ //! //! Requires the `runtime` Cargo feature (enabled by default). -use crate::{Client, RUNTIME}; -use futures::{executor, FutureExt}; +use crate::Client; +use futures::FutureExt; use log::error; use std::fmt; -use std::future::Future; use std::path::Path; -use std::pin::Pin; use std::str::FromStr; -use std::sync::{mpsc, Arc}; use std::time::Duration; +use tokio::runtime; #[doc(inline)] pub use tokio_postgres::config::{ChannelBinding, SslMode, TargetSessionAttrs}; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; use tokio_postgres::{Error, Socket}; -type Spawn = dyn Fn(Pin + Send>>) + Sync + Send; - /// Connection configuration. /// /// Configuration can be parsed from libpq-style connection strings. These strings come in two formats: @@ -95,7 +91,6 @@ type Spawn = dyn Fn(Pin + Send>>) + Sync + Send; #[derive(Clone)] pub struct Config { config: tokio_postgres::Config, - spawner: Option>, } impl fmt::Debug for Config { @@ -117,7 +112,6 @@ impl Config { pub fn new() -> Config { Config { config: tokio_postgres::Config::new(), - spawner: None, } } @@ -242,17 +236,6 @@ impl Config { self } - /// Sets the spawner used to run the connection futures. - /// - /// Defaults to a postgres-specific tokio `Runtime`. - pub fn spawner(&mut self, spawn: F) -> &mut Config - where - F: Fn(Pin + Send>>) + 'static + Sync + Send, - { - self.spawner = Some(Arc::new(spawn)); - self - } - /// Opens a connection to a PostgreSQL database. pub fn connect(&self, tls: T) -> Result where @@ -261,38 +244,23 @@ impl Config { T::Stream: Send, >::Future: Send, { - let (client, connection) = match &self.spawner { - Some(spawn) => { - let (tx, rx) = mpsc::channel(); - let config = self.config.clone(); - let connect = async move { - let r = config.connect(tls).await; - let _ = tx.send(r); - }; - spawn(Box::pin(connect)); - rx.recv().unwrap()? - } - None => { - let connect = self.config.connect(tls); - RUNTIME.handle().enter(|| executor::block_on(connect))? - } - }; + let mut runtime = runtime::Builder::new() + .enable_all() + .basic_scheduler() + .build() + .unwrap(); // FIXME don't unwrap + let (client, connection) = runtime.block_on(self.config.connect(tls))?; + + // FIXME don't spawn this so error reporting is less weird. let connection = connection.map(|r| { if let Err(e) = r { error!("postgres connection error: {}", e) } }); - match &self.spawner { - Some(spawn) => { - spawn(Box::pin(connection)); - } - None => { - RUNTIME.spawn(connection); - } - } + runtime.spawn(connection); - Ok(Client::from(client)) + Ok(Client::new(runtime, client)) } } @@ -306,9 +274,6 @@ impl FromStr for Config { impl From for Config { fn from(config: tokio_postgres::Config) -> Config { - Config { - config, - spawner: None, - } + Config { config } } } diff --git a/postgres/src/copy_in_writer.rs b/postgres/src/copy_in_writer.rs index b7a2a009..897d8756 100644 --- a/postgres/src/copy_in_writer.rs +++ b/postgres/src/copy_in_writer.rs @@ -1,18 +1,18 @@ use bytes::{Bytes, BytesMut}; -use futures::{executor, SinkExt}; +use futures::SinkExt; use std::io; use std::io::Write; -use std::marker::PhantomData; use std::pin::Pin; +use tokio::runtime::Runtime; use tokio_postgres::{CopyInSink, Error}; /// The writer returned by the `copy_in` method. /// /// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted. pub struct CopyInWriter<'a> { + runtime: &'a mut Runtime, sink: Pin>>, buf: BytesMut, - _p: PhantomData<&'a mut ()>, } // no-op impl to extend borrow until drop @@ -21,11 +21,11 @@ impl Drop for CopyInWriter<'_> { } impl<'a> CopyInWriter<'a> { - pub(crate) fn new(sink: CopyInSink) -> CopyInWriter<'a> { + pub(crate) fn new(runtime: &'a mut Runtime, sink: CopyInSink) -> CopyInWriter<'a> { CopyInWriter { + runtime, sink: Box::pin(sink), buf: BytesMut::new(), - _p: PhantomData, } } @@ -34,7 +34,7 @@ impl<'a> CopyInWriter<'a> { /// If this is not called, the copy will be aborted. pub fn finish(mut self) -> Result { self.flush_inner()?; - executor::block_on(self.sink.as_mut().finish()) + self.runtime.block_on(self.sink.as_mut().finish()) } fn flush_inner(&mut self) -> Result<(), Error> { @@ -42,7 +42,8 @@ impl<'a> CopyInWriter<'a> { return Ok(()); } - executor::block_on(self.sink.as_mut().send(self.buf.split().freeze())) + self.runtime + .block_on(self.sink.as_mut().send(self.buf.split().freeze())) } } diff --git a/postgres/src/copy_out_reader.rs b/postgres/src/copy_out_reader.rs index cd972725..14f8d630 100644 --- a/postgres/src/copy_out_reader.rs +++ b/postgres/src/copy_out_reader.rs @@ -1,15 +1,15 @@ use bytes::{Buf, Bytes}; -use futures::executor; +use futures::StreamExt; use std::io::{self, BufRead, Cursor, Read}; -use std::marker::PhantomData; use std::pin::Pin; +use tokio::runtime::Runtime; use tokio_postgres::{CopyOutStream, Error}; /// The reader returned by the `copy_out` method. pub struct CopyOutReader<'a> { - it: executor::BlockingStream>>, + runtime: &'a mut Runtime, + stream: Pin>, cur: Cursor, - _p: PhantomData<&'a mut ()>, } // no-op impl to extend borrow until drop @@ -18,18 +18,21 @@ impl Drop for CopyOutReader<'_> { } impl<'a> CopyOutReader<'a> { - pub(crate) fn new(stream: CopyOutStream) -> Result, Error> { - let mut it = executor::block_on_stream(Box::pin(stream)); - let cur = match it.next() { + pub(crate) fn new( + runtime: &'a mut Runtime, + stream: CopyOutStream, + ) -> Result, Error> { + let mut stream = Box::pin(stream); + let cur = match runtime.block_on(stream.next()) { Some(Ok(cur)) => cur, Some(Err(e)) => return Err(e), None => Bytes::new(), }; Ok(CopyOutReader { - it, + runtime, + stream, cur: Cursor::new(cur), - _p: PhantomData, }) } } @@ -47,7 +50,7 @@ impl Read for CopyOutReader<'_> { impl BufRead for CopyOutReader<'_> { fn fill_buf(&mut self) -> io::Result<&[u8]> { if self.cur.remaining() == 0 { - match self.it.next() { + match self.runtime.block_on(self.stream.next()) { Some(Ok(cur)) => self.cur = Cursor::new(cur), Some(Err(e)) => return Err(io::Error::new(io::ErrorKind::Other, e)), None => {} diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs index 13c9a21d..bfe18764 100644 --- a/postgres/src/lib.rs +++ b/postgres/src/lib.rs @@ -55,19 +55,11 @@ #![doc(html_root_url = "https://docs.rs/postgres/0.17")] #![warn(clippy::all, rust_2018_idioms, missing_docs)] -#[cfg(feature = "runtime")] -use lazy_static::lazy_static; -#[cfg(feature = "runtime")] -use tokio::runtime::{self, Runtime}; - -#[cfg(feature = "runtime")] -pub use tokio_postgres::Socket; pub use tokio_postgres::{ - error, row, tls, types, Column, Portal, SimpleQueryMessage, Statement, ToStatement, + error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement, }; pub use crate::client::*; -#[cfg(feature = "runtime")] pub use crate::config::Config; pub use crate::copy_in_writer::CopyInWriter; pub use crate::copy_out_reader::CopyOutReader; @@ -81,23 +73,11 @@ pub use crate::tls::NoTls; pub use crate::transaction::*; mod client; -#[cfg(feature = "runtime")] pub mod config; mod copy_in_writer; mod copy_out_reader; mod row_iter; mod transaction; -#[cfg(feature = "runtime")] #[cfg(test)] mod test; - -#[cfg(feature = "runtime")] -lazy_static! { - static ref RUNTIME: Runtime = runtime::Builder::new() - .thread_name("postgres") - .threaded_scheduler() - .enable_all() - .build() - .unwrap(); -} diff --git a/postgres/src/row_iter.rs b/postgres/src/row_iter.rs index 08da6873..150b5514 100644 --- a/postgres/src/row_iter.rs +++ b/postgres/src/row_iter.rs @@ -1,13 +1,13 @@ use fallible_iterator::FallibleIterator; -use futures::executor::{self, BlockingStream}; -use std::marker::PhantomData; use std::pin::Pin; +use tokio::runtime::Runtime; use tokio_postgres::{Error, Row, RowStream}; +use futures::StreamExt; /// The iterator returned by `query_raw`. pub struct RowIter<'a> { - it: BlockingStream>>, - _p: PhantomData<&'a mut ()>, + runtime: &'a mut Runtime, + it: Pin>, } // no-op impl to extend the borrow until drop @@ -16,10 +16,10 @@ impl Drop for RowIter<'_> { } impl<'a> RowIter<'a> { - pub(crate) fn new(stream: RowStream) -> RowIter<'a> { + pub(crate) fn new(runtime: &'a mut Runtime, stream: RowStream) -> RowIter<'a> { RowIter { - it: executor::block_on_stream(Box::pin(stream)), - _p: PhantomData, + runtime, + it: Box::pin(stream), } } } @@ -29,6 +29,6 @@ impl FallibleIterator for RowIter<'_> { type Error = Error; fn next(&mut self) -> Result, Error> { - self.it.next().transpose() + self.runtime.block_on(self.it.next()).transpose() } } diff --git a/postgres/src/transaction.rs b/postgres/src/transaction.rs index 616a5872..7c262018 100644 --- a/postgres/src/transaction.rs +++ b/postgres/src/transaction.rs @@ -1,5 +1,5 @@ use crate::{CopyInWriter, CopyOutReader, Portal, RowIter, Statement, ToStatement}; -use futures::executor; +use tokio::runtime::Runtime; use tokio_postgres::types::{ToSql, Type}; use tokio_postgres::{Error, Row, SimpleQueryMessage}; @@ -7,33 +7,43 @@ use tokio_postgres::{Error, Row, SimpleQueryMessage}; /// /// Transactions will implicitly roll back by default when dropped. Use the `commit` method to commit the changes made /// in the transaction. Transactions can be nested, with inner transactions implemented via savepoints. -pub struct Transaction<'a>(tokio_postgres::Transaction<'a>); +pub struct Transaction<'a> { + runtime: &'a mut Runtime, + transaction: tokio_postgres::Transaction<'a>, +} impl<'a> Transaction<'a> { - pub(crate) fn new(transaction: tokio_postgres::Transaction<'a>) -> Transaction<'a> { - Transaction(transaction) + pub(crate) fn new( + runtime: &'a mut Runtime, + transaction: tokio_postgres::Transaction<'a>, + ) -> Transaction<'a> { + Transaction { + runtime, + transaction, + } } /// Consumes the transaction, committing all changes made within it. pub fn commit(self) -> Result<(), Error> { - executor::block_on(self.0.commit()) + self.runtime.block_on(self.transaction.commit()) } /// Rolls the transaction back, discarding all changes made within it. /// /// This is equivalent to `Transaction`'s `Drop` implementation, but provides any error encountered to the caller. pub fn rollback(self) -> Result<(), Error> { - executor::block_on(self.0.rollback()) + self.runtime.block_on(self.transaction.rollback()) } /// Like `Client::prepare`. pub fn prepare(&mut self, query: &str) -> Result { - executor::block_on(self.0.prepare(query)) + self.runtime.block_on(self.transaction.prepare(query)) } /// Like `Client::prepare_typed`. pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result { - executor::block_on(self.0.prepare_typed(query, types)) + self.runtime + .block_on(self.transaction.prepare_typed(query, types)) } /// Like `Client::execute`. @@ -41,7 +51,8 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - executor::block_on(self.0.execute(query, params)) + self.runtime + .block_on(self.transaction.execute(query, params)) } /// Like `Client::query`. @@ -49,7 +60,7 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - executor::block_on(self.0.query(query, params)) + self.runtime.block_on(self.transaction.query(query, params)) } /// Like `Client::query_one`. @@ -57,7 +68,8 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - executor::block_on(self.0.query_one(query, params)) + self.runtime + .block_on(self.transaction.query_one(query, params)) } /// Like `Client::query_opt`. @@ -69,7 +81,8 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - executor::block_on(self.0.query_opt(query, params)) + self.runtime + .block_on(self.transaction.query_opt(query, params)) } /// Like `Client::query_raw`. @@ -79,8 +92,10 @@ impl<'a> Transaction<'a> { I: IntoIterator, I::IntoIter: ExactSizeIterator, { - let stream = executor::block_on(self.0.query_raw(query, params))?; - Ok(RowIter::new(stream)) + let stream = self + .runtime + .block_on(self.transaction.query_raw(query, params))?; + Ok(RowIter::new(self.runtime, stream)) } /// Binds parameters to a statement, creating a "portal". @@ -97,7 +112,7 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - executor::block_on(self.0.bind(query, params)) + self.runtime.block_on(self.transaction.bind(query, params)) } /// Continues execution of a portal, returning the next set of rows. @@ -105,7 +120,8 @@ impl<'a> Transaction<'a> { /// Unlike `query`, portals can be incrementally evaluated by limiting the number of rows returned in each call to /// `query_portal`. If the requested number is negative or 0, all remaining rows will be returned. pub fn query_portal(&mut self, portal: &Portal, max_rows: i32) -> Result, Error> { - executor::block_on(self.0.query_portal(portal, max_rows)) + self.runtime + .block_on(self.transaction.query_portal(portal, max_rows)) } /// The maximally flexible version of `query_portal`. @@ -114,8 +130,10 @@ impl<'a> Transaction<'a> { portal: &Portal, max_rows: i32, ) -> Result, Error> { - let stream = executor::block_on(self.0.query_portal_raw(portal, max_rows))?; - Ok(RowIter::new(stream)) + let stream = self + .runtime + .block_on(self.transaction.query_portal_raw(portal, max_rows))?; + Ok(RowIter::new(self.runtime, stream)) } /// Like `Client::copy_in`. @@ -127,8 +145,10 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - let sink = executor::block_on(self.0.copy_in(query, params))?; - Ok(CopyInWriter::new(sink)) + let sink = self + .runtime + .block_on(self.transaction.copy_in(query, params))?; + Ok(CopyInWriter::new(self.runtime, sink)) } /// Like `Client::copy_out`. @@ -140,23 +160,28 @@ impl<'a> Transaction<'a> { where T: ?Sized + ToStatement, { - let stream = executor::block_on(self.0.copy_out(query, params))?; - CopyOutReader::new(stream) + let stream = self + .runtime + .block_on(self.transaction.copy_out(query, params))?; + CopyOutReader::new(self.runtime, stream) } /// Like `Client::simple_query`. pub fn simple_query(&mut self, query: &str) -> Result, Error> { - executor::block_on(self.0.simple_query(query)) + self.runtime.block_on(self.transaction.simple_query(query)) } /// Like `Client::batch_execute`. pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> { - executor::block_on(self.0.batch_execute(query)) + self.runtime.block_on(self.transaction.batch_execute(query)) } /// Like `Client::transaction`. pub fn transaction(&mut self) -> Result, Error> { - let transaction = executor::block_on(self.0.transaction())?; - Ok(Transaction(transaction)) + let transaction = self.runtime.block_on(self.transaction.transaction())?; + Ok(Transaction { + runtime: self.runtime, + transaction, + }) } }