Implement asynchronous notification support

A decent first pass on #19, but there really needs to be better support
for polling for notifications than making a query and calling next()
every once in a while.
This commit is contained in:
Steven Fackler 2013-10-14 22:41:03 -07:00
parent e2ebe60de4
commit fef8a35795
3 changed files with 108 additions and 2 deletions

View File

@ -100,6 +100,7 @@ use message::{BackendMessage,
ErrorResponse,
NoData,
NoticeResponse,
NotificationResponse,
ParameterDescription,
ParameterStatus,
ParseComplete,
@ -142,6 +143,36 @@ impl PostgresNoticeHandler for DefaultNoticeHandler {
}
}
/// An asynchronous notification
pub struct PostgresNotification {
/// The process ID of the notifying backend process
pid: i32,
/// The name of the channel that the notify has been raised on
channel: ~str,
/// The "payload" string passed from the notifying process
payload: ~str,
}
/// An iterator over asynchronous notifications
pub struct PostgresNotificationIterator<'self> {
priv conn: &'self PostgresConnection
}
impl<'self> Iterator<PostgresNotification> for
PostgresNotificationIterator<'self> {
/// Returns the oldest pending notification or `None` if there are none.
///
/// # Note
///
/// `next` may return `Some` notification after returning `None` if a new
/// notification was received.
fn next(&mut self) -> Option<PostgresNotification> {
do self.conn.conn.with_mut_ref |conn| {
conn.notifications.pop_front()
}
}
}
/// Reasons a new Postgres connection could fail
#[deriving(ToStr)]
pub enum PostgresConnectError {
@ -288,7 +319,8 @@ impl PostgresDbError {
struct InnerPostgresConnection {
stream: BufferedStream<TcpStream>,
next_stmt_id: int,
notice_handler: ~PostgresNoticeHandler
notice_handler: ~PostgresNoticeHandler,
notifications: RingBuf<PostgresNotification>,
}
impl Drop for InnerPostgresConnection {
@ -332,7 +364,8 @@ impl InnerPostgresConnection {
let mut conn = InnerPostgresConnection {
stream: BufferedStream::new(stream),
next_stmt_id: 0,
notice_handler: ~DefaultNoticeHandler as ~PostgresNoticeHandler
notice_handler: ~DefaultNoticeHandler as ~PostgresNoticeHandler,
notifications: RingBuf::new(),
};
args.push((~"client_encoding", ~"UTF8"));
@ -403,6 +436,12 @@ impl InnerPostgresConnection {
match self.stream.read_message() {
NoticeResponse { fields } =>
self.notice_handler.handle(PostgresDbError::new(fields)),
NotificationResponse { pid, channel, payload } =>
self.notifications.push_back(PostgresNotification {
pid: pid,
channel: channel,
payload: payload
}),
ParameterStatus { parameter, value } =>
debug!("Parameter %s = %s", parameter, value),
msg => return msg
@ -573,6 +612,15 @@ impl PostgresConnection {
handler
}
/// Returns an iterator over asynchronous notification messages.
///
/// Use the `LISTEN` command to register this connection for notifications.
pub fn notifications<'a>(&'a self) -> PostgresNotificationIterator<'a> {
PostgresNotificationIterator {
conn: self
}
}
/// Attempts to create a new prepared statement.
///
/// A statement may contain parameters, specified by `$n` where `n` is the

View File

@ -44,6 +44,11 @@ pub enum BackendMessage {
NoticeResponse {
fields: ~[(u8, ~str)]
},
NotificationResponse {
pid: i32,
channel: ~str,
payload: ~str
},
ParameterDescription {
types: ~[Oid]
},
@ -258,6 +263,11 @@ impl<R: Reader> ReadMessage for R {
'1' => ParseComplete,
'2' => BindComplete,
'3' => CloseComplete,
'A' => NotificationResponse {
pid: buf.read_be_i32_(),
channel: buf.read_string(),
payload: buf.read_string()
},
'C' => CommandComplete { tag: buf.read_string() },
'D' => read_data_row(&mut buf),
'E' => ErrorResponse { fields: read_fields(&mut buf) },

View File

@ -12,6 +12,7 @@ use std::f32;
use std::f64;
use postgres::{PostgresNoticeHandler,
PostgresNotification,
DbError,
DnsError,
MissingPassword,
@ -422,6 +423,53 @@ fn test_custom_notice_handler() {
assert_eq!(unsafe { count }, 1);
}
#[test]
fn test_notification_iterator_none() {
let conn = PostgresConnection::connect("postgres://postgres@localhost");
assert!(conn.notifications().next().is_none());
}
#[test]
fn test_notification_iterator_some() {
fn check_notification(expected: PostgresNotification,
actual: Option<PostgresNotification>) {
match actual {
Some(PostgresNotification { channel, payload, _ }) => {
assert_eq!(&expected.channel, &channel);
assert_eq!(&expected.payload, &payload);
}
x => fail2!("Expected {:?} but got {:?}", expected, x)
}
}
let conn = PostgresConnection::connect("postgres://postgres@localhost");
let mut it = conn.notifications();
conn.update("LISTEN test_notification_iterator_one_channel", []);
conn.update("LISTEN test_notification_iterator_one_channel2", []);
conn.update("NOTIFY test_notification_iterator_one_channel, 'hello'", []);
conn.update("NOTIFY test_notification_iterator_one_channel2, 'world'", []);
check_notification(PostgresNotification {
pid: 0,
channel: ~"test_notification_iterator_one_channel",
payload: ~"hello"
}, it.next());
check_notification(PostgresNotification {
pid: 0,
channel: ~"test_notification_iterator_one_channel2",
payload: ~"world"
}, it.next());
assert!(it.next().is_none());
conn.update("NOTIFY test_notification_iterator_one_channel, '!'", []);
check_notification(PostgresNotification {
pid: 0,
channel: ~"test_notification_iterator_one_channel",
payload: ~"!"
}, it.next());
assert!(it.next().is_none());
}
#[test]
fn test_plaintext_pass() {
PostgresConnection::connect("postgres://pass_user:password@localhost/postgres");