Support query cancellation

This commit is contained in:
Steven Fackler 2013-10-20 17:32:14 -07:00
parent e37e7088d9
commit 0a9dcfdf61
3 changed files with 117 additions and 27 deletions

112
lib.rs
View File

@ -110,6 +110,7 @@ use message::{BackendMessage,
RowDescription};
use message::{FrontendMessage,
Bind,
CancelRequest,
Close,
Describe,
Execute,
@ -130,6 +131,8 @@ pub mod types;
#[cfg(test)]
mod tests;
static DEFAULT_PORT: Port = 5432;
/// Trait for types that can handle Postgres notice messages
pub trait PostgresNoticeHandler {
/// Handle a Postgres notice message
@ -320,11 +323,77 @@ impl PostgresDbError {
}
}
/// Contains information necessary to cancel queries for a session
pub struct PostgresCancelData {
/// The process ID of the session
process_id: i32,
/// The secret key for the session
secret_key: i32,
}
/// Attempts to cancel an in-progress query.
///
/// The backend provides no information about whether a cancellation attempt
/// was successful or not. An error will only be returned if the driver was
/// unable to connect to the database.
///
/// A `PostgresCancelData` object can be created via
/// `PostgresConnection::cancel_data`. The object can cancel any query made on
/// that connection.
pub fn cancel_query(url: &str, data: PostgresCancelData)
-> Option<PostgresConnectError> {
let Url { host, port, _ }: Url = match FromStr::from_str(url) {
Some(url) => url,
None => return Some(InvalidUrl)
};
let port = match port {
Some(port) => FromStr::from_str(port).unwrap(),
None => DEFAULT_PORT
};
let mut socket = match open_socket(host, port) {
Ok(socket) => socket,
Err(err) => return Some(err)
};
socket.write_message(&CancelRequest {
code: message::CANCEL_CODE,
process_id: data.process_id,
secret_key: data.secret_key
});
None
}
fn open_socket(host: &str, port: Port)
-> Result<TcpStream, PostgresConnectError> {
let addrs = do io_error::cond.trap(|_| {}).inside {
net::get_host_addresses(host)
};
let addrs = match addrs {
Some(addrs) => addrs,
None => return Err(DnsError)
};
for addr in addrs.iter() {
let socket = do io_error::cond.trap(|_| {}).inside {
TcpStream::connect(SocketAddr { ip: *addr, port: port })
};
match socket {
Some(socket) => return Ok(socket),
None => ()
}
}
Err(SocketError)
}
struct InnerPostgresConnection {
stream: BufferedStream<TcpStream>,
next_stmt_id: int,
notice_handler: ~PostgresNoticeHandler,
notifications: RingBuf<PostgresNotification>,
cancel_data: PostgresCancelData
}
impl Drop for InnerPostgresConnection {
@ -357,10 +426,10 @@ impl InnerPostgresConnection {
let port = match port {
Some(port) => FromStr::from_str(port).unwrap(),
None => 5432
None => DEFAULT_PORT
};
let stream = match InnerPostgresConnection::open_socket(host, port) {
let stream = match open_socket(host, port) {
Ok(stream) => stream,
Err(err) => return Err(err)
};
@ -370,6 +439,7 @@ impl InnerPostgresConnection {
next_stmt_id: 0,
notice_handler: ~DefaultNoticeHandler as ~PostgresNoticeHandler,
notifications: RingBuf::new(),
cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 }
};
args.push((~"client_encoding", ~"UTF8"));
@ -394,7 +464,10 @@ impl InnerPostgresConnection {
loop {
match conn.read_message() {
BackendKeyData {_} => (),
BackendKeyData { process_id, secret_key } => {
conn.cancel_data.process_id = process_id;
conn.cancel_data.secret_key = secret_key;
}
ReadyForQuery {_} => break,
ErrorResponse { fields } =>
return Err(DbError(PostgresDbError::new(fields))),
@ -405,29 +478,6 @@ impl InnerPostgresConnection {
Ok(conn)
}
fn open_socket(host: &str, port: Port)
-> Result<TcpStream, PostgresConnectError> {
let addrs = do io_error::cond.trap(|_| {}).inside {
net::get_host_addresses(host)
};
let addrs = match addrs {
Some(addrs) => addrs,
None => return Err(DnsError)
};
for addr in addrs.iter() {
let socket = do io_error::cond.trap(|_| {}).inside {
TcpStream::connect(SocketAddr { ip: *addr, port: port })
};
match socket {
Some(socket) => return Ok(socket),
None => ()
}
}
Err(SocketError)
}
fn write_messages(&mut self, messages: &[&FrontendMessage]) {
for &message in messages.iter() {
self.stream.write_message(message);
@ -695,6 +745,16 @@ impl PostgresConnection {
}
}
/// Returns information used to cancel pending queries.
///
/// Used with the `cancel_query` function. The object returned can be used
/// to cancel any query executed by the connection it was created from.
pub fn cancel_data(&self) -> PostgresCancelData {
do self.conn.with_mut_ref |conn| {
conn.cancel_data
}
}
fn quick_query(&self, query: &str) {
do self.conn.with_mut_ref |conn| {
conn.write_messages([&Query { query: query }]);

View File

@ -12,6 +12,7 @@ use std::vec;
use types::Oid;
pub static PROTOCOL_VERSION: i32 = 0x0003_0000;
pub static CANCEL_CODE: i32 = 80877102;
#[deriving(ToStr)]
pub enum BackendMessage {
@ -85,6 +86,11 @@ pub enum FrontendMessage<'self> {
values: &'self [Option<~[u8]>],
result_formats: &'self [i16]
},
CancelRequest {
code: i32,
process_id: i32,
secret_key: i32,
},
Close {
variant: u8,
name: &'self str
@ -166,6 +172,11 @@ impl<W: Writer> WriteMessage for W {
buf.write_be_i16_(*format);
}
}
CancelRequest { code, process_id, secret_key } => {
buf.write_be_i32_(code);
buf.write_be_i32_(process_id);
buf.write_be_i32_(secret_key);
}
Close { variant, name } => {
ident = Some('C');
buf.write_u8_(variant);

View File

@ -9,6 +9,7 @@ use extra::json;
use extra::uuid::Uuid;
use std::f32;
use std::f64;
use std::rt::io::timer;
use super::{PostgresNoticeHandler,
PostgresNotification,
@ -20,7 +21,7 @@ use super::{PostgresNoticeHandler,
PostgresDbError,
PostgresStatement,
ResultDescription};
use super::error::hack::{SyntaxError, InvalidPassword};
use super::error::hack::{SyntaxError, InvalidPassword, QueryCanceled};
use super::types::{ToSql, FromSql, PgInt4, PgVarchar};
use super::pool::PostgresConnectionPool;
@ -468,6 +469,24 @@ fn test_notification_iterator_some() {
assert!(it.next().is_none());
}
#[test]
// This test is pretty sad, but I don't think there's a better way :(
fn test_cancel_request() {
let conn = PostgresConnection::connect("postgres://postgres@localhost");
let cancel_data = conn.cancel_data();
do spawn {
timer::sleep(500);
assert!(super::cancel_query("postgres://postgres@localhost",
cancel_data).is_none());
}
match conn.try_update("SELECT pg_sleep(10)", []) {
Err(PostgresDbError { code: QueryCanceled, _ }) => {}
res => fail!("Unexpected result {:?}", res)
}
}
#[test]
fn test_plaintext_pass() {
PostgresConnection::connect("postgres://pass_user:password@localhost/postgres");