2019-07-31 04:25:30 +00:00
|
|
|
use crate::client::InnerClient;
|
|
|
|
use crate::codec::FrontendMessage;
|
|
|
|
use crate::connection::RequestMessages;
|
2019-09-28 18:42:00 +00:00
|
|
|
use crate::types::ToSql;
|
|
|
|
use crate::{query, Error, Statement};
|
2019-11-27 00:32:36 +00:00
|
|
|
use bytes::buf::BufExt;
|
|
|
|
use bytes::{Buf, BufMut, BytesMut};
|
2019-07-31 04:25:30 +00:00
|
|
|
use futures::channel::mpsc;
|
2019-10-19 02:33:33 +00:00
|
|
|
use futures::{pin_mut, ready, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
|
2019-07-31 04:25:30 +00:00
|
|
|
use postgres_protocol::message::backend::Message;
|
|
|
|
use postgres_protocol::message::frontend;
|
2019-07-31 04:29:18 +00:00
|
|
|
use postgres_protocol::message::frontend::CopyData;
|
2019-07-31 04:25:30 +00:00
|
|
|
use std::error;
|
|
|
|
use std::pin::Pin;
|
|
|
|
use std::task::{Context, Poll};
|
|
|
|
|
|
|
|
enum CopyInMessage {
|
|
|
|
Message(FrontendMessage),
|
|
|
|
Done,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct CopyInReceiver {
|
|
|
|
receiver: mpsc::Receiver<CopyInMessage>,
|
|
|
|
done: bool,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl CopyInReceiver {
|
|
|
|
fn new(receiver: mpsc::Receiver<CopyInMessage>) -> CopyInReceiver {
|
|
|
|
CopyInReceiver {
|
|
|
|
receiver,
|
|
|
|
done: false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Stream for CopyInReceiver {
|
|
|
|
type Item = FrontendMessage;
|
|
|
|
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> {
|
|
|
|
if self.done {
|
|
|
|
return Poll::Ready(None);
|
|
|
|
}
|
|
|
|
|
|
|
|
match ready!(self.receiver.poll_next_unpin(cx)) {
|
|
|
|
Some(CopyInMessage::Message(message)) => Poll::Ready(Some(message)),
|
|
|
|
Some(CopyInMessage::Done) => {
|
|
|
|
self.done = true;
|
2019-10-12 23:30:27 +00:00
|
|
|
let mut buf = BytesMut::new();
|
2019-07-31 04:25:30 +00:00
|
|
|
frontend::copy_done(&mut buf);
|
|
|
|
frontend::sync(&mut buf);
|
2019-10-12 23:30:27 +00:00
|
|
|
Poll::Ready(Some(FrontendMessage::Raw(buf.freeze())))
|
2019-07-31 04:25:30 +00:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
self.done = true;
|
2019-10-12 23:30:27 +00:00
|
|
|
let mut buf = BytesMut::new();
|
2019-07-31 04:25:30 +00:00
|
|
|
frontend::copy_fail("", &mut buf).unwrap();
|
|
|
|
frontend::sync(&mut buf);
|
2019-10-12 23:30:27 +00:00
|
|
|
Poll::Ready(Some(FrontendMessage::Raw(buf.freeze())))
|
2019-07-31 04:25:30 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-28 18:42:00 +00:00
|
|
|
pub async fn copy_in<'a, I, S>(
|
|
|
|
client: &InnerClient,
|
|
|
|
statement: Statement,
|
|
|
|
params: I,
|
2019-07-31 04:25:30 +00:00
|
|
|
stream: S,
|
|
|
|
) -> Result<u64, Error>
|
|
|
|
where
|
2019-09-28 18:42:00 +00:00
|
|
|
I: IntoIterator<Item = &'a dyn ToSql>,
|
|
|
|
I::IntoIter: ExactSizeIterator,
|
2019-07-31 04:25:30 +00:00
|
|
|
S: TryStream,
|
2019-11-27 00:32:36 +00:00
|
|
|
S::Ok: Buf + 'static + Send,
|
2019-07-31 04:25:30 +00:00
|
|
|
S::Error: Into<Box<dyn error::Error + Sync + Send>>,
|
|
|
|
{
|
2019-10-12 23:30:27 +00:00
|
|
|
let buf = query::encode(client, &statement, params)?;
|
2019-07-31 04:25:30 +00:00
|
|
|
|
|
|
|
let (mut sender, receiver) = mpsc::channel(1);
|
|
|
|
let receiver = CopyInReceiver::new(receiver);
|
|
|
|
let mut responses = client.send(RequestMessages::CopyIn(receiver))?;
|
|
|
|
|
|
|
|
sender
|
|
|
|
.send(CopyInMessage::Message(FrontendMessage::Raw(buf)))
|
|
|
|
.await
|
|
|
|
.map_err(|_| Error::closed())?;
|
|
|
|
|
|
|
|
match responses.next().await? {
|
|
|
|
Message::BindComplete => {}
|
|
|
|
_ => return Err(Error::unexpected_message()),
|
|
|
|
}
|
|
|
|
|
|
|
|
match responses.next().await? {
|
|
|
|
Message::CopyInResponse(_) => {}
|
|
|
|
_ => return Err(Error::unexpected_message()),
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut bytes = BytesMut::new();
|
|
|
|
let stream = stream.into_stream();
|
|
|
|
pin_mut!(stream);
|
|
|
|
|
|
|
|
while let Some(buf) = stream.try_next().await.map_err(Error::copy_in_stream)? {
|
|
|
|
let data: Box<dyn Buf + Send> = if buf.remaining() > 4096 {
|
|
|
|
if bytes.is_empty() {
|
|
|
|
Box::new(buf)
|
|
|
|
} else {
|
2019-11-27 00:32:36 +00:00
|
|
|
Box::new(bytes.split().freeze().chain(buf))
|
2019-07-31 04:25:30 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
bytes.reserve(buf.remaining());
|
|
|
|
bytes.put(buf);
|
|
|
|
if bytes.len() > 4096 {
|
2019-11-27 00:32:36 +00:00
|
|
|
Box::new(bytes.split().freeze())
|
2019-07-31 04:25:30 +00:00
|
|
|
} else {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let data = CopyData::new(data).map_err(Error::encode)?;
|
|
|
|
sender
|
|
|
|
.send(CopyInMessage::Message(FrontendMessage::CopyData(data)))
|
|
|
|
.await
|
|
|
|
.map_err(|_| Error::closed())?;
|
|
|
|
}
|
|
|
|
|
|
|
|
if !bytes.is_empty() {
|
2019-11-27 00:32:36 +00:00
|
|
|
let data: Box<dyn Buf + Send> = Box::new(bytes.freeze());
|
2019-07-31 04:25:30 +00:00
|
|
|
let data = CopyData::new(data).map_err(Error::encode)?;
|
|
|
|
sender
|
|
|
|
.send(CopyInMessage::Message(FrontendMessage::CopyData(data)))
|
|
|
|
.await
|
|
|
|
.map_err(|_| Error::closed())?;
|
|
|
|
}
|
|
|
|
|
|
|
|
sender
|
|
|
|
.send(CopyInMessage::Done)
|
|
|
|
.await
|
|
|
|
.map_err(|_| Error::closed())?;
|
|
|
|
|
|
|
|
match responses.next().await? {
|
|
|
|
Message::CommandComplete(body) => {
|
|
|
|
let rows = body
|
|
|
|
.tag()
|
|
|
|
.map_err(Error::parse)?
|
|
|
|
.rsplit(' ')
|
|
|
|
.next()
|
|
|
|
.unwrap()
|
|
|
|
.parse()
|
|
|
|
.unwrap_or(0);
|
|
|
|
Ok(rows)
|
|
|
|
}
|
|
|
|
_ => Err(Error::unexpected_message()),
|
|
|
|
}
|
|
|
|
}
|