rust-postgres/tokio-postgres/src/lib.rs

395 lines
9.2 KiB
Rust
Raw Normal View History

2018-12-09 01:40:37 +00:00
#![warn(rust_2018_idioms)]
2018-12-03 05:26:10 +00:00
use bytes::{Bytes, IntoBuf};
2018-12-09 01:40:37 +00:00
use futures::{try_ready, Async, Future, Poll, Stream};
2018-08-11 21:32:17 +00:00
use std::error::Error as StdError;
2018-06-21 00:06:11 +00:00
use std::fmt;
2018-06-19 02:34:25 +00:00
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::{AsyncRead, AsyncWrite};
2016-12-20 23:42:28 +00:00
2018-12-09 01:40:37 +00:00
pub use crate::builder::*;
pub use crate::error::*;
use crate::proto::CancelFuture;
use crate::rows::RowIndex;
pub use crate::stmt::Column;
pub use crate::tls::*;
use crate::types::{FromSql, ToSql, Type};
2016-12-20 23:42:28 +00:00
mod builder;
pub mod error;
2018-06-17 04:29:27 +00:00
mod proto;
2018-12-09 01:39:20 +00:00
pub mod rows;
mod stmt;
mod tls;
2018-12-09 01:39:20 +00:00
pub mod types;
2017-07-20 04:22:27 +00:00
fn next_statement() -> String {
2018-08-16 02:53:20 +00:00
static ID: AtomicUsize = AtomicUsize::new(0);
format!("s{}", ID.fetch_add(1, Ordering::SeqCst))
}
fn next_portal() -> String {
static ID: AtomicUsize = AtomicUsize::new(0);
format!("p{}", ID.fetch_add(1, Ordering::SeqCst))
}
pub fn cancel_query<S, T>(stream: S, tls_mode: T, cancel_data: CancelData) -> CancelQuery<S, T>
where
S: AsyncRead + AsyncWrite,
T: TlsMode<S>,
{
CancelQuery(CancelFuture::new(stream, tls_mode, cancel_data))
2018-06-19 02:34:25 +00:00
}
2018-06-18 12:18:04 +00:00
pub struct Client(proto::Client);
2016-12-20 23:42:28 +00:00
2018-06-17 04:29:27 +00:00
impl Client {
2018-06-19 02:34:25 +00:00
pub fn prepare(&mut self, query: &str) -> Prepare {
self.prepare_typed(query, &[])
}
pub fn prepare_typed(&mut self, query: &str, param_types: &[Type]) -> Prepare {
Prepare(self.0.prepare(next_statement(), query, param_types))
2018-06-19 02:34:25 +00:00
}
2018-06-20 02:10:07 +00:00
2018-12-09 01:40:37 +00:00
pub fn execute(&mut self, statement: &Statement, params: &[&dyn ToSql]) -> Execute {
2018-06-20 02:10:07 +00:00
Execute(self.0.execute(&statement.0, params))
}
2018-06-21 00:06:11 +00:00
2018-12-09 01:40:37 +00:00
pub fn query(&mut self, statement: &Statement, params: &[&dyn ToSql]) -> Query {
2018-06-21 00:06:11 +00:00
Query(self.0.query(&statement.0, params))
}
2018-07-08 05:42:04 +00:00
2018-12-09 01:40:37 +00:00
pub fn bind(&mut self, statement: &Statement, params: &[&dyn ToSql]) -> Bind {
2018-08-16 02:53:20 +00:00
Bind(self.0.bind(&statement.0, next_portal(), params))
}
2018-08-16 04:00:15 +00:00
pub fn query_portal(&mut self, portal: &Portal, max_rows: i32) -> QueryPortal {
QueryPortal(self.0.query_portal(&portal.0, max_rows))
}
2018-12-09 01:40:37 +00:00
pub fn copy_in<S>(
&mut self,
statement: &Statement,
params: &[&dyn ToSql],
stream: S,
) -> CopyIn<S>
2018-08-11 21:32:17 +00:00
where
2018-08-13 03:23:21 +00:00
S: Stream,
2018-12-03 05:26:10 +00:00
S::Item: IntoBuf,
<S::Item as IntoBuf>::Buf: Send,
// FIXME error type?
2018-12-09 01:40:37 +00:00
S::Error: Into<Box<dyn StdError + Sync + Send>>,
2018-08-11 21:32:17 +00:00
{
CopyIn(self.0.copy_in(&statement.0, params, stream))
}
2018-12-09 01:40:37 +00:00
pub fn copy_out(&mut self, statement: &Statement, params: &[&dyn ToSql]) -> CopyOut {
2018-07-16 02:40:15 +00:00
CopyOut(self.0.copy_out(&statement.0, params))
}
2018-12-02 04:01:46 +00:00
pub fn transaction(&mut self) -> TransactionBuilder {
TransactionBuilder(self.0.clone())
2018-07-14 21:59:37 +00:00
}
2018-07-08 05:42:04 +00:00
pub fn batch_execute(&mut self, query: &str) -> BatchExecute {
BatchExecute(self.0.batch_execute(query))
}
2016-12-20 23:42:28 +00:00
}
2018-06-19 23:54:29 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct Connection<S>(proto::Connection<S>);
2016-12-21 03:50:44 +00:00
impl<S> Connection<S>
where
S: AsyncRead + AsyncWrite,
{
2016-12-20 23:42:28 +00:00
pub fn cancel_data(&self) -> CancelData {
2018-06-17 04:29:27 +00:00
self.0.cancel_data()
2016-12-20 23:42:28 +00:00
}
2016-12-26 21:21:20 +00:00
2018-06-17 04:29:27 +00:00
pub fn parameter(&self, name: &str) -> Option<&str> {
self.0.parameter(name)
}
2018-07-07 17:11:16 +00:00
pub fn poll_message(&mut self) -> Poll<Option<AsyncMessage>, Error> {
self.0.poll_message()
}
2016-12-20 23:42:28 +00:00
}
impl<S> Future for Connection<S>
where
S: AsyncRead + AsyncWrite,
{
2018-06-17 04:29:27 +00:00
type Item = ();
type Error = Error;
2018-06-17 04:29:27 +00:00
fn poll(&mut self) -> Poll<(), Error> {
self.0.poll()
2016-12-21 03:50:44 +00:00
}
}
2018-07-07 17:11:16 +00:00
pub enum AsyncMessage {
Notice(DbError),
Notification(Notification),
#[doc(hidden)]
__NonExhaustive,
}
2018-06-28 05:37:43 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct CancelQuery<S, T>(proto::CancelFuture<S, T>)
where
S: AsyncRead + AsyncWrite,
T: TlsMode<S>;
2018-06-28 05:37:43 +00:00
impl<S, T> Future for CancelQuery<S, T>
where
S: AsyncRead + AsyncWrite,
T: TlsMode<S>,
{
2018-06-28 05:37:43 +00:00
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
self.0.poll()
}
}
2018-06-19 23:54:29 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct Connect<S, T>(proto::ConnectFuture<S, T>)
where
S: AsyncRead + AsyncWrite,
T: TlsMode<S>;
2017-09-30 21:56:15 +00:00
impl<S, T> Future for Connect<S, T>
where
S: AsyncRead + AsyncWrite,
T: TlsMode<S>,
{
type Item = (Client, Connection<T::Stream>);
2018-06-17 04:29:27 +00:00
type Error = Error;
2017-09-30 21:56:15 +00:00
fn poll(&mut self) -> Poll<(Client, Connection<T::Stream>), Error> {
2018-06-18 12:18:04 +00:00
let (client, connection) = try_ready!(self.0.poll());
2017-09-30 21:56:15 +00:00
2018-06-18 12:18:04 +00:00
Ok(Async::Ready((Client(client), Connection(connection))))
2017-09-30 21:56:15 +00:00
}
}
2018-06-19 02:34:25 +00:00
2018-06-19 23:54:29 +00:00
#[must_use = "futures do nothing unless polled"]
2018-06-19 02:34:25 +00:00
pub struct Prepare(proto::PrepareFuture);
impl Future for Prepare {
type Item = Statement;
type Error = Error;
fn poll(&mut self) -> Poll<Statement, Error> {
let statement = try_ready!(self.0.poll());
Ok(Async::Ready(Statement(statement)))
}
}
pub struct Statement(proto::Statement);
impl Statement {
pub fn params(&self) -> &[Type] {
self.0.params()
}
pub fn columns(&self) -> &[Column] {
self.0.columns()
}
}
2018-06-20 02:10:07 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct Execute(proto::ExecuteFuture);
impl Future for Execute {
type Item = u64;
type Error = Error;
fn poll(&mut self) -> Poll<u64, Error> {
self.0.poll()
}
}
2018-06-21 00:06:11 +00:00
#[must_use = "streams do nothing unless polled"]
2018-08-16 04:00:15 +00:00
pub struct Query(proto::QueryStream<proto::Statement>);
2018-06-21 00:06:11 +00:00
impl Stream for Query {
type Item = Row;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Row>, Error> {
match self.0.poll() {
Ok(Async::Ready(Some(row))) => Ok(Async::Ready(Some(Row(row)))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
2018-08-16 02:53:20 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct Bind(proto::BindFuture);
impl Future for Bind {
type Item = Portal;
type Error = Error;
fn poll(&mut self) -> Poll<Portal, Error> {
match self.0.poll() {
Ok(Async::Ready(portal)) => Ok(Async::Ready(Portal(portal))),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
2018-08-16 04:00:15 +00:00
#[must_use = "streams do nothing unless polled"]
pub struct QueryPortal(proto::QueryStream<proto::Portal>);
impl Stream for QueryPortal {
type Item = Row;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Row>, Error> {
match self.0.poll() {
Ok(Async::Ready(Some(row))) => Ok(Async::Ready(Some(Row(row)))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
2018-08-16 02:53:20 +00:00
pub struct Portal(proto::Portal);
2018-08-11 21:32:17 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct CopyIn<S>(proto::CopyInFuture<S>)
where
2018-08-13 03:23:21 +00:00
S: Stream,
2018-12-03 05:26:10 +00:00
S::Item: IntoBuf,
<S::Item as IntoBuf>::Buf: Send,
2018-12-09 01:40:37 +00:00
S::Error: Into<Box<dyn StdError + Sync + Send>>;
2018-08-11 21:32:17 +00:00
impl<S> Future for CopyIn<S>
where
2018-11-29 03:44:07 +00:00
S: Stream,
2018-12-03 05:26:10 +00:00
S::Item: IntoBuf,
<S::Item as IntoBuf>::Buf: Send,
2018-12-09 01:40:37 +00:00
S::Error: Into<Box<dyn StdError + Sync + Send>>,
2018-08-11 21:32:17 +00:00
{
type Item = u64;
type Error = Error;
fn poll(&mut self) -> Poll<u64, Error> {
self.0.poll()
}
}
2018-07-16 02:40:15 +00:00
#[must_use = "streams do nothing unless polled"]
pub struct CopyOut(proto::CopyOutStream);
impl Stream for CopyOut {
type Item = Bytes;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
self.0.poll()
}
}
2018-06-21 00:06:11 +00:00
pub struct Row(proto::Row);
impl Row {
pub fn columns(&self) -> &[Column] {
self.0.columns()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn get<'a, I, T>(&'a self, idx: I) -> T
where
I: RowIndex + fmt::Debug,
T: FromSql<'a>,
{
self.0.get(idx)
}
pub fn try_get<'a, I, T>(&'a self, idx: I) -> Result<Option<T>, Error>
where
I: RowIndex,
T: FromSql<'a>,
{
self.0.try_get(idx)
}
}
2018-07-08 05:42:04 +00:00
pub struct TransactionBuilder(proto::Client);
impl TransactionBuilder {
pub fn build<T>(self, future: T) -> Transaction<T>
where
T: Future,
// FIXME error type?
T::Error: From<Error>,
{
Transaction(proto::TransactionFuture::new(self.0, future))
}
}
2018-07-14 21:59:37 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct Transaction<T>(proto::TransactionFuture<T, T::Item, T::Error>)
where
T: Future,
T::Error: From<Error>;
impl<T> Future for Transaction<T>
where
T: Future,
T::Error: From<Error>,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<T::Item, T::Error> {
self.0.poll()
}
}
2018-07-08 05:42:04 +00:00
#[must_use = "futures do nothing unless polled"]
pub struct BatchExecute(proto::SimpleQueryFuture);
impl Future for BatchExecute {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
self.0.poll()
}
}
2018-12-09 01:39:20 +00:00
/// Contains information necessary to cancel queries for a session.
#[derive(Copy, Clone, Debug)]
pub struct CancelData {
/// The process ID of the session.
pub process_id: i32,
/// The secret key for the session.
pub secret_key: i32,
}
/// An asynchronous notification.
#[derive(Clone, Debug)]
pub struct Notification {
/// The process ID of the notifying backend process.
pub process_id: i32,
/// The name of the channel that the notify has been raised on.
pub channel: String,
/// The "payload" string passed from the notifying process.
pub payload: String,
}