Proper error/row description handling for batch_execute

This commit is contained in:
Steven Fackler 2016-12-20 20:11:07 -08:00
parent 4b6eee560d
commit 7243455c94
2 changed files with 38 additions and 26 deletions

View File

@ -11,7 +11,7 @@ use futures::{Future, IntoFuture, BoxFuture, Stream, Sink, Poll, StartSend};
use futures::future::Either; use futures::future::Either;
use postgres_protocol::authentication; use postgres_protocol::authentication;
use postgres_protocol::message::{backend, frontend}; use postgres_protocol::message::{backend, frontend};
use postgres_protocol::message::backend::ErrorFields; use postgres_protocol::message::backend::{ErrorResponseBody, ErrorFields};
use postgres_shared::RowData; use postgres_shared::RowData;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
@ -263,25 +263,22 @@ impl Connection {
.and_then(|(m, s)| { .and_then(|(m, s)| {
match m { match m {
backend::Message::ReadyForQuery(_) => { backend::Message::ReadyForQuery(_) => {
Either::A(Ok((rows, Connection(s))).into_future()) Ok((rows, Connection(s))).into_future().boxed()
} }
backend::Message::DataRow(body) => { backend::Message::DataRow(body) => {
match body.values().collect() { match body.values().collect() {
Ok(row) => { Ok(row) => {
rows.push(row); rows.push(row);
Either::B(Connection(s).simple_read_rows(rows)) Connection(s).simple_read_rows(rows)
} }
Err(e) => Either::A(Err(Error::Io(e)).into_future()), Err(e) => Err(Error::Io(e)).into_future().boxed(),
} }
} }
backend::Message::EmptyQueryResponse | backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) => { backend::Message::CommandComplete(_) |
Either::B(Connection(s).simple_read_rows(rows)) backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
} backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
backend::Message::ErrorResponse(body) => { _ => Err(bad_message()).into_future().boxed(),
Either::A(Err(err(&mut body.fields(), Connection(s))).into_future())
}
_ => Either::A(Err(bad_message()).into_future()),
} }
}) })
.boxed() .boxed()
@ -293,22 +290,18 @@ impl Connection {
.and_then(|(m, s)| { .and_then(|(m, s)| {
match m { match m {
backend::Message::EmptyQueryResponse | backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) => { backend::Message::CommandComplete(_) => Connection(s).ready(rows).boxed(),
Either::B(Connection(s).ready(rows))
},
backend::Message::DataRow(body) => { backend::Message::DataRow(body) => {
match body.values().collect() { match body.values().collect() {
Ok(row) => { Ok(row) => {
rows.push(row); rows.push(row);
Either::B(Connection(s).read_rows(rows)) Connection(s).read_rows(rows)
} }
Err(e) => Either::A(Err(Error::Io(e)).into_future()), Err(e) => Err(Error::Io(e)).into_future().boxed(),
} }
} }
backend::Message::ErrorResponse(body) => { backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
Either::A(Err(err(&mut body.fields(), Connection(s))).into_future()) _ => Err(bad_message()).into_future().boxed(),
}
_ => Either::A(Err(bad_message()).into_future()),
} }
}) })
.boxed() .boxed()
@ -328,6 +321,19 @@ impl Connection {
.boxed() .boxed()
} }
fn ready_err<T>(self, body: ErrorResponseBody<Vec<u8>>) -> BoxFuture<T, Error>
where T: 'static + Send
{
self.ready(DbError::new(&mut body.fields()))
.and_then(|(e, s)| {
match e {
Ok(e) => Err(Error::Db(Box::new(e), s)),
Err(e) => Err(Error::Io(e)),
}
})
.boxed()
}
pub fn batch_execute(self, query: &str) -> BoxFuture<Connection, Error> { pub fn batch_execute(self, query: &str) -> BoxFuture<Connection, Error> {
self.simple_query(query).map(|r| r.1).boxed() self.simple_query(query).map(|r| r.1).boxed()
} }

View File

@ -91,10 +91,16 @@ fn batch_execute_err() {
let done = Connection::connect("postgres://postgres@localhost", &l.handle()) let done = Connection::connect("postgres://postgres@localhost", &l.handle())
.then(|r| r.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL); \ .then(|r| r.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL); \
INSERT INTO foo DEFAULT VALUES;")) INSERT INTO foo DEFAULT VALUES;"))
.and_then(|c| c.batch_execute("SELECT * FROM bogo")); .and_then(|c| c.batch_execute("SELECT * FROM bogo"))
match l.run(done) { .then(|r| {
Err(Error::Db(ref e, _)) if e.code == SqlState::UndefinedTable => {} match r {
Err(e) => panic!("unexpected error: {}", e), Err(Error::Db(e, s)) => {
Ok(_) => panic!("unexpected success"), assert!(e.code == SqlState::UndefinedTable);
} s.batch_execute("SELECT * FROM foo")
}
Err(e) => panic!("unexpected error: {}", e),
Ok(_) => panic!("unexpected success"),
}
});
l.run(done).unwrap();
} }