diff --git a/postgres/src/cancel_token.rs b/postgres/src/cancel_token.rs new file mode 100644 index 00000000..f140e60e --- /dev/null +++ b/postgres/src/cancel_token.rs @@ -0,0 +1,36 @@ +use tokio::runtime; +use tokio_postgres::tls::MakeTlsConnect; +use tokio_postgres::{Error, Socket}; + +/// The capability to request cancellation of in-progress queries on a +/// connection. +#[derive(Clone)] +pub struct CancelToken(tokio_postgres::CancelToken); + +impl CancelToken { + pub(crate) fn new(inner: tokio_postgres::CancelToken) -> CancelToken { + CancelToken(inner) + } + + /// Attempts to cancel the in-progress query on the connection associated + /// with this `CancelToken`. + /// + /// The server provides no information about whether a cancellation attempt was successful or not. An error will + /// only be returned if the client was unable to connect to the database. + /// + /// Cancellation is inherently racy. There is no guarantee that the + /// cancellation request will reach the server before the query terminates + /// normally, or that the connection associated with this token is still + /// active. + pub fn cancel_query(&self, tls: T) -> Result<(), Error> + where + T: MakeTlsConnect, + { + runtime::Builder::new() + .enable_all() + .basic_scheduler() + .build() + .unwrap() // FIXME don't unwrap + .block_on(self.0.cancel_query(tls)) + } +} diff --git a/postgres/src/client.rs b/postgres/src/client.rs index 3e69670d..dd3495a1 100644 --- a/postgres/src/client.rs +++ b/postgres/src/client.rs @@ -1,4 +1,6 @@ -use crate::{Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction}; +use crate::{ + CancelToken, Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction, +}; use std::ops::{Deref, DerefMut}; use tokio::runtime::Runtime; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; @@ -443,6 +445,46 @@ impl Client { Ok(Transaction::new(&mut self.runtime, transaction)) } + /// Constructs a cancellation token that can later be used to request + /// cancellation of a query running on this connection. + /// + /// # Examples + /// + /// ```no_run + /// use postgres::{Client, NoTls}; + /// use postgres::error::SqlState; + /// use std::thread; + /// use std::time::Duration; + /// + /// # fn main() -> Result<(), Box> { + /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?; + /// + /// let cancel_token = client.cancel_token(); + /// + /// thread::spawn(move || { + /// // Abort the query after 5s. + /// thread::sleep(Duration::from_secs(5)); + /// cancel_token.cancel_query(NoTls); + /// }); + /// + /// match client.simple_query("SELECT long_running_query()") { + /// Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => { + /// // Handle canceled query. + /// } + /// Err(err) => return Err(err.into()), + /// Ok(rows) => { + /// // ... + /// } + /// } + /// // ... + /// + /// # Ok(()) + /// # } + /// ``` + pub fn cancel_token(&self) -> CancelToken { + CancelToken::new(self.client.cancel_token()) + } + /// Determines if the client's connection has already closed. /// /// If this returns `true`, the client is no longer usable. diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs index 42b28f31..5399bfce 100644 --- a/postgres/src/lib.rs +++ b/postgres/src/lib.rs @@ -54,6 +54,7 @@ pub use tokio_postgres::{ error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement, }; +pub use crate::cancel_token::CancelToken; pub use crate::client::*; pub use crate::config::Config; pub use crate::copy_in_writer::CopyInWriter; @@ -68,6 +69,7 @@ pub use crate::tls::NoTls; pub use crate::transaction::*; pub mod binary_copy; +mod cancel_token; mod client; pub mod config; mod copy_in_writer; diff --git a/postgres/src/test.rs b/postgres/src/test.rs index 2275d715..449aac01 100644 --- a/postgres/src/test.rs +++ b/postgres/src/test.rs @@ -1,4 +1,7 @@ use std::io::{Read, Write}; +use std::thread; +use std::time::Duration; +use tokio_postgres::error::SqlState; use tokio_postgres::types::Type; use tokio_postgres::NoTls; @@ -288,3 +291,21 @@ fn portal() { assert_eq!(rows.len(), 1); assert_eq!(rows[0].get::<_, i32>(0), 3); } + +#[test] +fn cancel_query() { + let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap(); + + let cancel_token = client.cancel_token(); + let cancel_thread = thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + cancel_token.cancel_query(NoTls).unwrap(); + }); + + match client.batch_execute("SELECT pg_sleep(100)") { + Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {} + t => panic!("unexpected return: {:?}", t), + } + + cancel_thread.join().unwrap(); +} diff --git a/postgres/src/transaction.rs b/postgres/src/transaction.rs index 010e8b62..e5b3682f 100644 --- a/postgres/src/transaction.rs +++ b/postgres/src/transaction.rs @@ -1,4 +1,6 @@ -use crate::{CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement}; +use crate::{ + CancelToken, CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement, +}; use tokio::runtime::Runtime; use tokio_postgres::types::{ToSql, Type}; use tokio_postgres::{Error, Row, SimpleQueryMessage}; @@ -168,6 +170,11 @@ impl<'a> Transaction<'a> { self.runtime.block_on(self.transaction.batch_execute(query)) } + /// Like `Client::cancel_token`. + pub fn cancel_token(&self) -> CancelToken { + CancelToken::new(self.transaction.cancel_token()) + } + /// Like `Client::transaction`. pub fn transaction(&mut self) -> Result, Error> { let transaction = self.runtime.block_on(self.transaction.transaction())?; diff --git a/tokio-postgres/src/cancel_token.rs b/tokio-postgres/src/cancel_token.rs new file mode 100644 index 00000000..d048a3c8 --- /dev/null +++ b/tokio-postgres/src/cancel_token.rs @@ -0,0 +1,63 @@ +use crate::config::SslMode; +use crate::tls::TlsConnect; +#[cfg(feature = "runtime")] +use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect, Socket}; +use crate::{cancel_query_raw, Error}; +use tokio::io::{AsyncRead, AsyncWrite}; + +/// The capability to request cancellation of in-progress queries on a +/// connection. +#[derive(Clone)] +pub struct CancelToken { + #[cfg(feature = "runtime")] + pub(crate) socket_config: Option, + pub(crate) ssl_mode: SslMode, + pub(crate) process_id: i32, + pub(crate) secret_key: i32, +} + +impl CancelToken { + /// Attempts to cancel the in-progress query on the connection associated + /// with this `CancelToken`. + /// + /// The server provides no information about whether a cancellation attempt was successful or not. An error will + /// only be returned if the client was unable to connect to the database. + /// + /// Cancellation is inherently racy. There is no guarantee that the + /// cancellation request will reach the server before the query terminates + /// normally, or that the connection associated with this token is still + /// active. + /// + /// Requires the `runtime` Cargo feature (enabled by default). + #[cfg(feature = "runtime")] + pub async fn cancel_query(&self, tls: T) -> Result<(), Error> + where + T: MakeTlsConnect, + { + cancel_query::cancel_query( + self.socket_config.clone(), + self.ssl_mode, + tls, + self.process_id, + self.secret_key, + ) + .await + } + + /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new + /// connection itself. + pub async fn cancel_query_raw(&self, stream: S, tls: T) -> Result<(), Error> + where + S: AsyncRead + AsyncWrite + Unpin, + T: TlsConnect, + { + cancel_query_raw::cancel_query_raw( + stream, + self.ssl_mode, + tls, + self.process_id, + self.secret_key, + ) + .await + } +} diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index 68ccaf37..d9b4a311 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "runtime")] -use crate::cancel_query; use crate::codec::BackendMessages; use crate::config::{Host, SslMode}; use crate::connection::{Request, RequestMessages}; @@ -14,7 +12,7 @@ use crate::to_statement::ToStatement; use crate::types::{Oid, ToSql, Type}; #[cfg(feature = "runtime")] use crate::Socket; -use crate::{cancel_query_raw, copy_in, copy_out, query, CopyInSink, Transaction}; +use crate::{copy_in, copy_out, query, CancelToken, CopyInSink, Transaction}; use crate::{prepare, SimpleQueryMessage}; use crate::{simple_query, Row}; use crate::{Error, Statement}; @@ -451,6 +449,19 @@ impl Client { Ok(Transaction::new(self)) } + /// Constructs a cancellation token that can later be used to request + /// cancellation of a query running on the connection associated with + /// this client. + pub fn cancel_token(&self) -> CancelToken { + CancelToken { + #[cfg(feature = "runtime")] + socket_config: self.socket_config.clone(), + ssl_mode: self.ssl_mode, + process_id: self.process_id, + secret_key: self.secret_key, + } + } + /// Attempts to cancel an in-progress query. /// /// The server provides no information about whether a cancellation attempt was successful or not. An error will @@ -458,35 +469,23 @@ impl Client { /// /// Requires the `runtime` Cargo feature (enabled by default). #[cfg(feature = "runtime")] + #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")] pub async fn cancel_query(&self, tls: T) -> Result<(), Error> where T: MakeTlsConnect, { - cancel_query::cancel_query( - self.socket_config.clone(), - self.ssl_mode, - tls, - self.process_id, - self.secret_key, - ) - .await + self.cancel_token().cancel_query(tls).await } /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new /// connection itself. + #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")] pub async fn cancel_query_raw(&self, stream: S, tls: T) -> Result<(), Error> where S: AsyncRead + AsyncWrite + Unpin, T: TlsConnect, { - cancel_query_raw::cancel_query_raw( - stream, - self.ssl_mode, - tls, - self.process_id, - self.secret_key, - ) - .await + self.cancel_token().cancel_query_raw(stream, tls).await } /// Determines if the connection to the server has already closed. diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs index dd0d8aba..ff87baf0 100644 --- a/tokio-postgres/src/lib.rs +++ b/tokio-postgres/src/lib.rs @@ -99,6 +99,7 @@ #![doc(html_root_url = "https://docs.rs/tokio-postgres/0.5")] #![warn(rust_2018_idioms, clippy::all, missing_docs)] +pub use crate::cancel_token::CancelToken; pub use crate::client::Client; pub use crate::config::Config; pub use crate::connection::Connection; @@ -125,6 +126,7 @@ mod bind; #[cfg(feature = "runtime")] mod cancel_query; mod cancel_query_raw; +mod cancel_token; mod client; mod codec; pub mod config; diff --git a/tokio-postgres/src/transaction.rs b/tokio-postgres/src/transaction.rs index c97d5f9f..f5a39684 100644 --- a/tokio-postgres/src/transaction.rs +++ b/tokio-postgres/src/transaction.rs @@ -9,8 +9,8 @@ use crate::types::{ToSql, Type}; #[cfg(feature = "runtime")] use crate::Socket; use crate::{ - bind, query, slice_iter, Client, CopyInSink, Error, Portal, Row, SimpleQueryMessage, Statement, - ToStatement, + bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row, + SimpleQueryMessage, Statement, ToStatement, }; use bytes::Buf; use futures::TryStreamExt; @@ -249,21 +249,30 @@ impl<'a> Transaction<'a> { self.client.batch_execute(query).await } + /// Like `Client::cancel_token`. + pub fn cancel_token(&self) -> CancelToken { + self.client.cancel_token() + } + /// Like `Client::cancel_query`. #[cfg(feature = "runtime")] + #[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")] pub async fn cancel_query(&self, tls: T) -> Result<(), Error> where T: MakeTlsConnect, { + #[allow(deprecated)] self.client.cancel_query(tls).await } /// Like `Client::cancel_query_raw`. + #[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")] pub async fn cancel_query_raw(&self, stream: S, tls: T) -> Result<(), Error> where S: AsyncRead + AsyncWrite + Unpin, T: TlsConnect, { + #[allow(deprecated)] self.client.cancel_query_raw(stream, tls).await } diff --git a/tokio-postgres/tests/test/main.rs b/tokio-postgres/tests/test/main.rs index 23174952..37ee0351 100644 --- a/tokio-postgres/tests/test/main.rs +++ b/tokio-postgres/tests/test/main.rs @@ -304,7 +304,8 @@ async fn cancel_query_raw() { let client = connect("user=postgres").await; let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap(); - let cancel = client.cancel_query_raw(socket, NoTls); + let cancel_token = client.cancel_token(); + let cancel = cancel_token.cancel_query_raw(socket, NoTls); let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel); let sleep = client.batch_execute("SELECT pg_sleep(100)"); diff --git a/tokio-postgres/tests/test/runtime.rs b/tokio-postgres/tests/test/runtime.rs index 35219d8a..e07aa4a6 100644 --- a/tokio-postgres/tests/test/runtime.rs +++ b/tokio-postgres/tests/test/runtime.rs @@ -70,7 +70,8 @@ async fn target_session_attrs_err() { async fn cancel_query() { let client = connect("host=localhost port=5433 user=postgres").await; - let cancel = client.cancel_query(NoTls); + let cancel_token = client.cancel_token(); + let cancel = cancel_token.cancel_query(NoTls); let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel); let sleep = client.batch_execute("SELECT pg_sleep(100)");