From 008f14b45970a190dca04a64e46247127c9a641c Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Fri, 18 Sep 2015 20:55:01 -0700 Subject: [PATCH] Move notification stuff to its own module --- src/lib.rs | 70 ++++------------------------------------ src/notification.rs | 78 +++++++++++++++++++++++++++++++++++++++++++++ tests/test.rs | 2 +- 3 files changed, 86 insertions(+), 64 deletions(-) create mode 100644 src/notification.rs diff --git a/src/lib.rs b/src/lib.rs index 7096804d..4b492431 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ use message::BackendMessage::*; use message::FrontendMessage::*; use message::{FrontendMessage, BackendMessage, RowDescriptionEntry}; use message::{WriteMessage, ReadMessage}; +use notification::{Notifications, Notification}; use rows::{Rows, LazyRows}; use stmt::{Statement, Column}; use types::{IsNull, Kind, Type, SessionInfo, Oid, Other}; @@ -97,6 +98,7 @@ pub mod io; pub mod rows; pub mod stmt; pub mod types; +pub mod notification; const TYPEINFO_QUERY: &'static str = "t"; @@ -231,68 +233,6 @@ impl HandleNotice for LoggingNoticeHandler { } } -/// An asynchronous notification. -#[derive(Clone, Debug)] -pub struct Notification { - /// The process ID of the notifying backend process. - pub pid: u32, - /// The name of the channel that the notify has been raised on. - pub channel: String, - /// The "payload" string passed from the notifying process. - pub payload: String, -} - -/// An iterator over asynchronous notifications. -pub struct Notifications<'conn> { - conn: &'conn Connection -} - -impl<'a> fmt::Debug for Notifications<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - DebugStruct::new(fmt, "Notifications") - .field("pending", &self.conn.conn.borrow().notifications.len()) - .finish() - } -} - -impl<'conn> Iterator for Notifications<'conn> { - type Item = Notification; - - /// Returns the oldest pending notification or `None` if there are none. - /// - /// ## Note - /// - /// `next` may return `Some` notification after returning `None` if a new - /// notification was received. - fn next(&mut self) -> Option { - self.conn.conn.borrow_mut().notifications.pop_front() - } -} - -impl<'conn> Notifications<'conn> { - /// Returns the oldest pending notification. - /// - /// If no notifications are pending, blocks until one arrives. - pub fn next_block(&mut self) -> Result { - if let Some(notification) = self.next() { - return Ok(notification); - } - - let mut conn = self.conn.conn.borrow_mut(); - check_desync!(conn); - match try!(conn.read_message_with_notification()) { - NotificationResponse { pid, channel, payload } => { - Ok(Notification { - pid: pid, - channel: channel, - payload: payload - }) - } - _ => unreachable!() - } - } -} - /// Contains information necessary to cancel queries for a session. #[derive(Copy, Clone, Debug)] pub struct CancelData { @@ -947,7 +887,7 @@ impl Connection { /// /// Use the `LISTEN` command to register this connection for notifications. pub fn notifications<'a>(&'a self) -> Notifications<'a> { - Notifications { conn: self } + Notifications::new(self) } /// Creates a new prepared statement. @@ -1444,3 +1384,7 @@ trait StatementInternals<'conn> { trait ColumnNew { fn new(name: String, type_: Type) -> Column; } + +trait NotificationsNew<'conn> { + fn new(conn: &'conn Connection) -> Notifications<'conn>; +} diff --git a/src/notification.rs b/src/notification.rs new file mode 100644 index 00000000..7db2751e --- /dev/null +++ b/src/notification.rs @@ -0,0 +1,78 @@ +//! Asynchronous notifications. + +use debug_builders::DebugStruct; +use std::fmt; + +use {Result, Connection, NotificationsNew}; +use message::BackendMessage::NotificationResponse; + +/// An asynchronous notification. +#[derive(Clone, Debug)] +pub struct Notification { + /// The process ID of the notifying backend process. + pub pid: u32, + /// The name of the channel that the notify has been raised on. + pub channel: String, + /// The "payload" string passed from the notifying process. + pub payload: String, +} + +/// An iterator over asynchronous notifications. +pub struct Notifications<'conn> { + conn: &'conn Connection +} + +impl<'a> fmt::Debug for Notifications<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + DebugStruct::new(fmt, "Notifications") + .field("pending", &self.conn.conn.borrow().notifications.len()) + .finish() + } +} + +impl<'conn> NotificationsNew<'conn> for Notifications<'conn> { + fn new(conn: &'conn Connection) -> Notifications<'conn> { + Notifications { + conn: conn, + } + } +} + +impl<'conn> Iterator for Notifications<'conn> { + type Item = Notification; + + /// Returns the oldest pending notification or `None` if there are none. + /// + /// ## Note + /// + /// `next` may return `Some` notification after returning `None` if a new + /// notification was received. + fn next(&mut self) -> Option { + self.conn.conn.borrow_mut().notifications.pop_front() + } +} + +impl<'conn> Notifications<'conn> { + /// Returns the oldest pending notification. + /// + /// If no notifications are pending, blocks until one arrives. + pub fn next_block(&mut self) -> Result { + if let Some(notification) = self.next() { + return Ok(notification); + } + + let mut conn = self.conn.conn.borrow_mut(); + check_desync!(conn); + match try!(conn.read_message_with_notification()) { + NotificationResponse { pid, channel, payload } => { + Ok(Notification { + pid: pid, + channel: channel, + payload: payload + }) + } + _ => unreachable!() + } + } +} + diff --git a/tests/test.rs b/tests/test.rs index c3a83ce0..f1411e75 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -11,7 +11,6 @@ use std::io; use std::io::prelude::*; use postgres::{HandleNotice, - Notification, Connection, GenericConnection, SslMode, @@ -27,6 +26,7 @@ use postgres::error::SqlState::{SyntaxError, CardinalityViolation}; use postgres::error::ErrorPosition::Normal; use postgres::rows::RowIndex; +use postgres::notification::Notification; macro_rules! or_panic { ($e:expr) => (