53 lines
1.5 KiB
Rust
53 lines
1.5 KiB
Rust
|
use crate::client::{InnerClient, Responses};
|
||
|
use crate::Error;
|
||
|
use bytes::Bytes;
|
||
|
use futures::{Stream, TryFutureExt, ready};
|
||
|
use std::sync::Arc;
|
||
|
use crate::codec::FrontendMessage;
|
||
|
use postgres_protocol::message::backend::Message;
|
||
|
use std::pin::Pin;
|
||
|
use std::task::{Context, Poll};
|
||
|
use crate::connection::RequestMessages;
|
||
|
|
||
|
pub fn copy_out(
|
||
|
client: Arc<InnerClient>,
|
||
|
buf: Result<Vec<u8>, Error>,
|
||
|
) -> impl Stream<Item = Result<Bytes, Error>> {
|
||
|
start(client, buf)
|
||
|
.map_ok(|responses| CopyOut { responses })
|
||
|
.try_flatten_stream()
|
||
|
}
|
||
|
|
||
|
async fn start(client: Arc<InnerClient>, buf: Result<Vec<u8>, Error>) -> Result<Responses, Error> {
|
||
|
let buf = buf?;
|
||
|
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||
|
|
||
|
match responses.next().await? {
|
||
|
Message::BindComplete => {}
|
||
|
_ => return Err(Error::unexpected_message()),
|
||
|
}
|
||
|
|
||
|
match responses.next().await? {
|
||
|
Message::CopyOutResponse(_) => {}
|
||
|
_ => return Err(Error::unexpected_message()),
|
||
|
}
|
||
|
|
||
|
Ok(responses)
|
||
|
}
|
||
|
|
||
|
struct CopyOut {
|
||
|
responses: Responses,
|
||
|
}
|
||
|
|
||
|
impl Stream for CopyOut {
|
||
|
type Item = Result<Bytes, Error>;
|
||
|
|
||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||
|
match ready!(self.responses.poll_next(cx)?) {
|
||
|
Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
|
||
|
Message::CopyDone => Poll::Ready(None),
|
||
|
_ => Poll::Ready(Some(Err(Error::unexpected_message()))),
|
||
|
}
|
||
|
}
|
||
|
}
|