diff --git a/tokio-postgres/tests/test/main.rs b/tokio-postgres/tests/test/main.rs index b6be8662..1f72a7ea 100644 --- a/tokio-postgres/tests/test/main.rs +++ b/tokio-postgres/tests/test/main.rs @@ -686,25 +686,29 @@ fn transaction_builder_around_moved_client() { } #[test] -fn poll_idle() { - struct IdleFuture { - client: tokio_postgres::Client, - query: Option, +fn poll_idle_running() { + struct DelayStream(Delay); + + impl Stream for DelayStream { + type Item = Vec; + type Error = tokio_postgres::Error; + + fn poll(&mut self) -> Poll>, tokio_postgres::Error> { + try_ready!(self.0.poll().map_err(|e| panic!("{}", e))); + QUERY_DONE.store(true, Ordering::SeqCst); + Ok(Async::Ready(None)) + } } + struct IdleFuture(tokio_postgres::Client); + impl Future for IdleFuture { type Item = (); type Error = tokio_postgres::Error; fn poll(&mut self) -> Poll<(), tokio_postgres::Error> { - if let Some(_) = self.query.take() { - assert!(!self.client.poll_idle().unwrap().is_ready()); - return Ok(Async::NotReady); - } - - try_ready!(self.client.poll_idle()); + try_ready!(self.0.poll_idle()); assert!(QUERY_DONE.load(Ordering::SeqCst)); - Ok(Async::Ready(())) } } @@ -718,18 +722,59 @@ fn poll_idle() { let connection = connection.map_err(|e| panic!("{}", e)); runtime.handle().spawn(connection).unwrap(); - let stmt = runtime.block_on(client.prepare("SELECT 1")).unwrap(); + let execute = client.batch_execute("CREATE TEMPORARY TABLE foo (id INT)"); + runtime.block_on(execute).unwrap(); - let query = client - .query(&stmt, &[]) - .collect() - .map(|_| QUERY_DONE.store(true, Ordering::SeqCst)) - .map_err(|e| panic!("{}", e)); - runtime.spawn(query); + let prepare = client.prepare("COPY foo FROM STDIN"); + let stmt = runtime.block_on(prepare).unwrap(); + let copy_in = client.copy_in( + &stmt, + &[], + DelayStream(Delay::new(Instant::now() + Duration::from_millis(10))), + ); + let copy_in = copy_in.map(|_| ()).map_err(|e| panic!("{}", e)); + runtime.spawn(copy_in); + let future = IdleFuture(client); + runtime.block_on(future).unwrap(); +} + +#[test] +fn poll_idle_new() { + struct IdleFuture { + client: tokio_postgres::Client, + prepare: Option, + } + + impl Future for IdleFuture { + type Item = (); + type Error = tokio_postgres::Error; + + fn poll(&mut self) -> Poll<(), tokio_postgres::Error> { + match self.prepare.take() { + Some(_future) => { + assert!(!self.client.poll_idle().unwrap().is_ready()); + Ok(Async::NotReady) + } + None => { + assert!(self.client.poll_idle().unwrap().is_ready()); + Ok(Async::Ready(())) + } + } + } + } + + let _ = env_logger::try_init(); + let mut runtime = Runtime::new().unwrap(); + + let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap(); + let connection = connection.map_err(|e| panic!("{}", e)); + runtime.handle().spawn(connection).unwrap(); + + let prepare = client.prepare(""); let future = IdleFuture { - query: Some(client.prepare("")), client, + prepare: Some(prepare), }; runtime.block_on(future).unwrap(); }