Shift functions around

This commit is contained in:
Steven Fackler 2016-12-21 21:54:11 -05:00
parent 522ea10a98
commit ab672e42b4

View File

@ -332,6 +332,44 @@ impl Connection {
.boxed()
}
fn close_gc(self) -> BoxFuture<Connection, Error> {
let mut messages = vec![];
while let Ok((type_, name)) = self.0.close_receiver.try_recv() {
let mut buf = vec![];
frontend::close(type_, &name, &mut buf).unwrap(); // this can only fail on bad names
messages.push(buf);
}
if messages.is_empty() {
return Ok(self).into_future().boxed();
}
let mut buf = vec![];
frontend::sync(&mut buf);
messages.push(buf);
self.0.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>)))
.map_err(Error::Io)
.and_then(|s| Connection(s.0).finish_close_gc())
.boxed()
}
fn finish_close_gc(self) -> BoxFuture<Connection, Error> {
self.0.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => {
Either::A(Ok(Connection(s)).into_future())
}
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
}
})
.boxed()
}
fn ready_err<T>(self, body: ErrorResponseBody<Vec<u8>>) -> BoxFuture<T, Error>
where T: 'static + Send
{
@ -563,44 +601,6 @@ impl Connection {
.boxed()
}
fn close_gc(self) -> BoxFuture<Connection, Error> {
let mut messages = vec![];
while let Ok((type_, name)) = self.0.close_receiver.try_recv() {
let mut buf = vec![];
frontend::close(type_, &name, &mut buf).unwrap(); // this can only fail on bad names
messages.push(buf);
}
if messages.is_empty() {
return Ok(self).into_future().boxed();
}
let mut buf = vec![];
frontend::sync(&mut buf);
messages.push(buf);
self.0.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>)))
.map_err(Error::Io)
.and_then(|s| Connection(s.0).finish_close_gc())
.boxed()
}
fn finish_close_gc(self) -> BoxFuture<Connection, Error> {
self.0.read()
.map_err(Error::Io)
.and_then(|(m, s)| {
match m {
backend::Message::ReadyForQuery(_) => {
Either::A(Ok(Connection(s)).into_future())
}
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
_ => Either::A(Err(bad_message()).into_future()),
}
})
.boxed()
}
pub fn close(self) -> BoxFuture<(), Error> {
let mut terminate = vec![];
frontend::terminate(&mut terminate);