Fix pipelined preparation
We can end up double-preparing the typeinfo queries if we're pipelining preparation, so pick a unique name for them.
This commit is contained in:
parent
53657b828a
commit
bf0633681b
@ -44,6 +44,10 @@ pub mod tls;
|
|||||||
|
|
||||||
static NEXT_STATEMENT_ID: AtomicUsize = AtomicUsize::new(0);
|
static NEXT_STATEMENT_ID: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
fn next_statement() -> String {
|
||||||
|
format!("s{}", NEXT_STATEMENT_ID.fetch_add(1, Ordering::SeqCst))
|
||||||
|
}
|
||||||
|
|
||||||
fn bad_response() -> Error {
|
fn bad_response() -> Error {
|
||||||
Error::from(io::Error::new(
|
Error::from(io::Error::new(
|
||||||
io::ErrorKind::InvalidInput,
|
io::ErrorKind::InvalidInput,
|
||||||
@ -80,8 +84,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn prepare_typed(&mut self, query: &str, param_types: &[Type]) -> Prepare {
|
pub fn prepare_typed(&mut self, query: &str, param_types: &[Type]) -> Prepare {
|
||||||
let name = format!("s{}", NEXT_STATEMENT_ID.fetch_add(1, Ordering::SeqCst));
|
Prepare(self.0.prepare(next_statement(), query, param_types))
|
||||||
Prepare(self.0.prepare(name, query, param_types))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute(&mut self, statement: &Statement, params: &[&ToSql]) -> Execute {
|
pub fn execute(&mut self, statement: &Statement, params: &[&ToSql]) -> Execute {
|
||||||
|
@ -2,7 +2,6 @@ use futures::stream::{self, Stream};
|
|||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
use state_machine_future::RentToOwn;
|
use state_machine_future::RentToOwn;
|
||||||
|
|
||||||
use bad_response;
|
|
||||||
use error::{Error, SqlState};
|
use error::{Error, SqlState};
|
||||||
use proto::client::Client;
|
use proto::client::Client;
|
||||||
use proto::prepare::PrepareFuture;
|
use proto::prepare::PrepareFuture;
|
||||||
@ -10,8 +9,7 @@ use proto::query::QueryStream;
|
|||||||
use proto::typeinfo_composite::TypeinfoCompositeFuture;
|
use proto::typeinfo_composite::TypeinfoCompositeFuture;
|
||||||
use proto::typeinfo_enum::TypeinfoEnumFuture;
|
use proto::typeinfo_enum::TypeinfoEnumFuture;
|
||||||
use types::{Kind, Oid, Type};
|
use types::{Kind, Oid, Type};
|
||||||
|
use {bad_response, next_statement};
|
||||||
const TYPEINFO_NAME: &'static str = "_rust_typeinfo";
|
|
||||||
|
|
||||||
const TYPEINFO_QUERY: &'static str = "
|
const TYPEINFO_QUERY: &'static str = "
|
||||||
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
|
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
|
||||||
@ -125,11 +123,7 @@ impl PollTypeinfo for Typeinfo {
|
|||||||
client: state.client,
|
client: state.client,
|
||||||
}),
|
}),
|
||||||
None => transition!(PreparingTypeinfo {
|
None => transition!(PreparingTypeinfo {
|
||||||
future: Box::new(state.client.prepare(
|
future: Box::new(state.client.prepare(next_statement(), TYPEINFO_QUERY, &[])),
|
||||||
TYPEINFO_NAME.to_string(),
|
|
||||||
TYPEINFO_QUERY,
|
|
||||||
&[]
|
|
||||||
)),
|
|
||||||
oid: state.oid,
|
oid: state.oid,
|
||||||
client: state.client,
|
client: state.client,
|
||||||
}),
|
}),
|
||||||
@ -147,7 +141,7 @@ impl PollTypeinfo for Typeinfo {
|
|||||||
|
|
||||||
transition!(PreparingTypeinfoFallback {
|
transition!(PreparingTypeinfoFallback {
|
||||||
future: Box::new(state.client.prepare(
|
future: Box::new(state.client.prepare(
|
||||||
TYPEINFO_NAME.to_string(),
|
next_statement(),
|
||||||
TYPEINFO_FALLBACK_QUERY,
|
TYPEINFO_FALLBACK_QUERY,
|
||||||
&[]
|
&[]
|
||||||
)),
|
)),
|
||||||
|
@ -4,15 +4,13 @@ use state_machine_future::RentToOwn;
|
|||||||
use std::mem;
|
use std::mem;
|
||||||
use std::vec;
|
use std::vec;
|
||||||
|
|
||||||
use bad_response;
|
|
||||||
use error::Error;
|
use error::Error;
|
||||||
use proto::client::Client;
|
use proto::client::Client;
|
||||||
use proto::prepare::PrepareFuture;
|
use proto::prepare::PrepareFuture;
|
||||||
use proto::query::QueryStream;
|
use proto::query::QueryStream;
|
||||||
use proto::typeinfo::TypeinfoFuture;
|
use proto::typeinfo::TypeinfoFuture;
|
||||||
use types::{Field, Oid};
|
use types::{Field, Oid};
|
||||||
|
use {bad_response, next_statement};
|
||||||
const TYPEINFO_COMPOSITE_NAME: &'static str = "_rust_typeinfo_composite";
|
|
||||||
|
|
||||||
const TYPEINFO_COMPOSITE_QUERY: &'static str = "
|
const TYPEINFO_COMPOSITE_QUERY: &'static str = "
|
||||||
SELECT attname, atttypid
|
SELECT attname, atttypid
|
||||||
@ -65,7 +63,7 @@ impl PollTypeinfoComposite for TypeinfoComposite {
|
|||||||
}),
|
}),
|
||||||
None => transition!(PreparingTypeinfoComposite {
|
None => transition!(PreparingTypeinfoComposite {
|
||||||
future: Box::new(state.client.prepare(
|
future: Box::new(state.client.prepare(
|
||||||
TYPEINFO_COMPOSITE_NAME.to_string(),
|
next_statement(),
|
||||||
TYPEINFO_COMPOSITE_QUERY,
|
TYPEINFO_COMPOSITE_QUERY,
|
||||||
&[]
|
&[]
|
||||||
)),
|
)),
|
||||||
|
@ -2,14 +2,12 @@ use futures::stream::{self, Stream};
|
|||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
use state_machine_future::RentToOwn;
|
use state_machine_future::RentToOwn;
|
||||||
|
|
||||||
use bad_response;
|
|
||||||
use error::{Error, SqlState};
|
use error::{Error, SqlState};
|
||||||
use proto::client::Client;
|
use proto::client::Client;
|
||||||
use proto::prepare::PrepareFuture;
|
use proto::prepare::PrepareFuture;
|
||||||
use proto::query::QueryStream;
|
use proto::query::QueryStream;
|
||||||
use types::Oid;
|
use types::Oid;
|
||||||
|
use {bad_response, next_statement};
|
||||||
const TYPEINFO_ENUM_NAME: &'static str = "_rust_typeinfo_enum";
|
|
||||||
|
|
||||||
const TYPEINFO_ENUM_QUERY: &'static str = "
|
const TYPEINFO_ENUM_QUERY: &'static str = "
|
||||||
SELECT enumlabel
|
SELECT enumlabel
|
||||||
@ -66,11 +64,11 @@ impl PollTypeinfoEnum for TypeinfoEnum {
|
|||||||
client: state.client,
|
client: state.client,
|
||||||
}),
|
}),
|
||||||
None => transition!(PreparingTypeinfoEnum {
|
None => transition!(PreparingTypeinfoEnum {
|
||||||
future: Box::new(state.client.prepare(
|
future: Box::new(
|
||||||
TYPEINFO_ENUM_NAME.to_string(),
|
state
|
||||||
TYPEINFO_ENUM_QUERY,
|
.client
|
||||||
&[]
|
.prepare(next_statement(), TYPEINFO_ENUM_QUERY, &[])
|
||||||
)),
|
),
|
||||||
oid: state.oid,
|
oid: state.oid,
|
||||||
client: state.client,
|
client: state.client,
|
||||||
}),
|
}),
|
||||||
@ -88,7 +86,7 @@ impl PollTypeinfoEnum for TypeinfoEnum {
|
|||||||
|
|
||||||
transition!(PreparingTypeinfoEnumFallback {
|
transition!(PreparingTypeinfoEnumFallback {
|
||||||
future: Box::new(state.client.prepare(
|
future: Box::new(state.client.prepare(
|
||||||
TYPEINFO_ENUM_NAME.to_string(),
|
next_statement(),
|
||||||
TYPEINFO_ENUM_FALLBACK_QUERY,
|
TYPEINFO_ENUM_FALLBACK_QUERY,
|
||||||
&[]
|
&[]
|
||||||
)),
|
)),
|
||||||
|
@ -163,22 +163,11 @@ fn pipelined_prepare() {
|
|||||||
let connection = connection.map_err(|e| panic!("{}", e));
|
let connection = connection.map_err(|e| panic!("{}", e));
|
||||||
runtime.handle().spawn(connection).unwrap();
|
runtime.handle().spawn(connection).unwrap();
|
||||||
|
|
||||||
let prepare1 = client.prepare("SELECT 1::BIGINT WHERE $1::BOOL");
|
let prepare1 = client.prepare("SELECT $1::HSTORE[]");
|
||||||
let prepare2 = client.prepare("SELECT ''::TEXT, 1::FLOAT4 WHERE $1::VARCHAR IS NOT NULL");
|
let prepare2 = client.prepare("SELECT $1::HSTORE[]");
|
||||||
let prepare = prepare1.join(prepare2);
|
let prepare = prepare1.join(prepare2);
|
||||||
let (statement1, statement2) = runtime.block_on(prepare).unwrap();
|
runtime.block_on(prepare).unwrap();
|
||||||
|
|
||||||
assert_eq!(statement1.params(), &[Type::BOOL]);
|
|
||||||
assert_eq!(statement1.columns().len(), 1);
|
|
||||||
assert_eq!(statement1.columns()[0].type_(), &Type::INT8);
|
|
||||||
|
|
||||||
assert_eq!(statement2.params(), &[Type::VARCHAR]);
|
|
||||||
assert_eq!(statement2.columns().len(), 2);
|
|
||||||
assert_eq!(statement2.columns()[0].type_(), &Type::TEXT);
|
|
||||||
assert_eq!(statement2.columns()[1].type_(), &Type::FLOAT4);
|
|
||||||
|
|
||||||
drop(statement1);
|
|
||||||
drop(statement2);
|
|
||||||
drop(client);
|
drop(client);
|
||||||
runtime.run().unwrap();
|
runtime.run().unwrap();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user