tokio-postgres query cancellation

This commit is contained in:
Steven Fackler 2018-06-27 22:37:43 -07:00
parent c2fb9c6de0
commit be2ca03fa9
5 changed files with 151 additions and 2 deletions

View File

@ -63,6 +63,10 @@ pub enum TlsMode {
Require(Box<TlsConnect>),
}
pub fn cancel_query(params: ConnectParams, tls: TlsMode, cancel_data: CancelData) -> CancelQuery {
CancelQuery(proto::CancelFuture::new(params, tls, cancel_data))
}
pub fn connect(params: ConnectParams, tls: TlsMode) -> Handshake {
Handshake(proto::HandshakeFuture::new(params, tls))
}
@ -110,6 +114,18 @@ impl Future for Connection {
}
}
#[must_use = "futures do nothing unless polled"]
pub struct CancelQuery(proto::CancelFuture);
impl Future for CancelQuery {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
self.0.poll()
}
}
#[must_use = "futures do nothing unless polled"]
pub struct Handshake(proto::HandshakeFuture);

View File

@ -0,0 +1,69 @@
use futures::{Future, Poll};
use postgres_protocol::message::frontend;
use state_machine_future::RentToOwn;
use tokio_io::io::{self, Flush, WriteAll};
use error::Error;
use params::ConnectParams;
use proto::connect::ConnectFuture;
use tls::TlsStream;
use {CancelData, TlsMode};
#[derive(StateMachineFuture)]
pub enum Cancel {
#[state_machine_future(start, transitions(SendingCancel))]
Start {
future: ConnectFuture,
cancel_data: CancelData,
},
#[state_machine_future(transitions(FlushingCancel))]
SendingCancel {
future: WriteAll<Box<TlsStream>, Vec<u8>>,
},
#[state_machine_future(transitions(Finished))]
FlushingCancel { future: Flush<Box<TlsStream>> },
#[state_machine_future(ready)]
Finished(()),
#[state_machine_future(error)]
Failed(Error),
}
impl PollCancel for Cancel {
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start>) -> Poll<AfterStart, Error> {
let stream = try_ready!(state.future.poll());
let mut buf = vec![];
frontend::cancel_request(
state.cancel_data.process_id,
state.cancel_data.secret_key,
&mut buf,
);
transition!(SendingCancel {
future: io::write_all(stream, buf),
})
}
fn poll_sending_cancel<'a>(
state: &'a mut RentToOwn<'a, SendingCancel>,
) -> Poll<AfterSendingCancel, Error> {
let (stream, _) = try_ready!(state.future.poll());
transition!(FlushingCancel {
future: io::flush(stream),
})
}
fn poll_flushing_cancel<'a>(
state: &'a mut RentToOwn<'a, FlushingCancel>,
) -> Poll<AfterFlushingCancel, Error> {
try_ready!(state.future.poll());
transition!(Finished(()))
}
}
impl CancelFuture {
pub fn new(params: ConnectParams, mode: TlsMode, cancel_data: CancelData) -> CancelFuture {
Cancel::start(ConnectFuture::new(params, mode), cancel_data)
}
}

View File

@ -7,7 +7,7 @@ use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::{Duration, Instant};
use std::vec;
use tokio_io::io::{read_exact, write_all, ReadExact, WriteAll};
use tokio_io::io::{flush, read_exact, write_all, Flush, ReadExact, WriteAll};
use tokio_tcp::{self, TcpStream};
use tokio_timer::Delay;
@ -59,13 +59,20 @@ pub enum Connect {
params: ConnectParams,
tls: TlsMode,
},
#[state_machine_future(transitions(ReadingSsl))]
#[state_machine_future(transitions(FlushingSsl))]
SendingSsl {
future: WriteAll<Socket, Vec<u8>>,
params: ConnectParams,
connector: Box<TlsConnect>,
required: bool,
},
#[state_machine_future(transitions(ReadingSsl))]
FlushingSsl {
future: Flush<Socket>,
params: ConnectParams,
connector: Box<TlsConnect>,
required: bool,
},
#[state_machine_future(transitions(ConnectingTls, Ready))]
ReadingSsl {
future: ReadExact<Socket, [u8; 1]>,
@ -228,6 +235,19 @@ impl PollConnect for Connect {
) -> Poll<AfterSendingSsl, Error> {
let (stream, _) = try_ready!(state.future.poll());
let state = state.take();
transition!(FlushingSsl {
future: flush(stream),
params: state.params,
connector: state.connector,
required: state.required,
})
}
fn poll_flushing_ssl<'a>(
state: &'a mut RentToOwn<'a, FlushingSsl>,
) -> Poll<AfterFlushingSsl, Error> {
let stream = try_ready!(state.future.poll());
let state = state.take();
transition!(ReadingSsl {
future: read_exact(stream, [0]),
params: state.params,

View File

@ -8,6 +8,7 @@ macro_rules! try_receive {
};
}
mod cancel;
mod client;
mod codec;
mod connect;
@ -20,6 +21,7 @@ mod row;
mod socket;
mod statement;
pub use proto::cancel::CancelFuture;
pub use proto::client::Client;
pub use proto::codec::PostgresCodec;
pub use proto::connection::Connection;

View File

@ -1,9 +1,12 @@
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate tokio_postgres;
use std::time::{Duration, Instant};
use tokio::prelude::*;
use tokio::runtime::current_thread::Runtime;
use tokio::timer::Delay;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::Type;
use tokio_postgres::TlsMode;
@ -214,3 +217,42 @@ fn insert_select() {
let tests = insert.join(select);
runtime.block_on(tests).unwrap();
}
#[test]
fn cancel_query() {
let _ = env_logger::try_init();
let mut runtime = Runtime::new().unwrap();
let handshake = tokio_postgres::connect(
"postgres://postgres@localhost:5433".parse().unwrap(),
TlsMode::None,
);
let (mut client, connection) = runtime.block_on(handshake).unwrap();
let cancel_data = connection.cancel_data();
let connection = connection.map_err(|e| panic!("{}", e));
runtime.handle().spawn(connection).unwrap();
let sleep = client.prepare("SELECT pg_sleep(100)");
let sleep = runtime.block_on(sleep).unwrap();
let sleep = client.execute(&sleep, &[]).then(|r| match r {
Ok(_) => panic!("unexpected success"),
Err(ref e) if e.code() == Some(&SqlState::QUERY_CANCELED) => Ok::<(), ()>(()),
Err(e) => panic!("unexpected error {}", e),
});
let cancel = Delay::new(Instant::now() + Duration::from_millis(100))
.then(|r| {
r.unwrap();
tokio_postgres::cancel_query(
"postgres://postgres@localhost:5433".parse().unwrap(),
TlsMode::None,
cancel_data,
)
})
.then(|r| {
r.unwrap();
Ok::<(), ()>(())
});
let ((), ()) = runtime.block_on(sleep.join(cancel)).unwrap();
}