rust-postgres/postgres/src/client.rs

293 lines
11 KiB
Rust
Raw Normal View History

use fallible_iterator::FallibleIterator;
2018-12-23 23:02:42 +00:00
use futures::{Async, Future, Poll, Stream};
2018-12-29 05:01:10 +00:00
use std::io::{self, Read};
2019-03-05 05:55:37 +00:00
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
2018-12-21 21:34:09 +00:00
use tokio_postgres::types::{ToSql, Type};
#[cfg(feature = "runtime")]
2019-03-05 05:55:37 +00:00
use tokio_postgres::Socket;
use tokio_postgres::{Error, Row, SimpleQueryMessage};
2018-12-21 21:34:09 +00:00
#[cfg(feature = "runtime")]
2018-12-30 05:00:58 +00:00
use crate::Config;
use crate::{CopyOutReader, QueryIter, SimpleQueryIter, Statement, ToStatement, Transaction};
2018-12-21 21:34:09 +00:00
2019-03-28 03:20:15 +00:00
/// A synchronous PostgreSQL client.
///
/// This is a lightweight wrapper over the asynchronous tokio_postgres `Client`.
2018-12-21 21:34:09 +00:00
// Newtype wrapper: the single field is the wrapped asynchronous `tokio_postgres::Client`, which every method
// of this type delegates to via `self.0`.
pub struct Client(tokio_postgres::Client);
impl Client {
2019-03-28 03:20:15 +00:00
/// Parses `params` as a connection configuration string and connects to the database it describes.
///
/// This is shorthand for parsing the string into a `Config` and calling `Config::connect` with `tls_mode`.
/// A string that fails to parse is reported as an `Error` before any connection attempt is made.
///
/// Requires the `runtime` Cargo feature (enabled by default).
#[cfg(feature = "runtime")]
pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
where
    T: MakeTlsConnect<Socket> + 'static + Send,
    T::TlsConnect: Send,
    T::Stream: Send,
    <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    let config: Config = params.parse()?;
    config.connect(tls_mode)
}
2019-03-28 03:20:15 +00:00
/// Creates a fresh `Config`, the starting point for configuring and opening a database connection.
///
/// Equivalent to calling `Config::new()` directly.
///
/// Requires the `runtime` Cargo feature (enabled by default).
#[cfg(feature = "runtime")]
pub fn configure() -> Config {
    Config::new()
}
2019-03-28 03:20:15 +00:00
/// Creates a new prepared statement.
///
/// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
/// which are set when executed. Prepared statements can only be used with the connection that created them.
2018-12-21 21:34:09 +00:00
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
2019-03-26 04:03:22 +00:00
self.0.prepare(query).wait()
2018-12-21 21:34:09 +00:00
}
2019-03-28 03:20:15 +00:00
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
///
/// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
/// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
2018-12-21 21:34:09 +00:00
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
2019-03-26 04:03:22 +00:00
self.0.prepare_typed(query, types).wait()
2018-12-21 21:34:09 +00:00
}
2019-03-28 03:20:15 +00:00
/// Executes a statement, returning the number of rows modified.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
///
/// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
///
/// # Example
///
/// ```no_run
/// use postgres::{Client, NoTls};
///
/// # fn main() -> Result<(), postgres::Error> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let bar = 1i32;
/// let baz = true;
/// let rows_updated = client.execute(
/// "UPDATE foo SET bar = $1 WHERE baz = $2",
/// &[&bar, &baz],
/// )?;
///
/// println!("{} rows updated", rows_updated);
/// # Ok(())
/// # }
/// ```
pub fn execute<T>(&mut self, query: &T, params: &[&dyn ToSql]) -> Result<u64, Error>
where
2018-12-29 04:20:31 +00:00
T: ?Sized + ToStatement,
{
let statement = query.__statement(self)?;
2019-03-26 04:03:22 +00:00
self.0.execute(&statement, params).wait()
2018-12-21 21:34:09 +00:00
}
2019-03-28 03:20:15 +00:00
/// Runs a statement and returns all of the resulting rows.
///
/// Parameters are referenced in the statement as `$n`, where `n` is the 1-based index into `params`.
///
/// `query` may be either a `Statement` or a raw query string. If the same statement will be executed repeatedly
/// (perhaps with different query parameters), consider preparing it up front with the `prepare` method.
///
/// Every row is buffered into the returned `Vec`; use the `query_iter` method to avoid holding all rows in
/// memory at once.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
///
/// # Examples
///
/// ```no_run
/// use postgres::{Client, NoTls};
///
/// # fn main() -> Result<(), postgres::Error> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let baz = true;
/// for row in client.query("SELECT foo FROM bar WHERE baz = $1", &[&baz])? {
///     let foo: i32 = row.get("foo");
///     println!("foo: {}", foo);
/// }
/// # Ok(())
/// # }
/// ```
pub fn query<T>(&mut self, query: &T, params: &[&dyn ToSql]) -> Result<Vec<Row>, Error>
where
    T: ?Sized + ToStatement,
{
    // Drain the fallible row iterator into a vector, stopping at the first error.
    let rows = self.query_iter(query, params)?;
    rows.collect()
}
2019-03-28 03:20:15 +00:00
/// Like `query`, except that it returns a fallible iterator over the resulting rows rather than buffering the
/// response in memory.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
///
/// # Examples
///
/// ```no_run
/// use postgres::{Client, NoTls};
/// use fallible_iterator::FallibleIterator;
///
/// # fn main() -> Result<(), postgres::Error> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let baz = true;
/// let mut it = client.query_iter("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
///
/// while let Some(row) = it.next()? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
/// }
/// # Ok(())
/// # }
pub fn query_iter<T>(
&mut self,
query: &T,
params: &[&dyn ToSql],
) -> Result<QueryIter<'_>, Error>
where
2018-12-29 04:20:31 +00:00
T: ?Sized + ToStatement,
{
let statement = query.__statement(self)?;
2019-03-26 04:03:22 +00:00
Ok(QueryIter::new(self.0.query(&statement, params)))
2018-12-21 21:34:09 +00:00
}
2019-03-28 03:20:15 +00:00
/// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
///
/// The `query` argument can either be a `Statement`, or a raw query string. The data in the provided reader is
/// passed along to the server verbatim; it is the caller's responsibility to ensure it uses the proper format.
2018-12-23 05:42:03 +00:00
pub fn copy_in<T, R>(
&mut self,
query: &T,
params: &[&dyn ToSql],
reader: R,
) -> Result<u64, Error>
where
2018-12-29 04:20:31 +00:00
T: ?Sized + ToStatement,
2018-12-23 05:42:03 +00:00
R: Read,
{
let statement = query.__statement(self)?;
2018-12-23 23:02:42 +00:00
self.0
2019-03-26 04:03:22 +00:00
.copy_in(&statement, params, CopyInStream(reader))
2018-12-23 23:02:42 +00:00
.wait()
2018-12-23 05:42:03 +00:00
}
2019-03-28 03:20:15 +00:00
/// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
///
/// The `query` argument can either be a `Statement`, or a raw query string.
2018-12-23 21:08:02 +00:00
pub fn copy_out<T>(
&mut self,
query: &T,
params: &[&dyn ToSql],
) -> Result<CopyOutReader<'_>, Error>
where
2018-12-29 04:20:31 +00:00
T: ?Sized + ToStatement,
2018-12-23 21:08:02 +00:00
{
let statement = query.__statement(self)?;
2019-03-26 04:03:22 +00:00
let stream = self.0.copy_out(&statement, params);
2018-12-29 05:01:10 +00:00
CopyOutReader::new(stream)
2018-12-23 21:08:02 +00:00
}
2019-03-28 03:20:15 +00:00
/// Runs a sequence of SQL statements using the simple query protocol and collects every resulting message.
///
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence stops at that
/// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
/// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning the rows, this
/// method returns a sequence of an enum which indicates either the completion of one of the commands, or a row of
/// data. This preserves the framing between the separate statements in the request.
///
/// This is a simple convenience method over `simple_query_iter`.
///
/// # Warning
///
/// Prepared statements should be used for any query which contains user-specified data, as they provide the
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and
/// pass them to this method!
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
    // Drain the fallible message iterator into a vector, stopping at the first error.
    let messages = self.simple_query_iter(query)?;
    messages.collect()
}
2019-03-28 03:20:15 +00:00
/// Executes a sequence of SQL statements using the simple query protocol.
///
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
/// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
/// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning the rows, this
/// method returns a sequence of an enum which indicates either the completion of one of the commands, or a row of
/// data. This preserves the framing between the separate statements in the request.
///
/// # Warning
///
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub fn simple_query_iter(&mut self, query: &str) -> Result<SimpleQueryIter<'_>, Error> {
Ok(SimpleQueryIter::new(self.0.simple_query(query)))
2018-12-21 21:34:09 +00:00
}
2018-12-21 21:46:50 +00:00
2019-03-28 03:20:15 +00:00
/// Begins a new database transaction.
2018-12-21 21:46:50 +00:00
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
self.simple_query("BEGIN")?;
2018-12-21 21:46:50 +00:00
Ok(Transaction::new(self))
}
2018-12-22 05:08:26 +00:00
/// Returns whether the underlying `tokio_postgres::Client` reports itself as closed.
///
/// This simply forwards to `is_closed` on the wrapped asynchronous client.
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
2018-12-23 23:58:39 +00:00
/// Returns a shared reference to the underlying asynchronous `tokio_postgres::Client`.
pub fn get_ref(&self) -> &tokio_postgres::Client {
&self.0
}
/// Returns a mutable reference to the underlying asynchronous `tokio_postgres::Client`.
pub fn get_mut(&mut self) -> &mut tokio_postgres::Client {
&mut self.0
}
/// Consumes this wrapper, returning the underlying asynchronous `tokio_postgres::Client`.
pub fn into_inner(self) -> tokio_postgres::Client {
self.0
}
2018-12-21 21:34:09 +00:00
}
impl From<tokio_postgres::Client> for Client {
fn from(c: tokio_postgres::Client) -> Client {
Client(c)
}
}
2018-12-23 05:42:03 +00:00
2018-12-23 23:02:42 +00:00
struct CopyInStream<R>(R);
2018-12-23 05:42:03 +00:00
2018-12-23 23:02:42 +00:00
impl<R> Stream for CopyInStream<R>
where
R: Read,
{
2018-12-23 05:42:03 +00:00
type Item = Vec<u8>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Vec<u8>>, io::Error> {
2018-12-23 23:02:42 +00:00
let mut buf = vec![];
match self.0.by_ref().take(4096).read_to_end(&mut buf)? {
0 => Ok(Async::Ready(None)),
_ => Ok(Async::Ready(Some(buf))),
2018-12-23 05:42:03 +00:00
}
}
}