Don't bother waiting for a ReadyForQuery
We use it in the connection to track framing but don't need to wait around for it in the individual futures/streams
This commit is contained in:
parent
dcde61c16d
commit
349f3764a9
@ -15,13 +15,8 @@ pub enum Execute {
|
||||
request: PendingRequest,
|
||||
statement: Statement,
|
||||
},
|
||||
#[state_machine_future(transitions(ReadReadyForQuery))]
|
||||
ReadResponse { receiver: mpsc::Receiver<Message> },
|
||||
#[state_machine_future(transitions(Finished))]
|
||||
ReadReadyForQuery {
|
||||
receiver: mpsc::Receiver<Message>,
|
||||
rows: u64,
|
||||
},
|
||||
ReadResponse { receiver: mpsc::Receiver<Message> },
|
||||
#[state_machine_future(ready)]
|
||||
Finished(u64),
|
||||
#[state_machine_future(error)]
|
||||
@ -56,36 +51,14 @@ impl PollExecute for Execute {
|
||||
.unwrap()
|
||||
.parse()
|
||||
.unwrap_or(0);
|
||||
let state = state.take();
|
||||
transition!(ReadReadyForQuery {
|
||||
receiver: state.receiver,
|
||||
rows,
|
||||
});
|
||||
}
|
||||
Some(Message::EmptyQueryResponse) => {
|
||||
let state = state.take();
|
||||
transition!(ReadReadyForQuery {
|
||||
receiver: state.receiver,
|
||||
rows: 0,
|
||||
});
|
||||
transition!(Finished(rows))
|
||||
}
|
||||
Some(Message::EmptyQueryResponse) => transition!(Finished(0)),
|
||||
Some(_) => return Err(Error::unexpected_message()),
|
||||
None => return Err(Error::closed()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_read_ready_for_query<'a>(
|
||||
state: &'a mut RentToOwn<'a, ReadReadyForQuery>,
|
||||
) -> Poll<AfterReadReadyForQuery, Error> {
|
||||
let message = try_ready_receive!(state.receiver.poll());
|
||||
|
||||
match message {
|
||||
Some(Message::ReadyForQuery(_)) => transition!(Finished(state.rows)),
|
||||
Some(_) => Err(Error::unexpected_message()),
|
||||
None => Err(Error::closed()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecuteFuture {
|
||||
|
@ -32,21 +32,13 @@ pub enum Prepare {
|
||||
receiver: mpsc::Receiver<Message>,
|
||||
name: String,
|
||||
},
|
||||
#[state_machine_future(transitions(ReadReadyForQuery))]
|
||||
#[state_machine_future(transitions(GetParameterTypes, GetColumnTypes, Finished))]
|
||||
ReadRowDescription {
|
||||
client: Client,
|
||||
receiver: mpsc::Receiver<Message>,
|
||||
name: String,
|
||||
parameters: Vec<Oid>,
|
||||
},
|
||||
#[state_machine_future(transitions(GetParameterTypes, GetColumnTypes, Finished))]
|
||||
ReadReadyForQuery {
|
||||
client: Client,
|
||||
receiver: mpsc::Receiver<Message>,
|
||||
name: String,
|
||||
parameters: Vec<Oid>,
|
||||
columns: Vec<(String, Oid)>,
|
||||
},
|
||||
#[state_machine_future(transitions(GetColumnTypes, Finished))]
|
||||
GetParameterTypes {
|
||||
future: TypeinfoFuture,
|
||||
@ -135,27 +127,6 @@ impl PollPrepare for Prepare {
|
||||
None => return Err(Error::closed()),
|
||||
};
|
||||
|
||||
transition!(ReadReadyForQuery {
|
||||
receiver: state.receiver,
|
||||
name: state.name,
|
||||
parameters: state.parameters,
|
||||
columns,
|
||||
client: state.client,
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_read_ready_for_query<'a>(
|
||||
state: &'a mut RentToOwn<'a, ReadReadyForQuery>,
|
||||
) -> Poll<AfterReadReadyForQuery, Error> {
|
||||
let message = try_ready_receive!(state.receiver.poll());
|
||||
let state = state.take();
|
||||
|
||||
match message {
|
||||
Some(Message::ReadyForQuery(_)) => {}
|
||||
Some(_) => return Err(Error::unexpected_message()),
|
||||
None => return Err(Error::closed()),
|
||||
}
|
||||
|
||||
let mut parameters = state.parameters.into_iter();
|
||||
if let Some(oid) = parameters.next() {
|
||||
transition!(GetParameterTypes {
|
||||
@ -163,11 +134,11 @@ impl PollPrepare for Prepare {
|
||||
remaining_parameters: parameters,
|
||||
name: state.name,
|
||||
parameters: vec![],
|
||||
columns: state.columns,
|
||||
columns: columns,
|
||||
});
|
||||
}
|
||||
|
||||
let mut columns = state.columns.into_iter();
|
||||
let mut columns = columns.into_iter();
|
||||
if let Some((name, oid)) = columns.next() {
|
||||
transition!(GetColumnTypes {
|
||||
future: TypeinfoFuture::new(oid, state.client),
|
||||
|
@ -18,9 +18,6 @@ enum State {
|
||||
receiver: mpsc::Receiver<Message>,
|
||||
statement: Statement,
|
||||
},
|
||||
ReadingReadyForQuery {
|
||||
receiver: mpsc::Receiver<Message>,
|
||||
},
|
||||
Done,
|
||||
}
|
||||
|
||||
@ -77,28 +74,12 @@ impl Stream for QueryStream {
|
||||
break Ok(Async::Ready(Some(row)));
|
||||
}
|
||||
Some(Message::EmptyQueryResponse) | Some(Message::CommandComplete(_)) => {
|
||||
self.0 = State::ReadingReadyForQuery { receiver };
|
||||
break Ok(Async::Ready(None));
|
||||
}
|
||||
Some(_) => break Err(Error::unexpected_message()),
|
||||
None => break Err(Error::closed()),
|
||||
}
|
||||
}
|
||||
State::ReadingReadyForQuery { mut receiver } => {
|
||||
let message = match receiver.poll() {
|
||||
Ok(Async::Ready(message)) => message,
|
||||
Ok(Async::NotReady) => {
|
||||
self.0 = State::ReadingReadyForQuery { receiver };
|
||||
break Ok(Async::NotReady);
|
||||
}
|
||||
Err(()) => unreachable!("mpsc::Receiver doesn't return errors"),
|
||||
};
|
||||
|
||||
match message {
|
||||
Some(Message::ReadyForQuery(_)) => break Ok(Async::Ready(None)),
|
||||
Some(_) => break Err(Error::unexpected_message()),
|
||||
None => break Err(Error::closed()),
|
||||
}
|
||||
}
|
||||
State::Done => break Ok(Async::Ready(None)),
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user