Test notifications

This commit is contained in:
Steven Fackler 2019-07-31 21:19:56 -07:00
parent e521e3b0a5
commit 5dccb9988a
5 changed files with 51 additions and 57 deletions

View File

@ -1,5 +1,5 @@
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{Config}; use crate::config::Config;
use crate::connect_tls::connect_tls; use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream; use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{ChannelBinding, TlsConnect}; use crate::tls::{ChannelBinding, TlsConnect};

View File

@ -292,7 +292,7 @@ where
} }
pub fn poll_message( pub fn poll_message(
mut self: Pin<&mut Self>, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<AsyncMessage, Error>>> { ) -> Poll<Option<Result<AsyncMessage, Error>>> {
let message = self.poll_read(cx)?; let message = self.poll_read(cx)?;

View File

@ -1,13 +1,13 @@
use crate::client::{InnerClient, Responses}; use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::Error; use crate::Error;
use bytes::Bytes; use bytes::Bytes;
use futures::{Stream, TryFutureExt, ready}; use futures::{ready, Stream, TryFutureExt};
use std::sync::Arc;
use crate::codec::FrontendMessage;
use postgres_protocol::message::backend::Message; use postgres_protocol::message::backend::Message;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::connection::RequestMessages;
pub fn copy_out( pub fn copy_out(
client: Arc<InnerClient>, client: Arc<InnerClient>,

View File

@ -126,7 +126,6 @@ mod cancel_query;
mod cancel_query_raw; mod cancel_query_raw;
mod client; mod client;
mod codec; mod codec;
mod copy_out;
pub mod config; pub mod config;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
mod connect; mod connect;
@ -136,6 +135,7 @@ mod connect_socket;
mod connect_tls; mod connect_tls;
mod connection; mod connection;
mod copy_in; mod copy_in;
mod copy_out;
pub mod error; pub mod error;
mod maybe_tls_stream; mod maybe_tls_stream;
mod prepare; mod prepare;

View File

@ -1,7 +1,8 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
#![feature(async_await)] #![feature(async_await)]
use futures::stream; use futures::channel::mpsc;
use futures::{future, stream, StreamExt};
use futures::{join, try_join, FutureExt, TryStreamExt}; use futures::{join, try_join, FutureExt, TryStreamExt};
use std::fmt::Write; use std::fmt::Write;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -10,7 +11,7 @@ use tokio::timer::Delay;
use tokio_postgres::error::SqlState; use tokio_postgres::error::SqlState;
use tokio_postgres::tls::{NoTls, NoTlsStream}; use tokio_postgres::tls::{NoTls, NoTlsStream};
use tokio_postgres::types::{Kind, Type}; use tokio_postgres::types::{Kind, Type};
use tokio_postgres::{Client, Config, Connection, Error, SimpleQueryMessage}; use tokio_postgres::{AsyncMessage, Client, Config, Connection, Error, SimpleQueryMessage};
mod parse; mod parse;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
@ -517,20 +518,57 @@ async fn copy_in_error() {
async fn copy_out() { async fn copy_out() {
let mut client = connect("user=postgres").await; let mut client = connect("user=postgres").await;
client.batch_execute( client
"CREATE TEMPORARY TABLE foo ( .batch_execute(
"CREATE TEMPORARY TABLE foo (
id SERIAL, id SERIAL,
name TEXT name TEXT
); );
INSERT INTO foo (name) VALUES ('jim'), ('joe');" INSERT INTO foo (name) VALUES ('jim'), ('joe');",
).await.unwrap(); )
.await
.unwrap();
let stmt = client.prepare("COPY foo TO STDOUT").await.unwrap(); let stmt = client.prepare("COPY foo TO STDOUT").await.unwrap();
let data = client.copy_out(&stmt, &[]).try_concat().await.unwrap(); let data = client.copy_out(&stmt, &[]).try_concat().await.unwrap();
assert_eq!(&data[..], b"1\tjim\n2\tjoe\n"); assert_eq!(&data[..], b"1\tjim\n2\tjoe\n");
} }
#[tokio::test]
async fn notifications() {
let (mut client, mut connection) = connect_raw("user=postgres").await.unwrap();
let (tx, rx) = mpsc::unbounded();
let stream = stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!(e));
let connection = stream.forward(tx).map(|r| r.unwrap());
tokio::spawn(connection);
client
.batch_execute(
"LISTEN test_notifications;
NOTIFY test_notifications, 'hello';
NOTIFY test_notifications, 'world';",
)
.await
.unwrap();
drop(client);
let notifications = rx
.filter_map(|m| match m {
AsyncMessage::Notification(n) => future::ready(Some(n)),
_ => future::ready(None),
})
.collect::<Vec<_>>()
.await;
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].channel(), "test_notifications");
assert_eq!(notifications[0].payload(), "hello");
assert_eq!(notifications[1].channel(), "test_notifications");
assert_eq!(notifications[1].payload(), "world");
}
/* /*
#[test] #[test]
fn query_portal() { fn query_portal() {
@ -576,50 +614,6 @@ fn query_portal() {
assert_eq!(r3.len(), 0); assert_eq!(r3.len(), 0);
} }
#[test]
fn notifications() {
let _ = env_logger::try_init();
let mut runtime = Runtime::new().unwrap();
let (mut client, mut connection) = runtime.block_on(connect("user=postgres")).unwrap();
let (tx, rx) = mpsc::unbounded();
let connection = future::poll_fn(move || {
while let Some(message) = try_ready!(connection.poll_message().map_err(|e| panic!("{}", e)))
{
if let AsyncMessage::Notification(notification) = message {
debug!("received {}", notification.payload());
tx.unbounded_send(notification).unwrap();
}
}
Ok(Async::Ready(()))
});
runtime.handle().spawn(connection).unwrap();
runtime
.block_on(
client
.simple_query(
"LISTEN test_notifications;
NOTIFY test_notifications, 'hello';
NOTIFY test_notifications, 'world';",
)
.for_each(|_| Ok(())),
)
.unwrap();
drop(client);
runtime.run().unwrap();
let notifications = rx.collect().wait().unwrap();
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].channel(), "test_notifications");
assert_eq!(notifications[0].payload(), "hello");
assert_eq!(notifications[1].channel(), "test_notifications");
assert_eq!(notifications[1].payload(), "world");
}
#[test] #[test]
fn poll_idle_running() { fn poll_idle_running() {
struct DelayStream(Delay); struct DelayStream(Delay);