This commit is contained in:
Steven Fackler 2019-07-30 21:29:18 -07:00
parent f45884711f
commit beb509f3f3
2 changed files with 61 additions and 34 deletions

View File

@ -9,11 +9,11 @@ use futures::{SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
use pin_utils::pin_mut; use pin_utils::pin_mut;
use postgres_protocol::message::backend::Message; use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend; use postgres_protocol::message::frontend;
use postgres_protocol::message::frontend::CopyData;
use std::error; use std::error;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use postgres_protocol::message::frontend::CopyData;
enum CopyInMessage { enum CopyInMessage {
Message(FrontendMessage), Message(FrontendMessage),

View File

@ -1,10 +1,10 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
#![feature(async_await)] #![feature(async_await)]
use futures::stream;
use futures::{join, try_join, FutureExt, TryStreamExt}; use futures::{join, try_join, FutureExt, TryStreamExt};
use std::fmt::Write; use std::fmt::Write;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::stream;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::timer::Delay; use tokio::timer::Delay;
use tokio_postgres::error::SqlState; use tokio_postgres::error::SqlState;
@ -324,9 +324,9 @@ async fn transaction_commit() {
client client
.batch_execute( .batch_execute(
"CREATE TEMPORARY TABLE foo( "CREATE TEMPORARY TABLE foo(
id SERIAL, id SERIAL,
name TEXT name TEXT
)", )",
) )
.await .await
.unwrap(); .unwrap();
@ -356,9 +356,9 @@ async fn transaction_rollback() {
client client
.batch_execute( .batch_execute(
"CREATE TEMPORARY TABLE foo( "CREATE TEMPORARY TABLE foo(
id SERIAL, id SERIAL,
name TEXT name TEXT
)", )",
) )
.await .await
.unwrap(); .unwrap();
@ -387,9 +387,9 @@ async fn transaction_rollback_drop() {
client client
.batch_execute( .batch_execute(
"CREATE TEMPORARY TABLE foo( "CREATE TEMPORARY TABLE foo(
id SERIAL, id SERIAL,
name TEXT name TEXT
)", )",
) )
.await .await
.unwrap(); .unwrap();
@ -415,20 +415,34 @@ async fn transaction_rollback_drop() {
async fn copy_in() { async fn copy_in() {
let mut client = connect("user=postgres").await; let mut client = connect("user=postgres").await;
client.batch_execute( client
"CREATE TEMPORARY TABLE foo ( .batch_execute(
id INTEGER,\ "CREATE TEMPORARY TABLE foo (
name TEXT\ id INTEGER,
)" name TEXT
).await.unwrap(); )",
)
.await
.unwrap();
let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap(); let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap();
let stream = stream::iter(vec![b"1\tjim\n".to_vec(), b"2\tjoe\n".to_vec()].into_iter().map(Ok::<_, String>)); let stream = stream::iter(
vec![b"1\tjim\n".to_vec(), b"2\tjoe\n".to_vec()]
.into_iter()
.map(Ok::<_, String>),
);
let rows = client.copy_in(&stmt, &[], stream).await.unwrap(); let rows = client.copy_in(&stmt, &[], stream).await.unwrap();
assert_eq!(rows, 2); assert_eq!(rows, 2);
let stmt = client.prepare("SELECT id, name FROM foo ORDER BY id").await.unwrap(); let stmt = client
let rows = client.query(&stmt, &[]).try_collect::<Vec<_>>().await.unwrap(); .prepare("SELECT id, name FROM foo ORDER BY id")
.await
.unwrap();
let rows = client
.query(&stmt, &[])
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(rows.len(), 2); assert_eq!(rows.len(), 2);
assert_eq!(rows[0].get::<_, i32>(0), 1); assert_eq!(rows[0].get::<_, i32>(0), 1);
@ -441,12 +455,15 @@ async fn copy_in() {
async fn copy_in_large() { async fn copy_in_large() {
let mut client = connect("user=postgres").await; let mut client = connect("user=postgres").await;
client.batch_execute( client
"CREATE TEMPORARY TABLE foo ( .batch_execute(
id INTEGER,\ "CREATE TEMPORARY TABLE foo (
name TEXT\ id INTEGER,
)" name TEXT
).await.unwrap(); )",
)
.await
.unwrap();
let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap(); let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap();
@ -469,20 +486,30 @@ async fn copy_in_large() {
async fn copy_in_error() { async fn copy_in_error() {
let mut client = connect("user=postgres").await; let mut client = connect("user=postgres").await;
client.batch_execute( client
"CREATE TEMPORARY TABLE foo ( .batch_execute(
id INTEGER,\ "CREATE TEMPORARY TABLE foo (
name TEXT\ id INTEGER,
)" name TEXT
).await.unwrap(); )",
)
.await
.unwrap();
let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap(); let stmt = client.prepare("COPY foo FROM STDIN").await.unwrap();
let stream = stream::iter(vec![Ok(b"1\tjim\n".to_vec()), Err("asdf")]); let stream = stream::iter(vec![Ok(b"1\tjim\n".to_vec()), Err("asdf")]);
let error = client.copy_in(&stmt, &[], stream).await.unwrap_err(); let error = client.copy_in(&stmt, &[], stream).await.unwrap_err();
assert!(error.to_string().contains("asdf")); assert!(error.to_string().contains("asdf"));
let stmt = client.prepare("SELECT id, name FROM foo ORDER BY id").await.unwrap(); let stmt = client
let rows = client.query(&stmt, &[]).try_collect::<Vec<_>>().await.unwrap(); .prepare("SELECT id, name FROM foo ORDER BY id")
.await
.unwrap();
let rows = client
.query(&stmt, &[])
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(rows.len(), 0); assert_eq!(rows.len(), 0);
} }