Merge pull request #531 from benesch/synchronous-cancel

Support cancellation in synchronous client
This commit is contained in:
Steven Fackler 2019-12-30 21:09:19 -05:00 committed by GitHub
commit 7bd21b492e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 208 additions and 25 deletions

View File

@ -0,0 +1,36 @@
use tokio::runtime;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::{Error, Socket};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone)]
pub struct CancelToken(tokio_postgres::CancelToken);
impl CancelToken {
pub(crate) fn new(inner: tokio_postgres::CancelToken) -> CancelToken {
CancelToken(inner)
}
/// Attempts to cancel the in-progress query on the connection associated
/// with this `CancelToken`.
///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
/// only be returned if the client was unable to connect to the database.
///
/// Cancellation is inherently racy. There is no guarantee that the
/// cancellation request will reach the server before the query terminates
/// normally, or that the connection associated with this token is still
/// active.
pub fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where
T: MakeTlsConnect<Socket>,
{
runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.unwrap() // FIXME don't unwrap
.block_on(self.0.cancel_query(tls))
}
}

View File

@ -1,4 +1,6 @@
use crate::{Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction};
use crate::{
CancelToken, Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction,
};
use std::ops::{Deref, DerefMut};
use tokio::runtime::Runtime;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
@ -443,6 +445,46 @@ impl Client {
Ok(Transaction::new(&mut self.runtime, transaction))
}
/// Constructs a cancellation token that can later be used to request
/// cancellation of a query running on this connection.
///
/// # Examples
///
/// ```no_run
/// use postgres::{Client, NoTls};
/// use postgres::error::SqlState;
/// use std::thread;
/// use std::time::Duration;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let cancel_token = client.cancel_token();
///
/// thread::spawn(move || {
/// // Abort the query after 5s.
/// thread::sleep(Duration::from_secs(5));
/// cancel_token.cancel_query(NoTls);
/// });
///
/// match client.simple_query("SELECT long_running_query()") {
/// Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
/// // Handle canceled query.
/// }
/// Err(err) => return Err(err.into()),
/// Ok(rows) => {
/// // ...
/// }
/// }
/// // ...
///
/// # Ok(())
/// # }
/// ```
pub fn cancel_token(&self) -> CancelToken {
CancelToken::new(self.client.cancel_token())
}
/// Determines if the client's connection has already closed.
///
/// If this returns `true`, the client is no longer usable.

View File

@ -54,6 +54,7 @@ pub use tokio_postgres::{
error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement,
};
pub use crate::cancel_token::CancelToken;
pub use crate::client::*;
pub use crate::config::Config;
pub use crate::copy_in_writer::CopyInWriter;
@ -68,6 +69,7 @@ pub use crate::tls::NoTls;
pub use crate::transaction::*;
pub mod binary_copy;
mod cancel_token;
mod client;
pub mod config;
mod copy_in_writer;

View File

@ -1,4 +1,7 @@
use std::io::{Read, Write};
use std::thread;
use std::time::Duration;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::Type;
use tokio_postgres::NoTls;
@ -288,3 +291,21 @@ fn portal() {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get::<_, i32>(0), 3);
}
#[test]
fn cancel_query() {
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
let cancel_token = client.cancel_token();
let cancel_thread = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
cancel_token.cancel_query(NoTls).unwrap();
});
match client.batch_execute("SELECT pg_sleep(100)") {
Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {}
t => panic!("unexpected return: {:?}", t),
}
cancel_thread.join().unwrap();
}

View File

@ -1,4 +1,6 @@
use crate::{CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement};
use crate::{
CancelToken, CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement,
};
use tokio::runtime::Runtime;
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::{Error, Row, SimpleQueryMessage};
@ -168,6 +170,11 @@ impl<'a> Transaction<'a> {
self.runtime.block_on(self.transaction.batch_execute(query))
}
/// Like `Client::cancel_token`.
pub fn cancel_token(&self) -> CancelToken {
CancelToken::new(self.transaction.cancel_token())
}
/// Like `Client::transaction`.
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
let transaction = self.runtime.block_on(self.transaction.transaction())?;

View File

@ -0,0 +1,63 @@
use crate::config::SslMode;
use crate::tls::TlsConnect;
#[cfg(feature = "runtime")]
use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect, Socket};
use crate::{cancel_query_raw, Error};
use tokio::io::{AsyncRead, AsyncWrite};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone)]
pub struct CancelToken {
#[cfg(feature = "runtime")]
pub(crate) socket_config: Option<SocketConfig>,
pub(crate) ssl_mode: SslMode,
pub(crate) process_id: i32,
pub(crate) secret_key: i32,
}
impl CancelToken {
/// Attempts to cancel the in-progress query on the connection associated
/// with this `CancelToken`.
///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
/// only be returned if the client was unable to connect to the database.
///
/// Cancellation is inherently racy. There is no guarantee that the
/// cancellation request will reach the server before the query terminates
/// normally, or that the connection associated with this token is still
/// active.
///
/// Requires the `runtime` Cargo feature (enabled by default).
#[cfg(feature = "runtime")]
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where
T: MakeTlsConnect<Socket>,
{
cancel_query::cancel_query(
self.socket_config.clone(),
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
}
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
/// connection itself.
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
cancel_query_raw::cancel_query_raw(
stream,
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
}
}

View File

@ -1,5 +1,3 @@
#[cfg(feature = "runtime")]
use crate::cancel_query;
use crate::codec::BackendMessages;
use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages};
@ -14,7 +12,7 @@ use crate::to_statement::ToStatement;
use crate::types::{Oid, ToSql, Type};
#[cfg(feature = "runtime")]
use crate::Socket;
use crate::{cancel_query_raw, copy_in, copy_out, query, CopyInSink, Transaction};
use crate::{copy_in, copy_out, query, CancelToken, CopyInSink, Transaction};
use crate::{prepare, SimpleQueryMessage};
use crate::{simple_query, Row};
use crate::{Error, Statement};
@ -451,6 +449,19 @@ impl Client {
Ok(Transaction::new(self))
}
/// Constructs a cancellation token that can later be used to request
/// cancellation of a query running on the connection associated with
/// this client.
pub fn cancel_token(&self) -> CancelToken {
CancelToken {
#[cfg(feature = "runtime")]
socket_config: self.socket_config.clone(),
ssl_mode: self.ssl_mode,
process_id: self.process_id,
secret_key: self.secret_key,
}
}
/// Attempts to cancel an in-progress query.
///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
@ -458,35 +469,23 @@ impl Client {
///
/// Requires the `runtime` Cargo feature (enabled by default).
#[cfg(feature = "runtime")]
#[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where
T: MakeTlsConnect<Socket>,
{
cancel_query::cancel_query(
self.socket_config.clone(),
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
self.cancel_token().cancel_query(tls).await
}
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
/// connection itself.
#[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
cancel_query_raw::cancel_query_raw(
stream,
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
self.cancel_token().cancel_query_raw(stream, tls).await
}
/// Determines if the connection to the server has already closed.

View File

@ -99,6 +99,7 @@
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.5")]
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
pub use crate::cancel_token::CancelToken;
pub use crate::client::Client;
pub use crate::config::Config;
pub use crate::connection::Connection;
@ -125,6 +126,7 @@ mod bind;
#[cfg(feature = "runtime")]
mod cancel_query;
mod cancel_query_raw;
mod cancel_token;
mod client;
mod codec;
pub mod config;

View File

@ -9,8 +9,8 @@ use crate::types::{ToSql, Type};
#[cfg(feature = "runtime")]
use crate::Socket;
use crate::{
bind, query, slice_iter, Client, CopyInSink, Error, Portal, Row, SimpleQueryMessage, Statement,
ToStatement,
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row,
SimpleQueryMessage, Statement, ToStatement,
};
use bytes::Buf;
use futures::TryStreamExt;
@ -245,21 +245,30 @@ impl<'a> Transaction<'a> {
self.client.batch_execute(query).await
}
/// Like `Client::cancel_token`.
pub fn cancel_token(&self) -> CancelToken {
self.client.cancel_token()
}
/// Like `Client::cancel_query`.
#[cfg(feature = "runtime")]
#[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")]
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where
T: MakeTlsConnect<Socket>,
{
#[allow(deprecated)]
self.client.cancel_query(tls).await
}
/// Like `Client::cancel_query_raw`.
#[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")]
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
#[allow(deprecated)]
self.client.cancel_query_raw(stream, tls).await
}

View File

@ -304,7 +304,8 @@ async fn cancel_query_raw() {
let client = connect("user=postgres").await;
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
let cancel = client.cancel_query_raw(socket, NoTls);
let cancel_token = client.cancel_token();
let cancel = cancel_token.cancel_query_raw(socket, NoTls);
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)");

View File

@ -70,7 +70,8 @@ async fn target_session_attrs_err() {
async fn cancel_query() {
let client = connect("host=localhost port=5433 user=postgres").await;
let cancel = client.cancel_query(NoTls);
let cancel_token = client.cancel_token();
let cancel = cancel_token.cancel_query(NoTls);
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)");