Add a notifications iterator that waits with a timeout

Closes #105
This commit is contained in:
Steven Fackler 2015-10-20 23:13:26 -07:00
parent c4becf6a39
commit 01153f784a
3 changed files with 97 additions and 2 deletions

View File

@ -68,6 +68,7 @@ use std::io::prelude::*;
use std::marker::Sync as StdSync; use std::marker::Sync as StdSync;
use std::mem; use std::mem;
use std::result; use std::result;
use std::time::Duration;
#[cfg(feature = "unix_socket")] #[cfg(feature = "unix_socket")]
use std::path::PathBuf; use std::path::PathBuf;
@ -509,6 +510,24 @@ impl InnerConnection {
} }
} }
fn read_message_with_notification_timeout(&mut self, timeout: Duration)
-> std::io::Result<Option<BackendMessage>> {
debug_assert!(!self.desynchronized);
loop {
match try_desync!(self, self.stream.read_message_timeout(timeout)) {
Some(NoticeResponse { fields }) => {
if let Ok(err) = DbError::new_raw(fields) {
self.notice_handler.handle_notice(err);
}
}
Some(ParameterStatus { parameter, value }) => {
self.parameters.insert(parameter, value);
}
val => return Ok(val)
}
}
}
fn read_message(&mut self) -> std_io::Result<BackendMessage> { fn read_message(&mut self) -> std_io::Result<BackendMessage> {
loop { loop {
match try!(self.read_message_with_notification()) { match try!(self.read_message_with_notification()) {

View File

@ -1,6 +1,7 @@
//! Asynchronous notifications. //! Asynchronous notifications.
use std::fmt; use std::fmt;
use std::time::Duration;
use {desynchronized, Result, Connection, NotificationsNew}; use {desynchronized, Result, Connection, NotificationsNew};
use message::BackendMessage::NotificationResponse; use message::BackendMessage::NotificationResponse;
@ -48,8 +49,8 @@ impl<'conn> Notifications<'conn> {
} }
} }
/// Returns an iterator over notifications, blocking until one is received /// Returns an iterator over notifications that blocks until one is
/// if none are pending. /// received if none are pending.
/// ///
/// The iterator will never return `None`. /// The iterator will never return `None`.
pub fn blocking_iter<'a>(&'a self) -> BlockingIter<'a> { pub fn blocking_iter<'a>(&'a self) -> BlockingIter<'a> {
@ -57,6 +58,20 @@ impl<'conn> Notifications<'conn> {
conn: self.conn, conn: self.conn,
} }
} }
/// Returns an iterator over notifications that blocks for a limited time
/// waiting to receive one if none are pending.
///
/// # Note
///
/// THis iterator may start returning `Some` after previously returning
/// `None` if more notifications are received.
pub fn timeout_iter<'a>(&'a self, timeout: Duration) -> TimeoutIter<'a> {
TimeoutIter {
conn: self.conn,
timeout: timeout,
}
}
} }
impl<'a, 'conn> IntoIterator for &'a Notifications<'conn> { impl<'a, 'conn> IntoIterator for &'a Notifications<'conn> {
@ -121,3 +136,39 @@ impl<'a> Iterator for BlockingIter<'a> {
} }
} }
} }
/// An iterator over notifications which will block for a period of time if
/// none are pending.
pub struct TimeoutIter<'a> {
conn: &'a Connection,
timeout: Duration,
}
impl<'a> Iterator for TimeoutIter<'a> {
type Item = Result<Notification>;
fn next(&mut self) -> Option<Result<Notification>> {
let mut conn = self.conn.conn.borrow_mut();
if let Some(notification) = conn.notifications.pop_front() {
return Some(Ok(notification));
}
if conn.is_desynchronized() {
return Some(Err(Error::IoError(desynchronized())));
}
match conn.read_message_with_notification_timeout(self.timeout) {
Ok(Some(NotificationResponse { pid, channel, payload })) => {
Some(Ok(Notification {
pid: pid,
channel: channel,
payload: payload
}))
}
Ok(None) => None,
Err(err) => Some(Err(Error::IoError(err))),
_ => unreachable!()
}
}
}

View File

@ -9,6 +9,7 @@ use openssl::ssl::{SslContext, SslMethod};
use std::thread; use std::thread;
use std::io; use std::io;
use std::io::prelude::*; use std::io::prelude::*;
use std::time::Duration;
use postgres::{HandleNotice, use postgres::{HandleNotice,
Connection, Connection,
@ -609,6 +610,30 @@ fn test_notifications_next_block() {
}, or_panic!(notifications.blocking_iter().next().unwrap())); }, or_panic!(notifications.blocking_iter().next().unwrap()));
} }
#[test]
fn test_notification_next_timeout() {
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
or_panic!(conn.execute("LISTEN test_notifications_next_timeout", &[]));
let _t = thread::spawn(|| {
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
thread::sleep_ms(500);
or_panic!(conn.execute("NOTIFY test_notifications_next_timeout, 'foo'", &[]));
thread::sleep_ms(1500);
or_panic!(conn.execute("NOTIFY test_notifications_next_timeout, 'foo'", &[]));
});
let notifications = conn.notifications();
let mut it = notifications.timeout_iter(Duration::from_secs(1));
check_notification(Notification {
pid: 0,
channel: "test_notifications_next_timeout".to_string(),
payload: "foo".to_string()
}, or_panic!(it.next().unwrap()));
assert!(it.next().is_none());
}
#[test] #[test]
// This test is pretty sad, but I don't think there's a better way :( // This test is pretty sad, but I don't think there's a better way :(
fn test_cancel_query() { fn test_cancel_query() {