Refactor notification API
This commit is contained in:
parent
008f14b459
commit
a2609f6c90
@ -3,8 +3,9 @@
|
|||||||
use debug_builders::DebugStruct;
|
use debug_builders::DebugStruct;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
use {Result, Connection, NotificationsNew};
|
use {desynchronized, Result, Connection, NotificationsNew};
|
||||||
use message::BackendMessage::NotificationResponse;
|
use message::BackendMessage::NotificationResponse;
|
||||||
|
use error::Error;
|
||||||
|
|
||||||
/// An asynchronous notification.
|
/// An asynchronous notification.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -25,11 +26,49 @@ pub struct Notifications<'conn> {
|
|||||||
impl<'a> fmt::Debug for Notifications<'a> {
|
impl<'a> fmt::Debug for Notifications<'a> {
|
||||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
DebugStruct::new(fmt, "Notifications")
|
DebugStruct::new(fmt, "Notifications")
|
||||||
.field("pending", &self.conn.conn.borrow().notifications.len())
|
.field("pending", &self.len())
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'conn> Notifications<'conn> {
|
||||||
|
/// Returns the number of pending notifications.
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.conn.conn.borrow().notifications.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns an iterator over pending notifications.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
///
|
||||||
|
/// This iterator may start returning `Some` after previously returning
|
||||||
|
/// `None` if more notifications are received.
|
||||||
|
pub fn iter<'a>(&'a self) -> Iter<'a> {
|
||||||
|
Iter {
|
||||||
|
conn: self.conn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns an iterator over notifications, blocking until one is received
|
||||||
|
/// if none are pending.
|
||||||
|
///
|
||||||
|
/// The iterator will never return `None`.
|
||||||
|
pub fn blocking_iter<'a>(&'a self) -> BlockingIter<'a> {
|
||||||
|
BlockingIter {
|
||||||
|
conn: self.conn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'conn> IntoIterator for &'a Notifications<'conn> {
|
||||||
|
type Item = Notification;
|
||||||
|
type IntoIter = Iter<'a>;
|
||||||
|
|
||||||
|
fn into_iter(self) -> Iter<'a> {
|
||||||
|
self.iter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<'conn> NotificationsNew<'conn> for Notifications<'conn> {
|
impl<'conn> NotificationsNew<'conn> for Notifications<'conn> {
|
||||||
fn new(conn: &'conn Connection) -> Notifications<'conn> {
|
fn new(conn: &'conn Connection) -> Notifications<'conn> {
|
||||||
Notifications {
|
Notifications {
|
||||||
@ -38,41 +77,47 @@ impl<'conn> NotificationsNew<'conn> for Notifications<'conn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'conn> Iterator for Notifications<'conn> {
|
/// An iterator over pending notifications.
|
||||||
|
pub struct Iter<'a> {
|
||||||
|
conn: &'a Connection,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Iterator for Iter<'a> {
|
||||||
type Item = Notification;
|
type Item = Notification;
|
||||||
|
|
||||||
/// Returns the oldest pending notification or `None` if there are none.
|
|
||||||
///
|
|
||||||
/// ## Note
|
|
||||||
///
|
|
||||||
/// `next` may return `Some` notification after returning `None` if a new
|
|
||||||
/// notification was received.
|
|
||||||
fn next(&mut self) -> Option<Notification> {
|
fn next(&mut self) -> Option<Notification> {
|
||||||
self.conn.conn.borrow_mut().notifications.pop_front()
|
self.conn.conn.borrow_mut().notifications.pop_front()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'conn> Notifications<'conn> {
|
/// An iterator over notifications which will block if none are pending.
|
||||||
/// Returns the oldest pending notification.
|
pub struct BlockingIter<'a> {
|
||||||
///
|
conn: &'a Connection,
|
||||||
/// If no notifications are pending, blocks until one arrives.
|
}
|
||||||
pub fn next_block(&mut self) -> Result<Notification> {
|
|
||||||
if let Some(notification) = self.next() {
|
impl<'a> Iterator for BlockingIter<'a> {
|
||||||
return Ok(notification);
|
type Item = Result<Notification>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Result<Notification>> {
|
||||||
|
let mut conn = self.conn.conn.borrow_mut();
|
||||||
|
if conn.is_desynchronized() {
|
||||||
|
return Some(Err(Error::IoError(desynchronized())));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut conn = self.conn.conn.borrow_mut();
|
if let Some(notification) = conn.notifications.pop_front() {
|
||||||
check_desync!(conn);
|
return Some(Ok(notification));
|
||||||
match try!(conn.read_message_with_notification()) {
|
}
|
||||||
NotificationResponse { pid, channel, payload } => {
|
|
||||||
Ok(Notification {
|
match conn.read_message_with_notification() {
|
||||||
|
Ok(NotificationResponse { pid, channel, payload }) => {
|
||||||
|
Some(Ok(Notification {
|
||||||
pid: pid,
|
pid: pid,
|
||||||
channel: channel,
|
channel: channel,
|
||||||
payload: payload
|
payload: payload
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
Err(err) => Some(Err(Error::IoError(err))),
|
||||||
_ => unreachable!()
|
_ => unreachable!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,7 +551,7 @@ fn test_custom_notice_handler() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_notification_iterator_none() {
|
fn test_notification_iterator_none() {
|
||||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||||
assert!(conn.notifications().next().is_none());
|
assert!(conn.notifications().iter().next().is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_notification(expected: Notification, actual: Notification) {
|
fn check_notification(expected: Notification, actual: Notification) {
|
||||||
@ -562,7 +562,8 @@ fn check_notification(expected: Notification, actual: Notification) {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_notification_iterator_some() {
|
fn test_notification_iterator_some() {
|
||||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||||
let mut it = conn.notifications();
|
let notifications = conn.notifications();
|
||||||
|
let mut it = notifications.iter();
|
||||||
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel", &[]));
|
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel", &[]));
|
||||||
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel2", &[]));
|
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel2", &[]));
|
||||||
or_panic!(conn.execute("NOTIFY test_notification_iterator_one_channel, 'hello'", &[]));
|
or_panic!(conn.execute("NOTIFY test_notification_iterator_one_channel, 'hello'", &[]));
|
||||||
@ -600,12 +601,12 @@ fn test_notifications_next_block() {
|
|||||||
or_panic!(conn.execute("NOTIFY test_notifications_next_block, 'foo'", &[]));
|
or_panic!(conn.execute("NOTIFY test_notifications_next_block, 'foo'", &[]));
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut notifications = conn.notifications();
|
let notifications = conn.notifications();
|
||||||
check_notification(Notification {
|
check_notification(Notification {
|
||||||
pid: 0,
|
pid: 0,
|
||||||
channel: "test_notifications_next_block".to_string(),
|
channel: "test_notifications_next_block".to_string(),
|
||||||
payload: "foo".to_string()
|
payload: "foo".to_string()
|
||||||
}, or_panic!(notifications.next_block()));
|
}, or_panic!(notifications.blocking_iter().next().unwrap()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
Loading…
Reference in New Issue
Block a user