Add raw_prepare

This commit is contained in:
Steven Fackler 2016-12-21 11:16:43 -08:00
parent d08dc136bc
commit 3356713331

View File

@ -19,11 +19,12 @@ use std::io;
use tokio_core::reactor::Handle;
#[doc(inline)]
pub use postgres_shared::params;
pub use postgres_shared::{params, types};
use error::{ConnectError, Error, DbError};
use params::{ConnectParams, IntoConnectParams};
use stream::PostgresStream;
use types::{Oid, Type};
pub mod error;
mod stream;
@ -44,23 +45,19 @@ struct InnerConnection {
}
impl InnerConnection {
fn read(self) -> BoxFuture<(backend::Message<Vec<u8>>, InnerConnection), (io::Error, InnerConnection)> {
fn read(self) -> BoxFuture<(backend::Message<Vec<u8>>, InnerConnection), io::Error> {
self.into_future()
.then(|r| {
let (m, mut s) = match r {
Ok((m, s)) => (m, s),
Err((e, s)) => return Either::A(Err((e, s)).into_future()),
};
.map_err(|e| e.0)
.and_then(|(m, mut s)| {
match m {
Some(backend::Message::ParameterStatus(body)) => {
let name = match body.name() {
Ok(name) => name.to_owned(),
Err(e) => return Either::A(Err((e, s)).into_future()),
Err(e) => return Either::A(Err(e).into_future()),
};
let value = match body.value() {
Ok(value) => value.to_owned(),
Err(e) => return Either::A(Err((e, s)).into_future()),
Err(e) => return Either::A(Err(e).into_future()),
};
s.parameters.insert(name, value);
Either::B(s.read())
@ -72,7 +69,7 @@ impl InnerConnection {
Some(m) => Either::A(Ok((m, s)).into_future()),
None => {
let err = io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF");
Either::A(Err((err, s)).into_future())
Either::A(Err(err).into_future())
}
}
})
@ -162,7 +159,7 @@ impl Connection {
fn handle_auth(self, params: ConnectParams) -> BoxFuture<Connection, ConnectError> {
self.0.read()
.map_err(|e| e.0.into())
.map_err(ConnectError::Io)
.and_then(move |(m, s)| {
let response = match m {
backend::Message::AuthenticationOk => Ok(None),
@ -215,7 +212,7 @@ impl Connection {
fn handle_auth_response(self, message: Vec<u8>) -> BoxFuture<Connection, ConnectError> {
self.0.send(message)
.and_then(|s| s.flush())
.and_then(|s| s.read().map_err(|e| e.0))
.and_then(|s| s.read())
.map_err(ConnectError::Io)
.and_then(|(m, s)| {
match m {
@ -229,7 +226,7 @@ impl Connection {
fn finish_startup(self) -> BoxFuture<Connection, ConnectError> {
self.0.read()
.map_err(|e| ConnectError::Io(e.0))
.map_err(ConnectError::Io)
.and_then(|(m, mut s)| {
match m {
backend::Message::BackendKeyData(body) => {
@ -262,7 +259,7 @@ impl Connection {
// This has its own read_rows since it will need to handle multiple query completions
fn simple_read_rows(self, mut rows: Vec<RowData>) -> BoxFuture<(Vec<RowData>, Connection), Error> {
self.0.read()
.map_err(|e| Error::Io(e.0))
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => {
@ -287,9 +284,10 @@ impl Connection {
.boxed()
}
#[allow(dead_code)]
fn read_rows(self, mut rows: Vec<RowData>) -> BoxFuture<(Vec<RowData>, Connection), Error> {
self.0.read()
.map_err(|e| Error::Io(e.0))
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::EmptyQueryResponse |
@ -314,7 +312,7 @@ impl Connection {
where T: 'static + Send
{
self.0.read()
.map_err(|e| Error::Io(e.0))
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => Ok((t, Connection(s))),
@ -339,11 +337,117 @@ impl Connection {
self.simple_query(query).map(|r| r.1).boxed()
}
fn raw_prepare(self,
name: &str,
query: &str)
-> BoxFuture<(Vec<Type>, Vec<Column>, Connection), Error> {
let mut parse = vec![];
let mut describe = vec![];
let mut sync = vec![];
frontend::parse(name, query, None, &mut parse)
.and_then(|()| frontend::describe(b'S', name, &mut describe))
.and_then(|()| Ok(frontend::sync(&mut sync)))
.into_future()
.and_then(move |()| self.0.send(parse))
.and_then(|s| s.send(describe))
.and_then(|s| s.send(sync))
.and_then(|s| s.flush())
.and_then(|s| s.read())
.map_err(Error::Io)
.boxed() // work around nonlinear trans blowup
.and_then(|(m, s)| {
match m {
backend::Message::ParseComplete => Either::A(Ok(s).into_future()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
}
})
.and_then(|s| s.read().map_err(Error::Io))
.and_then(|(m, s)| {
match m {
backend::Message::ParameterDescription(body) => {
body.parameters().collect::<Vec<_>>()
.map(|p| (p, s))
.map_err(Error::Io)
}
_ => Err(bad_message()),
}
})
.and_then(|(p, s)| s.read().map(|(m, s)| (p, m, s)).map_err(Error::Io))
.boxed() // work around nonlinear trans blowup
.and_then(|(p, m, s)| {
match m {
backend::Message::RowDescription(body) => {
body.fields()
.map(|f| (f.name().to_owned(), f.type_oid()))
.collect::<Vec<_>>()
.map(|d| (p, d, s))
.map_err(Error::Io)
}
backend::Message::NoData => Ok((p, vec![], s)),
_ => Err(bad_message()),
}
})
.and_then(|(p, r, s)| Connection(s).ready((p, r)))
.and_then(|((p, r), s)| {
s.get_types(p.into_iter(), vec![], |&p| p, |_, t| t)
.map(|(p, s)| (p, r, s))
})
.and_then(|(p, r, s)| {
s.get_types(r.into_iter(),
vec![],
|f| f.1,
|f, t| Column { name: f.0, type_: t })
.map(|(r, s)| (p, r, s))
})
.boxed()
}
fn get_types<T, U, I, F, G>(self,
mut raw: I,
mut out: Vec<U>,
mut get_oid: F,
mut build: G)
-> BoxFuture<(Vec<U>, Connection), Error>
where T: 'static + Send,
U: 'static + Send,
I: 'static + Send + Iterator<Item = T>,
F: 'static + Send + FnMut(&T) -> Oid,
G: 'static + Send + FnMut(T, Type) -> U
{
match raw.next() {
Some(v) => {
let oid = get_oid(&v);
self.get_type(oid)
.and_then(move |(ty, s)| {
out.push(build(v, ty));
s.get_types(raw, out, get_oid, build)
})
.boxed()
}
None => Ok((out, self)).into_future().boxed(),
}
}
fn get_type(self, oid: Oid) -> BoxFuture<(Type, Connection), Error> {
if let Some(type_) = Type::from_oid(oid) {
return Ok((type_, self)).into_future().boxed();
};
unimplemented!()
}
pub fn cancel_data(&self) -> CancelData {
self.0.cancel_data
}
}
struct Column {
name: String,
type_: Type,
}
fn connect_err(fields: &mut ErrorFields) -> ConnectError {
match DbError::new(fields) {
Ok(err) => ConnectError::Db(Box::new(err)),