From fef8a3579533a2b8fdc10bddf75835f9808d7476 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Mon, 14 Oct 2013 22:41:03 -0700 Subject: [PATCH] Implement asynchronous notification support A decent first pass on #19, but there really needs to be better support for polling for notifications than making a query and calling next() every once in a while. --- src/postgres/lib.rs | 52 +++++++++++++++++++++++++++++++++++++++-- src/postgres/message.rs | 10 ++++++++ src/postgres/test.rs | 48 +++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/src/postgres/lib.rs b/src/postgres/lib.rs index 45e7c2bf..17ab424b 100644 --- a/src/postgres/lib.rs +++ b/src/postgres/lib.rs @@ -100,6 +100,7 @@ use message::{BackendMessage, ErrorResponse, NoData, NoticeResponse, + NotificationResponse, ParameterDescription, ParameterStatus, ParseComplete, @@ -142,6 +143,36 @@ impl PostgresNoticeHandler for DefaultNoticeHandler { } } +/// An asynchronous notification +pub struct PostgresNotification { + /// The process ID of the notifying backend process + pid: i32, + /// The name of the channel that the notify has been raised on + channel: ~str, + /// The "payload" string passed from the notifying process + payload: ~str, +} + +/// An iterator over asynchronous notifications +pub struct PostgresNotificationIterator<'self> { + priv conn: &'self PostgresConnection +} + +impl<'self> Iterator for + PostgresNotificationIterator<'self> { + /// Returns the oldest pending notification or `None` if there are none. + /// + /// # Note + /// + /// `next` may return `Some` notification after returning `None` if a new + /// notification was received. + fn next(&mut self) -> Option { + do self.conn.conn.with_mut_ref |conn| { + conn.notifications.pop_front() + } + } +} + /// Reasons a new Postgres connection could fail #[deriving(ToStr)] pub enum PostgresConnectError { @@ -288,7 +319,8 @@ impl PostgresDbError { struct InnerPostgresConnection { stream: BufferedStream, next_stmt_id: int, - notice_handler: ~PostgresNoticeHandler + notice_handler: ~PostgresNoticeHandler, + notifications: RingBuf, } impl Drop for InnerPostgresConnection { @@ -332,7 +364,8 @@ impl InnerPostgresConnection { let mut conn = InnerPostgresConnection { stream: BufferedStream::new(stream), next_stmt_id: 0, - notice_handler: ~DefaultNoticeHandler as ~PostgresNoticeHandler + notice_handler: ~DefaultNoticeHandler as ~PostgresNoticeHandler, + notifications: RingBuf::new(), }; args.push((~"client_encoding", ~"UTF8")); @@ -403,6 +436,12 @@ impl InnerPostgresConnection { match self.stream.read_message() { NoticeResponse { fields } => self.notice_handler.handle(PostgresDbError::new(fields)), + NotificationResponse { pid, channel, payload } => + self.notifications.push_back(PostgresNotification { + pid: pid, + channel: channel, + payload: payload + }), ParameterStatus { parameter, value } => debug!("Parameter %s = %s", parameter, value), msg => return msg @@ -573,6 +612,15 @@ impl PostgresConnection { handler } + /// Returns an iterator over asynchronous notification messages. + /// + /// Use the `LISTEN` command to register this connection for notifications. + pub fn notifications<'a>(&'a self) -> PostgresNotificationIterator<'a> { + PostgresNotificationIterator { + conn: self + } + } + /// Attempts to create a new prepared statement. /// /// A statement may contain parameters, specified by `$n` where `n` is the diff --git a/src/postgres/message.rs b/src/postgres/message.rs index ada07a70..61994f49 100644 --- a/src/postgres/message.rs +++ b/src/postgres/message.rs @@ -44,6 +44,11 @@ pub enum BackendMessage { NoticeResponse { fields: ~[(u8, ~str)] }, + NotificationResponse { + pid: i32, + channel: ~str, + payload: ~str + }, ParameterDescription { types: ~[Oid] }, @@ -258,6 +263,11 @@ impl ReadMessage for R { '1' => ParseComplete, '2' => BindComplete, '3' => CloseComplete, + 'A' => NotificationResponse { + pid: buf.read_be_i32_(), + channel: buf.read_string(), + payload: buf.read_string() + }, 'C' => CommandComplete { tag: buf.read_string() }, 'D' => read_data_row(&mut buf), 'E' => ErrorResponse { fields: read_fields(&mut buf) }, diff --git a/src/postgres/test.rs b/src/postgres/test.rs index f67d5286..7df8d18f 100644 --- a/src/postgres/test.rs +++ b/src/postgres/test.rs @@ -12,6 +12,7 @@ use std::f32; use std::f64; use postgres::{PostgresNoticeHandler, + PostgresNotification, DbError, DnsError, MissingPassword, @@ -422,6 +423,53 @@ fn test_custom_notice_handler() { assert_eq!(unsafe { count }, 1); } +#[test] +fn test_notification_iterator_none() { + let conn = PostgresConnection::connect("postgres://postgres@localhost"); + assert!(conn.notifications().next().is_none()); +} + +#[test] +fn test_notification_iterator_some() { + fn check_notification(expected: PostgresNotification, + actual: Option) { + match actual { + Some(PostgresNotification { channel, payload, _ }) => { + assert_eq!(&expected.channel, &channel); + assert_eq!(&expected.payload, &payload); + } + x => fail2!("Expected {:?} but got {:?}", expected, x) + } + } + + let conn = PostgresConnection::connect("postgres://postgres@localhost"); + let mut it = conn.notifications(); + conn.update("LISTEN test_notification_iterator_one_channel", []); + conn.update("LISTEN test_notification_iterator_one_channel2", []); + conn.update("NOTIFY test_notification_iterator_one_channel, 'hello'", []); + conn.update("NOTIFY test_notification_iterator_one_channel2, 'world'", []); + + check_notification(PostgresNotification { + pid: 0, + channel: ~"test_notification_iterator_one_channel", + payload: ~"hello" + }, it.next()); + check_notification(PostgresNotification { + pid: 0, + channel: ~"test_notification_iterator_one_channel2", + payload: ~"world" + }, it.next()); + assert!(it.next().is_none()); + + conn.update("NOTIFY test_notification_iterator_one_channel, '!'", []); + check_notification(PostgresNotification { + pid: 0, + channel: ~"test_notification_iterator_one_channel", + payload: ~"!" + }, it.next()); + assert!(it.next().is_none()); +} + #[test] fn test_plaintext_pass() { PostgresConnection::connect("postgres://pass_user:password@localhost/postgres");