2018-12-09 01:40:37 +00:00
|
|
|
#![warn(rust_2018_idioms)]
|
2018-06-19 02:34:25 +00:00
|
|
|
|
2018-07-07 17:11:16 +00:00
|
|
|
use futures::sync::mpsc;
|
2018-12-09 01:40:37 +00:00
|
|
|
use futures::{future, stream, try_ready};
|
|
|
|
use log::debug;
|
2018-07-14 21:59:37 +00:00
|
|
|
use std::error::Error;
|
2018-12-23 01:02:48 +00:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2018-06-28 05:37:43 +00:00
|
|
|
use std::time::{Duration, Instant};
|
2018-11-27 06:45:14 +00:00
|
|
|
use tokio::net::TcpStream;
|
2018-06-19 02:34:25 +00:00
|
|
|
use tokio::prelude::*;
|
|
|
|
use tokio::runtime::current_thread::Runtime;
|
2018-06-28 05:37:43 +00:00
|
|
|
use tokio::timer::Delay;
|
2018-06-22 02:38:42 +00:00
|
|
|
use tokio_postgres::error::SqlState;
|
2019-01-18 05:11:24 +00:00
|
|
|
use tokio_postgres::impls;
|
2018-07-05 04:02:08 +00:00
|
|
|
use tokio_postgres::types::{Kind, Type};
|
2019-01-13 22:53:19 +00:00
|
|
|
use tokio_postgres::{AsyncMessage, Client, Connection, NoTls, NoTlsStream};
|
2018-11-27 06:45:14 +00:00
|
|
|
|
2018-12-17 02:11:52 +00:00
|
|
|
mod parse;
|
2018-12-18 05:25:21 +00:00
|
|
|
#[cfg(feature = "runtime")]
|
|
|
|
mod runtime;
|
2018-12-10 00:18:21 +00:00
|
|
|
mod types;
|
|
|
|
|
2018-11-27 06:45:14 +00:00
|
|
|
fn connect(
|
2018-12-14 05:03:47 +00:00
|
|
|
s: &str,
|
2019-01-13 22:53:19 +00:00
|
|
|
) -> impl Future<Item = (Client, Connection<TcpStream, NoTlsStream>), Error = tokio_postgres::Error>
|
|
|
|
{
|
2018-12-30 05:00:58 +00:00
|
|
|
let builder = s.parse::<tokio_postgres::Config>().unwrap();
|
2018-11-27 06:45:14 +00:00
|
|
|
TcpStream::connect(&"127.0.0.1:5433".parse().unwrap())
|
|
|
|
.map_err(|e| panic!("{}", e))
|
2019-01-08 05:45:09 +00:00
|
|
|
.and_then(move |s| builder.connect_raw(s, NoTls))
|
2018-11-27 06:45:14 +00:00
|
|
|
}
|
2018-06-19 02:34:25 +00:00
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
fn smoke_test(s: &str) {
|
2018-06-22 02:38:42 +00:00
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect(s);
|
2018-06-22 02:38:42 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(handshake).unwrap();
|
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
let prepare = client.prepare("SELECT 1::INT4");
|
|
|
|
let statement = runtime.block_on(prepare).unwrap();
|
|
|
|
let select = client.query(&statement, &[]).collect().map(|rows| {
|
|
|
|
assert_eq!(rows.len(), 1);
|
|
|
|
assert_eq!(rows[0].get::<_, i32>(0), 1);
|
|
|
|
});
|
|
|
|
runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
drop(statement);
|
|
|
|
drop(client);
|
|
|
|
runtime.run().unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn plain_password_missing() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect("user=pass_user dbname=postgres");
|
2018-08-15 17:27:34 +00:00
|
|
|
runtime.block_on(handshake).err().unwrap();
|
2018-06-22 02:38:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn plain_password_wrong() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect("user=pass_user password=foo dbname=postgres");
|
2018-06-22 02:38:42 +00:00
|
|
|
match runtime.block_on(handshake) {
|
|
|
|
Ok(_) => panic!("unexpected success"),
|
|
|
|
Err(ref e) if e.code() == Some(&SqlState::INVALID_PASSWORD) => {}
|
|
|
|
Err(e) => panic!("{}", e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn plain_password_ok() {
|
2018-12-14 05:03:47 +00:00
|
|
|
smoke_test("user=pass_user password=password dbname=postgres");
|
2018-06-22 02:38:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn md5_password_missing() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect("user=md5_user dbname=postgres");
|
2018-08-15 17:27:34 +00:00
|
|
|
runtime.block_on(handshake).err().unwrap();
|
2018-06-22 02:38:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn md5_password_wrong() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect("user=md5_user password=foo dbname=postgres");
|
2018-06-22 02:38:42 +00:00
|
|
|
match runtime.block_on(handshake) {
|
|
|
|
Ok(_) => panic!("unexpected success"),
|
|
|
|
Err(ref e) if e.code() == Some(&SqlState::INVALID_PASSWORD) => {}
|
|
|
|
Err(e) => panic!("{}", e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn md5_password_ok() {
|
2018-12-14 05:03:47 +00:00
|
|
|
smoke_test("user=md5_user password=password dbname=postgres");
|
2018-06-22 02:38:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn scram_password_missing() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect("user=scram_user dbname=postgres");
|
2018-08-15 17:27:34 +00:00
|
|
|
runtime.block_on(handshake).err().unwrap();
|
2018-06-22 02:38:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn scram_password_wrong() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let handshake = connect("user=scram_user password=foo dbname=postgres");
|
2018-06-22 02:38:42 +00:00
|
|
|
match runtime.block_on(handshake) {
|
|
|
|
Ok(_) => panic!("unexpected success"),
|
|
|
|
Err(ref e) if e.code() == Some(&SqlState::INVALID_PASSWORD) => {}
|
|
|
|
Err(e) => panic!("{}", e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn scram_password_ok() {
|
2018-12-14 05:03:47 +00:00
|
|
|
smoke_test("user=scram_user password=password dbname=postgres");
|
2018-06-22 02:38:42 +00:00
|
|
|
}
|
|
|
|
|
2018-06-19 02:34:25 +00:00
|
|
|
#[test]
|
|
|
|
fn pipelined_prepare() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-06-19 02:34:25 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 23:02:45 +00:00
|
|
|
let prepare1 = client.prepare("SELECT $1::HSTORE[]");
|
|
|
|
let prepare2 = client.prepare("SELECT $1::HSTORE[]");
|
2018-06-19 02:34:25 +00:00
|
|
|
let prepare = prepare1.join(prepare2);
|
2018-07-08 23:02:45 +00:00
|
|
|
runtime.block_on(prepare).unwrap();
|
2018-06-19 02:34:25 +00:00
|
|
|
|
|
|
|
drop(client);
|
|
|
|
runtime.run().unwrap();
|
|
|
|
}
|
2018-06-21 00:06:11 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn insert_select() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-06-21 00:06:11 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-06-21 01:08:08 +00:00
|
|
|
runtime
|
2018-07-08 05:42:04 +00:00
|
|
|
.block_on(client.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT)"))
|
2018-06-21 01:08:08 +00:00
|
|
|
.unwrap();
|
2018-06-21 00:06:11 +00:00
|
|
|
|
|
|
|
let insert = client.prepare("INSERT INTO foo (name) VALUES ($1), ($2)");
|
|
|
|
let select = client.prepare("SELECT id, name FROM foo ORDER BY id");
|
|
|
|
let prepare = insert.join(select);
|
|
|
|
let (insert, select) = runtime.block_on(prepare).unwrap();
|
|
|
|
|
|
|
|
let insert = client
|
|
|
|
.execute(&insert, &[&"alice", &"bob"])
|
|
|
|
.map(|n| assert_eq!(n, 2));
|
|
|
|
let select = client.query(&select, &[]).collect().map(|rows| {
|
|
|
|
assert_eq!(rows.len(), 2);
|
|
|
|
assert_eq!(rows[0].get::<_, i32>(0), 1);
|
|
|
|
assert_eq!(rows[0].get::<_, &str>(1), "alice");
|
|
|
|
assert_eq!(rows[1].get::<_, i32>(0), 2);
|
|
|
|
assert_eq!(rows[1].get::<_, &str>(1), "bob");
|
|
|
|
});
|
|
|
|
let tests = insert.join(select);
|
|
|
|
runtime.block_on(tests).unwrap();
|
|
|
|
}
|
2018-06-28 05:37:43 +00:00
|
|
|
|
2018-08-16 04:00:15 +00:00
|
|
|
#[test]
|
|
|
|
fn query_portal() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-08-16 04:00:15 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT);
|
|
|
|
INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');
|
|
|
|
BEGIN;",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-08-16 04:00:15 +00:00
|
|
|
|
|
|
|
let statement = runtime
|
|
|
|
.block_on(client.prepare("SELECT id, name FROM foo ORDER BY id"))
|
|
|
|
.unwrap();
|
|
|
|
let portal = runtime.block_on(client.bind(&statement, &[])).unwrap();
|
|
|
|
|
|
|
|
let f1 = client.query_portal(&portal, 2).collect();
|
|
|
|
let f2 = client.query_portal(&portal, 2).collect();
|
|
|
|
let f3 = client.query_portal(&portal, 2).collect();
|
|
|
|
let (r1, r2, r3) = runtime.block_on(f1.join3(f2, f3)).unwrap();
|
|
|
|
|
|
|
|
assert_eq!(r1.len(), 2);
|
|
|
|
assert_eq!(r1[0].get::<_, i32>(0), 1);
|
|
|
|
assert_eq!(r1[0].get::<_, &str>(1), "alice");
|
|
|
|
assert_eq!(r1[1].get::<_, i32>(0), 2);
|
|
|
|
assert_eq!(r1[1].get::<_, &str>(1), "bob");
|
|
|
|
|
|
|
|
assert_eq!(r2.len(), 1);
|
|
|
|
assert_eq!(r2[0].get::<_, i32>(0), 3);
|
|
|
|
assert_eq!(r2[0].get::<_, &str>(1), "charlie");
|
|
|
|
|
|
|
|
assert_eq!(r3.len(), 0);
|
|
|
|
}
|
|
|
|
|
2018-06-28 05:37:43 +00:00
|
|
|
#[test]
|
2019-01-07 02:03:51 +00:00
|
|
|
fn cancel_query_raw() {
|
2018-06-28 05:37:43 +00:00
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-06-28 05:37:43 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
let sleep = client
|
|
|
|
.batch_execute("SELECT pg_sleep(100)")
|
|
|
|
.then(|r| match r {
|
|
|
|
Ok(_) => panic!("unexpected success"),
|
|
|
|
Err(ref e) if e.code() == Some(&SqlState::QUERY_CANCELED) => Ok::<(), ()>(()),
|
|
|
|
Err(e) => panic!("unexpected error {}", e),
|
|
|
|
});
|
2018-06-28 05:37:43 +00:00
|
|
|
let cancel = Delay::new(Instant::now() + Duration::from_millis(100))
|
|
|
|
.then(|r| {
|
|
|
|
r.unwrap();
|
2018-11-27 06:45:14 +00:00
|
|
|
TcpStream::connect(&"127.0.0.1:5433".parse().unwrap())
|
2018-12-09 01:40:37 +00:00
|
|
|
})
|
|
|
|
.then(|r| {
|
2018-11-27 06:45:14 +00:00
|
|
|
let s = r.unwrap();
|
2019-01-07 02:03:51 +00:00
|
|
|
client.cancel_query_raw(s, NoTls)
|
2018-12-09 01:40:37 +00:00
|
|
|
})
|
|
|
|
.then(|r| {
|
2018-06-28 05:37:43 +00:00
|
|
|
r.unwrap();
|
|
|
|
Ok::<(), ()>(())
|
|
|
|
});
|
|
|
|
|
|
|
|
let ((), ()) = runtime.block_on(sleep.join(cancel)).unwrap();
|
|
|
|
}
|
2018-07-05 04:02:08 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn custom_enum() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TYPE pg_temp.mood AS ENUM (
|
|
|
|
'sad',
|
|
|
|
'ok',
|
|
|
|
'happy'
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
|
|
|
|
let select = client.prepare("SELECT $1::mood");
|
|
|
|
let select = runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
let ty = &select.params()[0];
|
|
|
|
assert_eq!("mood", ty.name());
|
|
|
|
assert_eq!(
|
|
|
|
&Kind::Enum(vec![
|
|
|
|
"sad".to_string(),
|
|
|
|
"ok".to_string(),
|
|
|
|
"happy".to_string(),
|
|
|
|
]),
|
|
|
|
ty.kind()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn custom_domain() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE DOMAIN pg_temp.session_id AS bytea CHECK(octet_length(VALUE) = 16)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
|
|
|
|
let select = client.prepare("SELECT $1::session_id");
|
|
|
|
let select = runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
let ty = &select.params()[0];
|
|
|
|
assert_eq!("session_id", ty.name());
|
|
|
|
assert_eq!(&Kind::Domain(Type::BYTEA), ty.kind());
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn custom_array() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
let select = client.prepare("SELECT $1::HSTORE[]");
|
|
|
|
let select = runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
let ty = &select.params()[0];
|
|
|
|
assert_eq!("_hstore", ty.name());
|
|
|
|
match *ty.kind() {
|
|
|
|
Kind::Array(ref ty) => {
|
|
|
|
assert_eq!("hstore", ty.name());
|
|
|
|
assert_eq!(&Kind::Simple, ty.kind());
|
|
|
|
}
|
|
|
|
_ => panic!("unexpected kind"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn custom_composite() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TYPE pg_temp.inventory_item AS (
|
|
|
|
name TEXT,
|
|
|
|
supplier INTEGER,
|
|
|
|
price NUMERIC
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
|
|
|
|
let select = client.prepare("SELECT $1::inventory_item");
|
|
|
|
let select = runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
let ty = &select.params()[0];
|
|
|
|
assert_eq!(ty.name(), "inventory_item");
|
|
|
|
match *ty.kind() {
|
|
|
|
Kind::Composite(ref fields) => {
|
|
|
|
assert_eq!(fields[0].name(), "name");
|
|
|
|
assert_eq!(fields[0].type_(), &Type::TEXT);
|
|
|
|
assert_eq!(fields[1].name(), "supplier");
|
|
|
|
assert_eq!(fields[1].type_(), &Type::INT4);
|
|
|
|
assert_eq!(fields[2].name(), "price");
|
|
|
|
assert_eq!(fields[2].type_(), &Type::NUMERIC);
|
|
|
|
}
|
|
|
|
ref t => panic!("bad type {:?}", t),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn custom_range() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TYPE pg_temp.floatrange AS RANGE (
|
|
|
|
subtype = float8,
|
|
|
|
subtype_diff = float8mi
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
|
|
|
|
let select = client.prepare("SELECT $1::floatrange");
|
|
|
|
let select = runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
let ty = &select.params()[0];
|
|
|
|
assert_eq!("floatrange", ty.name());
|
|
|
|
assert_eq!(&Kind::Range(Type::FLOAT8), ty.kind());
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn custom_simple() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-05 04:02:08 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
let select = client.prepare("SELECT $1::HSTORE");
|
|
|
|
let select = runtime.block_on(select).unwrap();
|
|
|
|
|
|
|
|
let ty = &select.params()[0];
|
|
|
|
assert_eq!("hstore", ty.name());
|
|
|
|
assert_eq!(&Kind::Simple, ty.kind());
|
|
|
|
}
|
2018-07-07 17:11:16 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn notifications() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, mut connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-07 17:11:16 +00:00
|
|
|
|
|
|
|
let (tx, rx) = mpsc::unbounded();
|
|
|
|
let connection = future::poll_fn(move || {
|
|
|
|
while let Some(message) = try_ready!(connection.poll_message().map_err(|e| panic!("{}", e)))
|
|
|
|
{
|
|
|
|
if let AsyncMessage::Notification(notification) = message {
|
2019-01-08 05:10:15 +00:00
|
|
|
debug!("received {}", notification.payload());
|
2018-07-07 17:11:16 +00:00
|
|
|
tx.unbounded_send(notification).unwrap();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(Async::Ready(()))
|
|
|
|
});
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute("LISTEN test_notifications"))
|
|
|
|
.unwrap();
|
2018-07-07 17:11:16 +00:00
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute("NOTIFY test_notifications, 'hello'"))
|
|
|
|
.unwrap();
|
2018-07-07 17:11:16 +00:00
|
|
|
|
2018-07-08 05:42:04 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute("NOTIFY test_notifications, 'world'"))
|
|
|
|
.unwrap();
|
2018-07-07 17:11:16 +00:00
|
|
|
|
|
|
|
drop(client);
|
|
|
|
runtime.run().unwrap();
|
|
|
|
|
|
|
|
let notifications = rx.collect().wait().unwrap();
|
|
|
|
assert_eq!(notifications.len(), 2);
|
2019-01-08 05:10:15 +00:00
|
|
|
assert_eq!(notifications[0].channel(), "test_notifications");
|
|
|
|
assert_eq!(notifications[0].payload(), "hello");
|
|
|
|
assert_eq!(notifications[1].channel(), "test_notifications");
|
|
|
|
assert_eq!(notifications[1].payload(), "world");
|
2018-07-07 17:11:16 +00:00
|
|
|
}
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
#[test]
|
2018-07-16 02:40:15 +00:00
|
|
|
fn transaction_commit() {
|
2018-07-14 21:59:37 +00:00
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TEMPORARY TABLE foo (
|
|
|
|
id SERIAL,
|
|
|
|
name TEXT
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
let f = client.batch_execute("INSERT INTO foo (name) VALUES ('steven')");
|
2019-01-18 04:35:12 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.build_transaction().build(f))
|
|
|
|
.unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
let rows = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("SELECT name FROM foo")
|
|
|
|
.and_then(|s| client.query(&s, &[]).collect()),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
assert_eq!(rows.len(), 1);
|
|
|
|
assert_eq!(rows[0].get::<_, &str>(0), "steven");
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2018-07-16 02:40:15 +00:00
|
|
|
fn transaction_abort() {
|
2018-07-14 21:59:37 +00:00
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TEMPORARY TABLE foo (
|
|
|
|
id SERIAL,
|
|
|
|
name TEXT
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
let f = client
|
|
|
|
.batch_execute("INSERT INTO foo (name) VALUES ('steven')")
|
2018-12-09 01:40:37 +00:00
|
|
|
.map_err(|e| Box::new(e) as Box<dyn Error>)
|
|
|
|
.and_then(|_| Err::<(), _>(Box::<dyn Error>::from("")));
|
2019-01-18 04:35:12 +00:00
|
|
|
runtime
|
|
|
|
.block_on(client.build_transaction().build(f))
|
|
|
|
.unwrap_err();
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
let rows = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("SELECT name FROM foo")
|
|
|
|
.and_then(|s| client.query(&s, &[]).collect()),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
|
|
|
|
assert_eq!(rows.len(), 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn copy_in() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TEMPORARY TABLE foo (
|
|
|
|
id INTEGER,
|
|
|
|
name TEXT
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
|
|
|
|
let stream = stream::iter_ok::<_, String>(vec![b"1\tjim\n".to_vec(), b"2\tjoe\n".to_vec()]);
|
|
|
|
let rows = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("COPY foo FROM STDIN")
|
|
|
|
.and_then(|s| client.copy_in(&s, &[], stream)),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
assert_eq!(rows, 2);
|
|
|
|
|
|
|
|
let rows = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("SELECT id, name FROM foo ORDER BY id")
|
|
|
|
.and_then(|s| client.query(&s, &[]).collect()),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
|
|
|
|
assert_eq!(rows.len(), 2);
|
|
|
|
assert_eq!(rows[0].get::<_, i32>(0), 1);
|
|
|
|
assert_eq!(rows[0].get::<_, &str>(1), "jim");
|
|
|
|
assert_eq!(rows[1].get::<_, i32>(0), 2);
|
|
|
|
assert_eq!(rows[1].get::<_, &str>(1), "joe");
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn copy_in_error() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TEMPORARY TABLE foo (
|
|
|
|
id INTEGER,
|
|
|
|
name TEXT
|
|
|
|
)",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-08-11 21:32:17 +00:00
|
|
|
|
|
|
|
let stream = stream::iter_result(vec![Ok(b"1\tjim\n".to_vec()), Err("asdf")]);
|
|
|
|
let error = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("COPY foo FROM STDIN")
|
|
|
|
.and_then(|s| client.copy_in(&s, &[], stream)),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap_err();
|
2018-08-13 02:49:59 +00:00
|
|
|
assert!(error.to_string().contains("asdf"));
|
2018-08-11 21:32:17 +00:00
|
|
|
|
|
|
|
let rows = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("SELECT id, name FROM foo ORDER BY id")
|
|
|
|
.and_then(|s| client.query(&s, &[]).collect()),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-07-14 21:59:37 +00:00
|
|
|
|
|
|
|
assert_eq!(rows.len(), 0);
|
|
|
|
}
|
2018-07-16 02:40:15 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn copy_out() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-07-16 02:40:15 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
|
|
|
runtime
|
|
|
|
.block_on(client.batch_execute(
|
|
|
|
"CREATE TEMPORARY TABLE foo (
|
|
|
|
id SERIAL,
|
|
|
|
name TEXT
|
|
|
|
);
|
|
|
|
INSERT INTO foo (name) VALUES ('jim'), ('joe');",
|
2018-12-09 01:40:37 +00:00
|
|
|
))
|
|
|
|
.unwrap();
|
2018-07-16 02:40:15 +00:00
|
|
|
|
|
|
|
let data = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("COPY foo TO STDOUT")
|
|
|
|
.and_then(|s| client.copy_out(&s, &[]).concat2()),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-07-16 02:40:15 +00:00
|
|
|
assert_eq!(&data[..], b"1\tjim\n2\tjoe\n");
|
|
|
|
}
|
2018-11-06 18:14:32 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn transaction_builder_around_moved_client() {
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
2018-12-14 05:03:47 +00:00
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
2018-11-06 18:14:32 +00:00
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2019-01-18 04:35:12 +00:00
|
|
|
let transaction_builder = client.build_transaction();
|
|
|
|
let work = client
|
|
|
|
.batch_execute(
|
2018-12-02 04:01:46 +00:00
|
|
|
"CREATE TEMPORARY TABLE transaction_foo (
|
2019-01-18 04:35:12 +00:00
|
|
|
id SERIAL,
|
|
|
|
name TEXT
|
|
|
|
)",
|
|
|
|
)
|
|
|
|
.and_then(move |_| {
|
2018-11-06 18:14:32 +00:00
|
|
|
client
|
|
|
|
.prepare("INSERT INTO transaction_foo (name) VALUES ($1), ($2)")
|
|
|
|
.map(|statement| (client, statement))
|
|
|
|
})
|
2019-01-18 04:35:12 +00:00
|
|
|
.and_then(|(mut client, statement)| {
|
|
|
|
client
|
|
|
|
.query(&statement, &[&"jim", &"joe"])
|
|
|
|
.collect()
|
|
|
|
.map(|_res| client)
|
|
|
|
});
|
2018-11-06 18:14:32 +00:00
|
|
|
|
|
|
|
let transaction = transaction_builder.build(work);
|
|
|
|
let mut client = runtime.block_on(transaction).unwrap();
|
|
|
|
|
|
|
|
let data = runtime
|
|
|
|
.block_on(
|
|
|
|
client
|
|
|
|
.prepare("COPY transaction_foo TO STDOUT")
|
|
|
|
.and_then(|s| client.copy_out(&s, &[]).concat2()),
|
2018-12-09 01:40:37 +00:00
|
|
|
)
|
|
|
|
.unwrap();
|
2018-11-06 18:14:32 +00:00
|
|
|
assert_eq!(&data[..], b"1\tjim\n2\tjoe\n");
|
|
|
|
|
|
|
|
drop(client);
|
|
|
|
runtime.run().unwrap();
|
|
|
|
}
|
2018-12-23 01:02:48 +00:00
|
|
|
|
|
|
|
#[test]
|
2018-12-24 18:02:48 +00:00
|
|
|
fn poll_idle_running() {
|
|
|
|
struct DelayStream(Delay);
|
|
|
|
|
|
|
|
impl Stream for DelayStream {
|
|
|
|
type Item = Vec<u8>;
|
|
|
|
type Error = tokio_postgres::Error;
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<Option<Vec<u8>>, tokio_postgres::Error> {
|
|
|
|
try_ready!(self.0.poll().map_err(|e| panic!("{}", e)));
|
|
|
|
QUERY_DONE.store(true, Ordering::SeqCst);
|
|
|
|
Ok(Async::Ready(None))
|
|
|
|
}
|
2018-12-23 01:02:48 +00:00
|
|
|
}
|
|
|
|
|
2018-12-24 18:02:48 +00:00
|
|
|
struct IdleFuture(tokio_postgres::Client);
|
|
|
|
|
2018-12-23 01:02:48 +00:00
|
|
|
impl Future for IdleFuture {
|
|
|
|
type Item = ();
|
|
|
|
type Error = tokio_postgres::Error;
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<(), tokio_postgres::Error> {
|
2018-12-24 18:02:48 +00:00
|
|
|
try_ready!(self.0.poll_idle());
|
2018-12-23 01:02:48 +00:00
|
|
|
assert!(QUERY_DONE.load(Ordering::SeqCst));
|
|
|
|
Ok(Async::Ready(()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static QUERY_DONE: AtomicBool = AtomicBool::new(false);
|
|
|
|
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
|
|
|
|
2018-12-24 18:02:48 +00:00
|
|
|
let execute = client.batch_execute("CREATE TEMPORARY TABLE foo (id INT)");
|
|
|
|
runtime.block_on(execute).unwrap();
|
|
|
|
|
|
|
|
let prepare = client.prepare("COPY foo FROM STDIN");
|
|
|
|
let stmt = runtime.block_on(prepare).unwrap();
|
|
|
|
let copy_in = client.copy_in(
|
|
|
|
&stmt,
|
|
|
|
&[],
|
|
|
|
DelayStream(Delay::new(Instant::now() + Duration::from_millis(10))),
|
|
|
|
);
|
|
|
|
let copy_in = copy_in.map(|_| ()).map_err(|e| panic!("{}", e));
|
|
|
|
runtime.spawn(copy_in);
|
2018-12-23 01:02:48 +00:00
|
|
|
|
2018-12-24 18:02:48 +00:00
|
|
|
let future = IdleFuture(client);
|
|
|
|
runtime.block_on(future).unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn poll_idle_new() {
|
|
|
|
struct IdleFuture {
|
|
|
|
client: tokio_postgres::Client,
|
2019-01-18 05:11:24 +00:00
|
|
|
prepare: Option<impls::Prepare>,
|
2018-12-24 18:02:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Future for IdleFuture {
|
|
|
|
type Item = ();
|
|
|
|
type Error = tokio_postgres::Error;
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<(), tokio_postgres::Error> {
|
|
|
|
match self.prepare.take() {
|
|
|
|
Some(_future) => {
|
|
|
|
assert!(!self.client.poll_idle().unwrap().is_ready());
|
|
|
|
Ok(Async::NotReady)
|
|
|
|
}
|
|
|
|
None => {
|
|
|
|
assert!(self.client.poll_idle().unwrap().is_ready());
|
|
|
|
Ok(Async::Ready(()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut runtime = Runtime::new().unwrap();
|
|
|
|
|
|
|
|
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
|
|
|
|
let connection = connection.map_err(|e| panic!("{}", e));
|
|
|
|
runtime.handle().spawn(connection).unwrap();
|
2018-12-23 01:02:48 +00:00
|
|
|
|
2018-12-24 18:02:48 +00:00
|
|
|
let prepare = client.prepare("");
|
2018-12-23 01:02:48 +00:00
|
|
|
let future = IdleFuture {
|
|
|
|
client,
|
2018-12-24 18:02:48 +00:00
|
|
|
prepare: Some(prepare),
|
2018-12-23 01:02:48 +00:00
|
|
|
};
|
|
|
|
runtime.block_on(future).unwrap();
|
|
|
|
}
|