rust-postgres/postgres/src/client.rs

187 lines
4.7 KiB
Rust
Raw Normal View History

2018-12-23 21:08:02 +00:00
use bytes::{Buf, Bytes};
use futures::stream;
2018-12-23 23:02:42 +00:00
use futures::{Async, Future, Poll, Stream};
2018-12-23 21:08:02 +00:00
use std::io::{self, BufRead, Cursor, Read};
use std::marker::PhantomData;
2018-12-21 21:34:09 +00:00
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::{Error, Row};
#[cfg(feature = "runtime")]
use tokio_postgres::{MakeTlsMode, Socket, TlsMode};
#[cfg(feature = "runtime")]
use crate::Builder;
use crate::{Query, Statement, Transaction};
2018-12-21 21:34:09 +00:00
/// A synchronous PostgreSQL client.
///
/// Thin newtype around `tokio_postgres::Client`; the methods in this file drive
/// the wrapped client's futures and streams to completion with `wait()`.
pub struct Client(tokio_postgres::Client);
impl Client {
#[cfg(feature = "runtime")]
pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
where
T: MakeTlsMode<Socket> + 'static + Send,
T::TlsMode: Send,
T::Stream: Send,
T::Future: Send,
<T::TlsMode as TlsMode<Socket>>::Future: Send,
{
params.parse::<Builder>()?.connect(tls_mode)
}
#[cfg(feature = "runtime")]
pub fn builder() -> Builder {
Builder::new()
}
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
self.0.prepare(query).wait().map(Statement)
}
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
self.0.prepare_typed(query, types).wait().map(Statement)
}
pub fn execute<T>(&mut self, query: &T, params: &[&dyn ToSql]) -> Result<u64, Error>
where
2018-12-22 05:01:49 +00:00
T: ?Sized + Query,
{
let statement = query.__statement(self)?;
2018-12-21 21:34:09 +00:00
self.0.execute(&statement.0, params).wait()
}
pub fn query<T>(&mut self, query: &T, params: &[&dyn ToSql]) -> Result<Vec<Row>, Error>
where
2018-12-22 05:01:49 +00:00
T: ?Sized + Query,
{
let statement = query.__statement(self)?;
2018-12-21 21:34:09 +00:00
self.0.query(&statement.0, params).collect().wait()
}
2018-12-23 05:42:03 +00:00
pub fn copy_in<T, R>(
&mut self,
query: &T,
params: &[&dyn ToSql],
reader: R,
) -> Result<u64, Error>
where
T: ?Sized + Query,
R: Read,
{
let statement = query.__statement(self)?;
2018-12-23 23:02:42 +00:00
self.0
.copy_in(&statement.0, params, CopyInStream(reader))
.wait()
2018-12-23 05:42:03 +00:00
}
2018-12-23 21:08:02 +00:00
pub fn copy_out<T>(
&mut self,
query: &T,
params: &[&dyn ToSql],
) -> Result<CopyOutReader<'_>, Error>
where
T: ?Sized + Query,
{
let statement = query.__statement(self)?;
let mut stream = self.0.copy_out(&statement.0, params).wait();
let cur = match stream.next() {
Some(Ok(cur)) => cur,
Some(Err(e)) => return Err(e),
None => Bytes::new(),
};
Ok(CopyOutReader {
stream,
cur: Cursor::new(cur),
_p: PhantomData,
})
}
2018-12-21 21:34:09 +00:00
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
self.0.batch_execute(query).wait()
}
2018-12-21 21:46:50 +00:00
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
self.batch_execute("BEGIN")?;
Ok(Transaction::new(self))
}
2018-12-22 05:08:26 +00:00
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
2018-12-23 23:58:39 +00:00
pub fn get_ref(&self) -> &tokio_postgres::Client {
&self.0
}
pub fn get_mut(&mut self) -> &mut tokio_postgres::Client {
&mut self.0
}
pub fn into_inner(self) -> tokio_postgres::Client {
self.0
}
2018-12-21 21:34:09 +00:00
}
/// Wraps an existing `tokio_postgres::Client` in the synchronous `Client`.
impl From<tokio_postgres::Client> for Client {
    fn from(inner: tokio_postgres::Client) -> Self {
        Client(inner)
    }
}
2018-12-23 05:42:03 +00:00
2018-12-23 23:02:42 +00:00
/// Adapter that exposes a blocking `Read` as a `Stream` of `Vec<u8>` chunks;
/// `Client::copy_in` wraps its `reader` argument in this type.
struct CopyInStream<R>(R);
2018-12-23 05:42:03 +00:00
2018-12-23 23:02:42 +00:00
impl<R> Stream for CopyInStream<R>
where
R: Read,
{
2018-12-23 05:42:03 +00:00
type Item = Vec<u8>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Vec<u8>>, io::Error> {
2018-12-23 23:02:42 +00:00
let mut buf = vec![];
match self.0.by_ref().take(4096).read_to_end(&mut buf)? {
0 => Ok(Async::Ready(None)),
_ => Ok(Async::Ready(Some(buf))),
2018-12-23 05:42:03 +00:00
}
}
}
2018-12-23 21:08:02 +00:00
/// Reader over the data produced by `Client::copy_out`.
///
/// Implements `Read` and `BufRead` by pulling one chunk at a time from the
/// underlying copy-out stream.
pub struct CopyOutReader<'a> {
// Blocking (`wait()`-ed) view of the copy-out stream; iterated with `next()`.
stream: stream::Wait<tokio_postgres::CopyOut>,
// Chunk currently being read; the cursor position tracks how much of it has
// already been consumed.
cur: Cursor<Bytes>,
// Marker that gives the reader the lifetime `'a` without storing a reference.
_p: PhantomData<&'a mut ()>,
}
// Intentionally empty `Drop` impl. `CopyOutReader<'a>` represents its borrow
// only through a `PhantomData<&'a mut ()>` marker; giving the type a destructor
// makes the borrow checker treat the value as used when it goes out of scope,
// so the borrow it stands for is extended until the reader is dropped.
impl<'a> Drop for CopyOutReader<'a> {
fn drop(&mut self) {}
}
2018-12-23 21:08:02 +00:00
/// Unbuffered-style reads implemented on top of the `BufRead` methods.
impl<'a> Read for CopyOutReader<'a> {
    /// Copies as many bytes as fit from the current chunk into `buf`.
    ///
    /// Returns the number of bytes copied, which is the smaller of `buf.len()`
    /// and the length of the slice returned by `fill_buf`. Errors from
    /// `fill_buf` are propagated.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let available = self.fill_buf()?;
        let count = buf.len().min(available.len());

        let (dst, src) = (&mut buf[..count], &available[..count]);
        dst.copy_from_slice(src);

        // Mark the copied bytes as used so the next call continues after them.
        self.consume(count);
        Ok(count)
    }
}
/// Buffered access to the copy-out data, one stream chunk at a time.
impl<'a> BufRead for CopyOutReader<'a> {
    /// Returns the unread remainder of the current chunk, fetching further
    /// chunks from the stream when the current one is exhausted.
    ///
    /// An empty slice is returned only when the stream itself has ended.
    /// Stream errors are wrapped in an `io::Error` of kind `Other`.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        // Loop rather than fetch once: `BufRead` callers treat an empty buffer
        // as end of input, so a zero-length chunk from the stream must be
        // skipped instead of being reported as EOF while data is still pending.
        while self.cur.remaining() == 0 {
            match self.stream.next() {
                Some(Ok(chunk)) => self.cur = Cursor::new(chunk),
                Some(Err(e)) => return Err(io::Error::new(io::ErrorKind::Other, e)),
                // Stream finished: leave the empty cursor in place to signal EOF.
                None => break,
            }
        }

        Ok(Buf::bytes(&self.cur))
    }

    /// Marks `amt` bytes of the current chunk as read by advancing the cursor.
    fn consume(&mut self, amt: usize) {
        self.cur.advance(amt);
    }
}