Add a blocking next method to Notifications

cc #19
This commit is contained in:
Steven Fackler 2014-11-25 18:30:28 -08:00
parent 7f89a83caf
commit 546967f68c
4 changed files with 83 additions and 19 deletions

View File

@ -24,7 +24,7 @@ default = ["uuid", "time"]
[dependencies]
phf = "*"
phf_mac = "*"
openssl = "*"
openssl = "0.1"
[dependencies.uuid]
optional = true

View File

@ -43,6 +43,16 @@ impl<S: Stream> Writer for MaybeSslStream<S> {
}
}
impl MaybeSslStream<InternalStream> {
#[allow(dead_code)]
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
match *self {
MaybeSslStream::Ssl(ref mut s) => s.get_inner().set_read_timeout(timeout_ms),
MaybeSslStream::Normal(ref mut s) => s.set_read_timeout(timeout_ms),
}
}
}
pub enum InternalStream {
TcpStream(tcp::TcpStream),
UnixStream(pipe::UnixStream),
@ -73,6 +83,15 @@ impl Writer for InternalStream {
}
}
impl InternalStream {
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
match *self {
TcpStream(ref mut s) => s.set_read_timeout(timeout_ms),
UnixStream(ref mut s) => s.set_read_timeout(timeout_ms),
}
}
}
fn open_socket(params: &ConnectParams) -> Result<InternalStream, ConnectError> {
let port = params.port.unwrap_or(DEFAULT_PORT);
match params.target {

View File

@ -238,6 +238,28 @@ impl<'conn> Iterator<Notification> for Notifications<'conn> {
}
}
impl<'conn> Notifications<'conn> {
/// Returns the oldest pending notification.
///
/// If no notifications are pending, blocks until one arrives.
pub fn next_block(&mut self) -> Result<Notification> {
if let Some(notification) = self.next() {
return Ok(notification);
}
match try!(self.conn.conn.borrow_mut().read_message_with_notification()) {
NotificationResponse { pid, channel, payload } => {
Ok(Notification {
pid: pid,
channel: channel,
payload: payload
})
}
_ => unreachable!()
}
}
}
/// Contains information necessary to cancel queries for a session
pub struct CancelData {
/// The process ID of the session
@ -376,7 +398,7 @@ impl InnerConnection {
Ok(try_desync!(self, self.stream.flush()))
}
fn read_message(&mut self) -> IoResult<BackendMessage> {
fn read_message_with_notification(&mut self) -> IoResult<BackendMessage> {
debug_assert!(!self.desynchronized);
loop {
match try_desync!(self, self.stream.read_message()) {
@ -385,6 +407,18 @@ impl InnerConnection {
self.notice_handler.handle(err);
}
}
ParameterStatus { parameter, value } => {
debug!("Parameter {} = {}", parameter, value)
}
val => return Ok(val)
}
}
}
fn read_message(&mut self) -> IoResult<BackendMessage> {
debug_assert!(!self.desynchronized);
loop {
match try!(self.read_message_with_notification()) {
NotificationResponse { pid, channel, payload } => {
self.notifications.push_back(Notification {
pid: pid,
@ -392,9 +426,6 @@ impl InnerConnection {
payload: payload
})
}
ParameterStatus { parameter, value } => {
debug!("Parameter {} = {}", parameter, value)
}
val => return Ok(val)
}
}

View File

@ -558,19 +558,14 @@ fn test_notification_iterator_none() {
assert!(conn.notifications().next().is_none());
}
fn check_notification(expected: Notification,
actual: Notification) {
assert_eq!(&expected.channel, &actual.channel);
assert_eq!(&expected.payload, &actual.payload);
}
#[test]
fn test_notification_iterator_some() {
fn check_notification(expected: Notification,
actual: Option<Notification>) {
match actual {
Some(Notification { channel, payload, .. }) => {
assert_eq!(&expected.channel, &channel);
assert_eq!(&expected.payload, &payload);
}
None => panic!("Unexpected result")
}
}
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
let mut it = conn.notifications();
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel", &[]));
@ -582,12 +577,12 @@ fn test_notification_iterator_some() {
pid: 0,
channel: "test_notification_iterator_one_channel".to_string(),
payload: "hello".to_string()
}, it.next());
}, it.next().unwrap());
check_notification(Notification {
pid: 0,
channel: "test_notification_iterator_one_channel2".to_string(),
payload: "world".to_string()
}, it.next());
}, it.next().unwrap());
assert!(it.next().is_none());
or_panic!(conn.execute("NOTIFY test_notification_iterator_one_channel, '!'", &[]));
@ -595,10 +590,29 @@ fn test_notification_iterator_some() {
pid: 0,
channel: "test_notification_iterator_one_channel".to_string(),
payload: "!".to_string()
}, it.next());
}, it.next().unwrap());
assert!(it.next().is_none());
}
#[test]
fn test_notifications_next_block() {
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
or_panic!(conn.execute("LISTEN test_notifications_next_block", &[]));
spawn(proc() {
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
timer::sleep(Duration::milliseconds(500));
or_panic!(conn.execute("NOTIFY test_notifications_next_block, 'foo'", &[]));
});
let mut notifications = conn.notifications();
check_notification(Notification {
pid: 0,
channel: "test_notifications_next_block".to_string(),
payload: "foo".to_string()
}, or_panic!(notifications.next_block()));
}
#[test]
// This test is pretty sad, but I don't think there's a better way :(
fn test_cancel_query() {