2019-12-16 01:01:53 +00:00
|
|
|
use crate::lazy_pin::LazyPin;
|
2019-12-15 23:58:50 +00:00
|
|
|
use crate::Rt;
|
2018-12-29 05:01:10 +00:00
|
|
|
use bytes::{Buf, Bytes};
|
2019-12-04 02:25:29 +00:00
|
|
|
use futures::StreamExt;
|
2019-12-16 01:01:53 +00:00
|
|
|
use std::io::{self, BufRead, Read};
|
|
|
|
use tokio_postgres::CopyOutStream;
|
2018-12-29 05:01:10 +00:00
|
|
|
|
2019-03-31 03:58:01 +00:00
|
|
|
/// The reader returned by the `copy_out` method.
|
2019-11-30 16:04:59 +00:00
|
|
|
pub struct CopyOutReader<'a> {
|
2019-12-16 01:01:53 +00:00
|
|
|
pub(crate) runtime: Rt<'a>,
|
|
|
|
pub(crate) stream: LazyPin<CopyOutStream>,
|
|
|
|
cur: Bytes,
|
2018-12-29 05:01:10 +00:00
|
|
|
}
|
|
|
|
|
2019-11-30 16:04:59 +00:00
|
|
|
impl<'a> CopyOutReader<'a> {
|
2019-12-16 01:01:53 +00:00
|
|
|
pub(crate) fn new(runtime: Rt<'a>, stream: CopyOutStream) -> CopyOutReader<'a> {
|
|
|
|
CopyOutReader {
|
2019-12-04 02:25:29 +00:00
|
|
|
runtime,
|
2019-12-16 01:01:53 +00:00
|
|
|
stream: LazyPin::new(stream),
|
|
|
|
cur: Bytes::new(),
|
|
|
|
}
|
2018-12-29 05:01:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-30 16:04:59 +00:00
|
|
|
impl Read for CopyOutReader<'_> {
|
2018-12-29 05:01:10 +00:00
|
|
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
|
|
let b = self.fill_buf()?;
|
|
|
|
let len = usize::min(buf.len(), b.len());
|
|
|
|
buf[..len].copy_from_slice(&b[..len]);
|
|
|
|
self.consume(len);
|
|
|
|
Ok(len)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-30 16:04:59 +00:00
|
|
|
impl BufRead for CopyOutReader<'_> {
|
2018-12-29 05:01:10 +00:00
|
|
|
fn fill_buf(&mut self) -> io::Result<&[u8]> {
|
2019-12-16 01:01:53 +00:00
|
|
|
if !self.cur.has_remaining() {
|
|
|
|
match self.runtime.block_on(self.stream.pinned().next()) {
|
|
|
|
Some(Ok(cur)) => self.cur = cur,
|
2018-12-29 05:01:10 +00:00
|
|
|
Some(Err(e)) => return Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
|
|
None => {}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
2019-12-16 01:01:53 +00:00
|
|
|
Ok(self.cur.bytes())
|
2018-12-29 05:01:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn consume(&mut self, amt: usize) {
|
|
|
|
self.cur.advance(amt);
|
|
|
|
}
|
|
|
|
}
|