rust-postgres/tokio-postgres/src/lib.rs

1222 lines
44 KiB
Rust
Raw Normal View History

2016-12-26 21:36:34 +00:00
//! An asynchronous Postgres driver using Tokio.
2016-12-26 22:19:43 +00:00
//!
//! # Example
//!
//! ```rust,no_run
//! extern crate futures;
//! extern crate futures_state_stream;
//! extern crate tokio_core;
//! extern crate tokio_postgres;
//!
//! use futures::Future;
//! use futures_state_stream::StateStream;
//! use tokio_core::reactor::Core;
//! use tokio_postgres::{Connection, TlsMode};
//!
//! struct Person {
//! id: i32,
//! name: String,
//! data: Option<Vec<u8>>
//! }
//!
//! fn main() {
//! let mut l = Core::new().unwrap();
2017-07-09 02:40:03 +00:00
//! let done = Connection::connect("postgresql://postgres@localhost:5433",
2017-02-24 04:27:29 +00:00
//! TlsMode::None,
//! &l.handle())
2016-12-26 22:19:43 +00:00
//! .then(|c| {
//! c.unwrap()
//! .batch_execute("CREATE TABLE person (
//! id SERIAL PRIMARY KEY,
//! name VARCHAR NOT NULL,
//! data BYTEA
//! )")
//! })
//! .and_then(|c| c.prepare("INSERT INTO person (name, data) VALUES ($1, $2)"))
//! .and_then(|(s, c)| c.execute(&s, &[&"Steven", &None::<Vec<u8>>]))
//! .and_then(|(_, c)| c.prepare("SELECT id, name, data FROM person"))
//! .and_then(|(s, c)| {
//! c.query(&s, &[])
//! .for_each(|row| {
//! let person = Person {
//! id: row.get(0),
//! name: row.get(1),
//! data: row.get(2),
//! };
//! println!("Found person {}", person.name);
//! })
//! });
//!
//! l.run(done).unwrap();
//! }
//! ```
2018-01-10 05:15:35 +00:00
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.3")]
2016-12-26 21:21:20 +00:00
#![warn(missing_docs)]
2017-04-23 22:39:07 +00:00
extern crate bytes;
2016-12-21 03:50:44 +00:00
extern crate fallible_iterator;
2016-12-23 03:10:45 +00:00
extern crate futures_state_stream;
2016-12-20 23:42:28 +00:00
extern crate postgres_protocol;
2018-01-10 05:15:35 +00:00
extern crate postgres_shared;
2016-12-20 23:42:28 +00:00
extern crate tokio_core;
extern crate tokio_dns;
2017-04-23 22:39:07 +00:00
extern crate tokio_io;
#[macro_use]
extern crate futures;
#[cfg(unix)]
2016-12-20 23:42:28 +00:00
extern crate tokio_uds;
2016-12-21 03:50:44 +00:00
use fallible_iterator::FallibleIterator;
2016-12-20 23:42:28 +00:00
use futures::future::Either;
2018-04-15 21:38:01 +00:00
use futures::{Async, Future, IntoFuture, Poll, Sink, StartSend, Stream};
2018-01-10 05:15:35 +00:00
use futures_state_stream::{FutureExt, StateStream, StreamEvent};
2016-12-20 23:42:28 +00:00
use postgres_protocol::authentication;
2018-01-10 05:15:35 +00:00
use postgres_protocol::message::backend::{ErrorFields, ErrorResponseBody};
2018-04-15 21:38:01 +00:00
use postgres_protocol::message::{backend, frontend};
2016-12-26 21:29:30 +00:00
use postgres_shared::rows::RowData;
use std::collections::{HashMap, VecDeque};
2016-12-21 03:50:44 +00:00
use std::fmt;
2016-12-20 23:42:28 +00:00
use std::io;
2018-01-10 05:15:35 +00:00
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::mpsc::{self, Receiver, Sender};
2018-04-15 21:38:01 +00:00
use std::sync::Arc;
2016-12-20 23:42:28 +00:00
use tokio_core::reactor::Handle;
2017-07-20 04:22:27 +00:00
#[doc(inline)]
pub use error::Error;
2018-04-15 21:38:01 +00:00
#[doc(inline)]
pub use postgres_shared::{error, params, types, CancelData, Notification};
2016-12-21 00:10:53 +00:00
2018-04-22 21:55:20 +00:00
use error::{DbError, SqlState};
2016-12-21 00:10:53 +00:00
use params::{ConnectParams, IntoConnectParams};
2018-04-15 21:38:01 +00:00
use rows::Row;
2017-07-20 04:22:27 +00:00
use sink::SinkExt;
2018-01-10 05:15:35 +00:00
use stmt::{Column, Statement};
2016-12-20 23:42:28 +00:00
use stream::PostgresStream;
2016-12-24 17:21:26 +00:00
use tls::Handshake;
2016-12-26 20:43:08 +00:00
use transaction::Transaction;
2018-04-22 21:55:20 +00:00
use types::{Field, FromSql, IsNull, Kind, Oid, ToSql, Type};
2016-12-20 23:42:28 +00:00
2017-07-23 22:43:05 +00:00
#[macro_use]
mod macros;
2016-12-26 20:50:36 +00:00
pub mod rows;
2017-07-20 04:22:27 +00:00
mod sink;
2018-04-15 21:38:01 +00:00
pub mod stmt;
2016-12-20 23:42:28 +00:00
mod stream;
2016-12-24 17:21:26 +00:00
pub mod tls;
2016-12-26 20:43:08 +00:00
pub mod transaction;
2016-12-20 23:42:28 +00:00
#[cfg(test)]
mod test;
2016-12-25 17:51:12 +00:00
const TYPEINFO_QUERY: &'static str = "__typeinfo";
const TYPEINFO_ENUM_QUERY: &'static str = "__typeinfo_enum";
const TYPEINFO_COMPOSITE_QUERY: &'static str = "__typeinfo_composite";
static NEXT_STMT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
2016-12-26 21:21:20 +00:00
/// Specifies the TLS support required for a new connection.
2016-12-24 17:21:26 +00:00
pub enum TlsMode {
2016-12-26 21:21:20 +00:00
/// The connection must use TLS.
2016-12-24 17:21:26 +00:00
Require(Box<Handshake>),
2016-12-26 21:21:20 +00:00
/// The connection will use TLS if available.
2016-12-24 17:21:26 +00:00
Prefer(Box<Handshake>),
2016-12-26 21:21:20 +00:00
/// The connection will not use TLS.
2016-12-24 17:21:26 +00:00
None,
}
2016-12-26 21:21:20 +00:00
/// Attempts to cancel an in-progress query.
///
/// The backend provides no information about whether a cancellation attempt
/// was successful or not. An error will only be returned if the driver was
/// unable to connect to the database.
///
/// A `CancelData` object can be created via `Connection::cancel_data`. The
/// object can cancel any query made on that connection.
///
/// Only the host and port of the connection info are used. See
/// `Connection::connect` for details of the `params` argument.
2017-07-01 03:35:17 +00:00
pub fn cancel_query<T>(
params: T,
tls_mode: TlsMode,
cancel_data: CancelData,
handle: &Handle,
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (), Error = Error> + Send>
2017-07-01 03:35:17 +00:00
where
T: IntoConnectParams,
2016-12-24 17:55:15 +00:00
{
let params = match params.into_connect_params() {
2018-01-10 05:15:35 +00:00
Ok(params) => Either::A(stream::connect(
params.host().clone(),
params.port(),
2018-04-15 21:38:01 +00:00
params.keepalive(),
2018-01-10 05:15:35 +00:00
tls_mode,
handle,
)),
2017-07-20 04:22:27 +00:00
Err(e) => Either::B(Err(error::connect(e)).into_future()),
2016-12-24 17:55:15 +00:00
};
2017-07-01 03:35:17 +00:00
params
.and_then(move |c| {
2016-12-24 17:55:15 +00:00
let mut buf = vec![];
frontend::cancel_request(cancel_data.process_id, cancel_data.secret_key, &mut buf);
2017-07-20 04:22:27 +00:00
c.send(buf).map_err(error::io)
2016-12-24 17:55:15 +00:00
})
.map(|_| ())
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-24 17:55:15 +00:00
}
2016-12-20 23:42:28 +00:00
struct InnerConnection {
stream: PostgresStream,
2016-12-22 02:51:47 +00:00
close_receiver: Receiver<(u8, String)>,
close_sender: Sender<(u8, String)>,
2016-12-21 00:22:35 +00:00
parameters: HashMap<String, String>,
2017-07-09 07:02:45 +00:00
types: HashMap<Oid, Type>,
notifications: VecDeque<Notification>,
2016-12-21 00:22:35 +00:00
cancel_data: CancelData,
2016-12-25 17:51:12 +00:00
has_typeinfo_query: bool,
has_typeinfo_enum_query: bool,
has_typeinfo_composite_query: bool,
2017-07-20 04:22:27 +00:00
desynchronized: bool,
2016-12-20 23:42:28 +00:00
}
impl InnerConnection {
2017-09-30 22:14:02 +00:00
fn read(
self,
) -> Box<
2018-01-10 05:15:35 +00:00
Future<Item = (backend::Message, InnerConnection), Error = (io::Error, InnerConnection)>
2017-09-30 22:14:02 +00:00
+ Send,
> {
2017-07-20 04:22:27 +00:00
if self.desynchronized {
let e = io::Error::new(
io::ErrorKind::Other,
"connection desynchronized due to earlier IO error",
);
2017-09-30 21:56:15 +00:00
return Err((e, self)).into_future().boxed2();
2017-07-20 04:22:27 +00:00
}
2016-12-20 23:42:28 +00:00
self.into_future()
2017-07-01 03:35:17 +00:00
.and_then(|(m, mut s)| match m {
Some(backend::Message::NotificationResponse(body)) => {
let process_id = body.process_id();
let channel = match body.channel() {
Ok(channel) => channel.to_owned(),
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((e, s)).into_future()),
2017-07-01 03:35:17 +00:00
};
let message = match body.message() {
Ok(channel) => channel.to_owned(),
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((e, s)).into_future()),
2017-07-01 03:35:17 +00:00
};
let notification = Notification {
process_id: process_id,
channel: channel,
payload: message,
};
s.notifications.push_back(notification);
Either::B(s.read())
}
Some(m) => Either::A(Ok((m, s)).into_future()),
None => {
let err = io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF");
2017-07-20 04:22:27 +00:00
Either::A(Err((err, s)).into_future())
2016-12-20 23:42:28 +00:00
}
})
2017-07-20 04:22:27 +00:00
.map_err(|(e, mut s)| {
s.desynchronized = true;
(e, s)
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-20 23:42:28 +00:00
}
}
impl Stream for InnerConnection {
type Item = backend::Message;
2016-12-20 23:42:28 +00:00
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<backend::Message>, io::Error> {
loop {
match try_ready!(self.stream.poll()) {
Some(backend::Message::ParameterStatus(body)) => {
let name = body.name()?.to_owned();
let value = body.value()?.to_owned();
self.parameters.insert(name, value);
}
// TODO forward to a handler
Some(backend::Message::NoticeResponse(_)) => {}
msg => return Ok(Async::Ready(msg)),
}
}
2016-12-20 23:42:28 +00:00
}
}
impl Sink for InnerConnection {
type SinkItem = Vec<u8>;
type SinkError = io::Error;
fn start_send(&mut self, item: Vec<u8>) -> StartSend<Vec<u8>, io::Error> {
self.stream.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
self.stream.poll_complete()
}
}
2016-12-26 21:21:20 +00:00
/// A connection to a Postgres database.
2016-12-20 23:42:28 +00:00
pub struct Connection(InnerConnection);
2016-12-21 03:50:44 +00:00
// FIXME fill out
impl fmt::Debug for Connection {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
2017-07-01 03:35:17 +00:00
fmt.debug_struct("Connection").finish()
2016-12-21 03:50:44 +00:00
}
}
2016-12-20 23:42:28 +00:00
impl Connection {
2016-12-26 21:21:20 +00:00
/// Creates a new connection to a Postgres database.
///
/// Most applications can use a URL string in the normal format:
///
/// ```notrust
/// postgresql://user[:password]@host[:port][/database][?param1=val1[[&param2=val2]...]]
/// ```
///
/// The password may be omitted if not required. The default Postgres port
/// (5432) is used if none is specified. The database name defaults to the
/// username if not specified.
///
/// To connect to the server via Unix sockets, `host` should be set to the
/// absolute path of the directory containing the socket file. Since `/` is
/// a reserved character in URLs, the path should be URL encoded. If the
/// path contains non-UTF 8 characters, a `ConnectParams` struct should be
/// created manually and passed in. Note that Postgres does not support TLS
/// over Unix sockets.
2017-09-30 22:14:02 +00:00
pub fn connect<T>(
params: T,
tls_mode: TlsMode,
handle: &Handle,
) -> Box<Future<Item = Connection, Error = Error> + Send>
2017-07-01 03:35:17 +00:00
where
T: IntoConnectParams,
2016-12-20 23:42:28 +00:00
{
2016-12-24 22:59:48 +00:00
let fut = match params.into_connect_params() {
2018-01-10 05:15:35 +00:00
Ok(params) => Either::A(
2018-04-15 21:38:01 +00:00
stream::connect(
params.host().clone(),
params.port(),
params.keepalive(),
tls_mode,
handle,
).map(|s| (s, params)),
2018-01-10 05:15:35 +00:00
),
2017-07-20 04:22:27 +00:00
Err(e) => Either::B(Err(error::connect(e)).into_future()),
2016-12-20 23:42:28 +00:00
};
2016-12-24 22:59:48 +00:00
fut.map(|(s, params)| {
2017-07-01 03:35:17 +00:00
let (sender, receiver) = mpsc::channel();
(
Connection(InnerConnection {
stream: s,
close_sender: sender,
close_receiver: receiver,
parameters: HashMap::new(),
types: HashMap::new(),
notifications: VecDeque::new(),
cancel_data: CancelData {
process_id: 0,
secret_key: 0,
},
has_typeinfo_query: false,
has_typeinfo_enum_query: false,
has_typeinfo_composite_query: false,
2017-07-20 04:22:27 +00:00
desynchronized: false,
2017-07-01 03:35:17 +00:00
}),
params,
)
}).and_then(|(s, params)| s.startup(params))
2016-12-20 23:42:28 +00:00
.and_then(|(s, params)| s.handle_auth(params))
.and_then(|s| s.finish_startup())
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-20 23:42:28 +00:00
}
2017-09-30 22:14:02 +00:00
fn startup(
self,
params: ConnectParams,
) -> Box<Future<Item = (Connection, ConnectParams), Error = Error> + Send> {
2016-12-20 23:42:28 +00:00
let mut buf = vec![];
let result = {
let options = [("client_encoding", "UTF8"), ("timezone", "GMT")];
let options = options.iter().cloned();
2017-02-16 05:42:27 +00:00
let options = options.chain(params.user().map(|u| ("user", u.name())));
let options = options.chain(params.database().map(|d| ("database", d)));
let options = options.chain(params.options().iter().map(|e| (&*e.0, &*e.1)));
2016-12-20 23:42:28 +00:00
frontend::startup_message(options, &mut buf)
};
result
.into_future()
.and_then(move |()| self.0.send(buf))
2017-07-20 04:22:27 +00:00
.map_err(error::io)
2016-12-20 23:42:28 +00:00
.map(move |s| (Connection(s), params))
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-20 23:42:28 +00:00
}
2017-09-30 22:14:02 +00:00
fn handle_auth(
self,
params: ConnectParams,
) -> Box<Future<Item = Connection, Error = Error> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, _)| error::io(e))
2016-12-20 23:42:28 +00:00
.and_then(move |(m, s)| {
let response = match m {
backend::Message::AuthenticationOk => Ok(None),
backend::Message::AuthenticationCleartextPassword => {
2017-02-16 05:42:27 +00:00
match params.user().and_then(|u| u.password()) {
2016-12-20 23:42:28 +00:00
Some(pass) => {
let mut buf = vec![];
frontend::password_message(pass, &mut buf)
.map(|()| Some(buf))
.map_err(Into::into)
}
2018-01-10 05:15:35 +00:00
None => Err(error::connect(
"password was required but not provided".into(),
)),
2016-12-20 23:42:28 +00:00
}
}
2018-01-10 05:15:35 +00:00
backend::Message::AuthenticationMd5Password(body) => match params
.user()
.and_then(|u| u.password().map(|p| (u.name(), p)))
{
Some((user, pass)) => {
let pass = authentication::md5_hash(
user.as_bytes(),
pass.as_bytes(),
body.salt(),
);
let mut buf = vec![];
frontend::password_message(&pass, &mut buf)
.map(|()| Some(buf))
.map_err(Into::into)
2016-12-20 23:42:28 +00:00
}
2018-01-10 05:15:35 +00:00
None => Err(error::connect(
"password was required but not provided".into(),
)),
},
2017-07-20 04:22:27 +00:00
backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())),
2016-12-20 23:42:28 +00:00
_ => Err(bad_message()),
};
response.map(|m| (m, Connection(s)))
})
2017-02-24 04:27:29 +00:00
.and_then(|(m, s)| match m {
Some(m) => Either::A(s.handle_auth_response(m)),
None => Either::B(Ok(s).into_future()),
2016-12-20 23:42:28 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-20 23:42:28 +00:00
}
2017-09-30 22:14:02 +00:00
fn handle_auth_response(
self,
message: Vec<u8>,
) -> Box<Future<Item = Connection, Error = Error> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.send(message)
2017-07-20 04:22:27 +00:00
.and_then(|s| s.read().map_err(|(e, _)| e))
.map_err(error::io)
2017-02-24 04:27:29 +00:00
.and_then(|(m, s)| match m {
backend::Message::AuthenticationOk => Ok(Connection(s)),
2017-07-20 04:22:27 +00:00
backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())),
2017-02-24 04:27:29 +00:00
_ => Err(bad_message()),
2016-12-20 23:42:28 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-20 23:42:28 +00:00
}
2017-09-30 22:14:02 +00:00
fn finish_startup(self) -> Box<Future<Item = Connection, Error = Error> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, _)| error::io(e))
2017-02-24 04:27:29 +00:00
.and_then(|(m, mut s)| match m {
backend::Message::BackendKeyData(body) => {
s.cancel_data.process_id = body.process_id();
s.cancel_data.secret_key = body.secret_key();
Either::A(Connection(s).finish_startup())
2016-12-20 23:42:28 +00:00
}
2017-02-24 04:27:29 +00:00
backend::Message::ReadyForQuery(_) => Either::B(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => {
2017-07-20 04:22:27 +00:00
Either::B(Err(err(&mut body.fields())).into_future())
2017-02-24 04:27:29 +00:00
}
_ => Either::B(Err(bad_message()).into_future()),
2016-12-20 23:42:28 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-20 23:42:28 +00:00
}
2017-07-20 04:22:27 +00:00
fn simple_query(
self,
query: &str,
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (Vec<RowData>, Connection), Error = (Error, Connection)> + Send> {
2016-12-21 03:50:44 +00:00
let mut buf = vec![];
2017-07-20 04:22:27 +00:00
if let Err(e) = frontend::query(query, &mut buf) {
2017-09-30 21:56:15 +00:00
return Err((error::io(e), self)).into_future().boxed2();
2017-07-20 04:22:27 +00:00
}
self.0
.send2(buf)
.map_err(|(e, s)| (error::io(e), Connection(s)))
2016-12-21 03:50:44 +00:00
.and_then(|s| Connection(s).simple_read_rows(vec![]))
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 03:50:44 +00:00
}
// This has its own read_rows since it will need to handle multiple query completions
2017-07-01 03:35:17 +00:00
fn simple_read_rows(
self,
mut rows: Vec<RowData>,
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (Vec<RowData>, Connection), Error = (Error, Connection)> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, s)| (error::io(e), Connection(s)))
2017-02-24 04:27:29 +00:00
.and_then(|(m, s)| match m {
backend::Message::ReadyForQuery(_) => {
2017-09-30 21:56:15 +00:00
Ok((rows, Connection(s))).into_future().boxed2()
2017-02-24 04:27:29 +00:00
}
2018-01-10 05:15:35 +00:00
backend::Message::DataRow(body) => match RowData::new(body) {
Ok(row) => {
rows.push(row);
Connection(s).simple_read_rows(rows)
2016-12-21 03:50:44 +00:00
}
2018-01-10 05:15:35 +00:00
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(),
},
backend::Message::EmptyQueryResponse
| backend::Message::CommandComplete(_)
| backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
2017-02-24 04:27:29 +00:00
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
2017-09-30 21:56:15 +00:00
_ => Err((bad_message(), Connection(s))).into_future().boxed2(),
2016-12-21 03:50:44 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 03:50:44 +00:00
}
2017-09-30 22:14:02 +00:00
fn ready<T>(
self,
t: T,
) -> Box<Future<Item = (T, Connection), Error = (Error, Connection)> + Send>
2017-07-01 03:35:17 +00:00
where
T: 'static + Send,
2016-12-21 03:50:44 +00:00
{
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, s)| (error::io(e), Connection(s)))
2017-02-24 04:27:29 +00:00
.and_then(|(m, s)| match m {
2017-07-20 04:22:27 +00:00
backend::Message::ReadyForQuery(_) => Ok(s),
_ => Err((bad_message(), Connection(s))),
2016-12-21 03:50:44 +00:00
})
2017-07-20 04:22:27 +00:00
.and_then(|s| Connection(s).close_gc().map(|s| (t, s)))
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 03:50:44 +00:00
}
2017-09-30 22:14:02 +00:00
fn close_gc(self) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2016-12-22 02:54:11 +00:00
let mut messages = vec![];
while let Ok((type_, name)) = self.0.close_receiver.try_recv() {
let mut buf = vec![];
frontend::close(type_, &name, &mut buf).unwrap(); // this can only fail on bad names
messages.push(buf);
}
if messages.is_empty() {
2017-09-30 21:56:15 +00:00
return Ok(self).into_future().boxed2();
2016-12-22 02:54:11 +00:00
}
let mut buf = vec![];
frontend::sync(&mut buf);
messages.push(buf);
2017-02-24 04:27:29 +00:00
self.0
2017-09-30 22:14:02 +00:00
.send_all2(futures::stream::iter_ok::<_, Error>(messages))
2017-07-20 04:22:27 +00:00
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
2016-12-22 02:54:11 +00:00
.and_then(|s| Connection(s.0).finish_close_gc())
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-22 02:54:11 +00:00
}
2017-09-30 22:14:02 +00:00
fn finish_close_gc(self) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, s)| (error::io(e), Connection(s)))
2017-02-24 04:27:29 +00:00
.and_then(|(m, s)| match m {
backend::Message::ReadyForQuery(_) => Either::A(Ok(Connection(s)).into_future()),
backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()),
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
2017-07-20 04:22:27 +00:00
_ => Either::A(Err((bad_message(), Connection(s))).into_future()),
2016-12-22 02:54:11 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-22 02:54:11 +00:00
}
2017-09-30 22:14:02 +00:00
fn ready_err<T>(
self,
body: ErrorResponseBody,
) -> Box<Future<Item = T, Error = (Error, Connection)> + Send>
2017-07-01 03:35:17 +00:00
where
T: 'static + Send,
{
2017-07-20 04:22:27 +00:00
let e = match DbError::new(&mut body.fields()) {
Ok(e) => e,
2017-09-30 21:56:15 +00:00
Err(e) => return Err((error::io(e), self)).into_future().boxed2(),
2017-07-20 04:22:27 +00:00
};
self.ready(e)
.and_then(|(e, s)| Err((error::db(e), s)))
2017-09-30 21:56:15 +00:00
.boxed2()
}
2016-12-26 21:21:20 +00:00
/// Execute a sequence of SQL statements.
///
/// Statements should be separated by `;` characters. If an error occurs,
/// execution of the sequence will stop at that point. This is intended for
/// execution of batches of non-dynamic statements - for example, creation
/// of a schema for a fresh database.
///
/// # Warning
///
/// Prepared statements should be used for any SQL statement which contains
/// user-specified data, as it provides functionality to safely embed that
/// data in the statement. Do not form statements via string concatenation
/// and feed them into this method.
2017-09-30 22:14:02 +00:00
pub fn batch_execute(
self,
query: &str,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2017-09-30 21:56:15 +00:00
self.simple_query(query).map(|r| r.1).boxed2()
2016-12-21 03:50:44 +00:00
}
2017-07-01 03:35:17 +00:00
fn raw_prepare(
self,
name: &str,
query: &str,
2018-01-10 05:15:35 +00:00
) -> Box<Future<Item = (Vec<Type>, Vec<Column>, Connection), Error = (Error, Connection)> + Send>
{
2016-12-21 19:16:43 +00:00
let mut parse = vec![];
let mut describe = vec![];
let mut sync = vec![];
2016-12-22 02:51:47 +00:00
frontend::sync(&mut sync);
2018-01-10 05:15:35 +00:00
if let Err(e) = frontend::parse(name, query, None, &mut parse)
.and_then(|()| frontend::describe(b'S', name, &mut describe))
{
2017-09-30 21:56:15 +00:00
return Err((error::io(e), self)).into_future().boxed2();
2017-07-20 04:22:27 +00:00
}
2018-01-10 05:15:35 +00:00
let it = Some(parse)
.into_iter()
.chain(Some(describe))
.chain(Some(sync));
2017-09-30 22:14:02 +00:00
self.0.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
2017-07-20 04:22:27 +00:00
.map_err(|(e, s, _)| (e, s))
2016-12-22 01:34:19 +00:00
.and_then(|s| s.0.read())
2017-07-20 04:22:27 +00:00
.map_err(|(e, s)| (error::io(e), Connection(s)))
2017-09-30 21:56:15 +00:00
.boxed2() // work around nonlinear trans blowup
2016-12-21 19:16:43 +00:00
.and_then(|(m, s)| {
match m {
backend::Message::ParseComplete => Either::A(Ok(s).into_future()),
backend::Message::ErrorResponse(body) => {
Either::B(Connection(s).ready_err(body))
}
2017-07-20 04:22:27 +00:00
_ => Either::A(Err((bad_message(), Connection(s))).into_future()),
2016-12-21 19:16:43 +00:00
}
})
2017-07-20 04:22:27 +00:00
.and_then(|s| s.read().map_err(|(e, s)| (error::io(e), Connection(s))))
2016-12-21 19:16:43 +00:00
.and_then(|(m, s)| {
match m {
backend::Message::ParameterDescription(body) => {
2017-07-20 04:22:27 +00:00
match body.parameters().collect::<Vec<_>>() {
Ok(p) => Ok((p, s)),
Err(e) => Err((error::io(e), Connection(s))),
}
2016-12-21 19:16:43 +00:00
}
2017-07-20 04:22:27 +00:00
_ => Err((bad_message(), Connection(s))),
2016-12-21 19:16:43 +00:00
}
})
2017-07-20 04:22:27 +00:00
.and_then(|(p, s)| s.read().map(|(m, s)| (p, m, s)).map_err(|(e, s)| (error::io(e), Connection(s))))
2017-09-30 21:56:15 +00:00
.boxed2() // work around nonlinear trans blowup
2016-12-21 19:16:43 +00:00
.and_then(|(p, m, s)| {
match m {
backend::Message::RowDescription(body) => {
2017-07-20 04:22:27 +00:00
match body.fields()
2016-12-21 19:16:43 +00:00
.map(|f| (f.name().to_owned(), f.type_oid()))
2017-07-20 04:22:27 +00:00
.collect::<Vec<_>>() {
Ok(d) => Ok((p, d, s)),
Err(e) => Err((error::io(e), Connection(s))),
}
2016-12-21 19:16:43 +00:00
}
backend::Message::NoData => Ok((p, vec![], s)),
2017-07-20 04:22:27 +00:00
_ => Err((bad_message(), Connection(s))),
2016-12-21 19:16:43 +00:00
}
})
.and_then(|(p, r, s)| Connection(s).ready((p, r)))
.and_then(|((p, r), s)| {
s.get_types(p.into_iter(), vec![], |&p| p, |_, t| t)
.map(|(p, s)| (p, r, s))
})
.and_then(|(p, r, s)| {
s.get_types(r.into_iter(),
vec![],
|f| f.1,
2016-12-22 20:30:03 +00:00
|f, t| Column::new(f.0, t))
2016-12-21 19:16:43 +00:00
.map(|(r, s)| (p, r, s))
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 19:16:43 +00:00
}
2017-07-01 03:35:17 +00:00
fn get_types<T, U, I, F, G>(
self,
mut raw: I,
mut out: Vec<U>,
mut get_oid: F,
mut build: G,
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (Vec<U>, Connection), Error = (Error, Connection)> + Send>
2017-07-01 03:35:17 +00:00
where
T: 'static + Send,
U: 'static + Send,
I: 'static + Send + Iterator<Item = T>,
F: 'static + Send + FnMut(&T) -> Oid,
G: 'static + Send + FnMut(T, Type) -> U,
2016-12-21 19:16:43 +00:00
{
match raw.next() {
Some(v) => {
let oid = get_oid(&v);
self.get_type(oid)
.and_then(move |(ty, s)| {
out.push(build(v, ty));
s.get_types(raw, out, get_oid, build)
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 19:16:43 +00:00
}
2017-09-30 21:56:15 +00:00
None => Ok((out, self)).into_future().boxed2(),
2016-12-21 19:16:43 +00:00
}
}
2017-09-30 22:14:02 +00:00
fn get_type(
self,
oid: Oid,
) -> Box<Future<Item = (Type, Connection), Error = (Error, Connection)> + Send> {
2016-12-21 19:16:43 +00:00
if let Some(type_) = Type::from_oid(oid) {
2017-09-30 21:56:15 +00:00
return Ok((type_, self)).into_future().boxed2();
2016-12-21 19:16:43 +00:00
};
2016-12-25 17:51:12 +00:00
2017-07-09 07:02:45 +00:00
let ty = self.0.types.get(&oid).map(Clone::clone);
if let Some(ty) = ty {
2017-09-30 21:56:15 +00:00
return Ok((ty, self)).into_future().boxed2();
2016-12-25 17:51:12 +00:00
}
self.get_unknown_type(oid)
.map(move |(ty, mut c)| {
c.0.types.insert(oid, ty.clone());
2017-07-09 07:02:45 +00:00
(ty, c)
2016-12-25 17:51:12 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-25 17:51:12 +00:00
}
2017-09-30 22:14:02 +00:00
fn get_unknown_type(
self,
oid: Oid,
) -> Box<Future<Item = (Type, Connection), Error = (Error, Connection)> + Send> {
2016-12-25 17:51:12 +00:00
self.setup_typeinfo_query()
2018-04-22 21:55:20 +00:00
.and_then(move |c| c.raw_execute(TYPEINFO_QUERY, "", &[Type::OID], &[&oid]))
2016-12-25 17:51:12 +00:00
.and_then(|c| c.read_rows().collect())
.and_then(move |(r, c)| {
let get = |idx| r.get(0).and_then(|r| r.get(idx));
2018-04-22 21:55:20 +00:00
let name = match String::from_sql_nullable(&Type::NAME, get(0)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
2018-04-22 21:55:20 +00:00
let type_ = match i8::from_sql_nullable(&Type::CHAR, get(1)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
2018-04-22 21:55:20 +00:00
let elem_oid = match Oid::from_sql_nullable(&Type::OID, get(2)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
2018-04-22 21:55:20 +00:00
let rngsubtype = match Option::<Oid>::from_sql_nullable(&Type::OID, get(3)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
2018-04-22 21:55:20 +00:00
let basetype = match Oid::from_sql_nullable(&Type::OID, get(4)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
2018-04-22 21:55:20 +00:00
let schema = match String::from_sql_nullable(&Type::NAME, get(5)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
2018-04-22 21:55:20 +00:00
let relid = match Oid::from_sql_nullable(&Type::OID, get(6)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
2016-12-25 17:51:12 +00:00
};
let kind = if type_ == b'p' as i8 {
Either::A(Ok((Kind::Pseudo, c)).into_future())
} else if type_ == b'e' as i8 {
2017-07-01 03:35:17 +00:00
Either::B(
c.get_enum_variants(oid)
.map(|(v, c)| (Kind::Enum(v), c))
2017-09-30 21:56:15 +00:00
.boxed2(),
2017-07-01 03:35:17 +00:00
)
2016-12-25 17:51:12 +00:00
} else if basetype != 0 {
2017-07-01 03:35:17 +00:00
Either::B(
c.get_type(basetype)
.map(|(t, c)| (Kind::Domain(t), c))
2017-09-30 21:56:15 +00:00
.boxed2(),
2017-07-01 03:35:17 +00:00
)
2016-12-25 17:51:12 +00:00
} else if elem_oid != 0 {
2017-07-01 03:35:17 +00:00
Either::B(
c.get_type(elem_oid)
.map(|(t, c)| (Kind::Array(t), c))
2017-09-30 21:56:15 +00:00
.boxed2(),
2017-07-01 03:35:17 +00:00
)
2016-12-25 17:51:12 +00:00
} else if relid != 0 {
2017-07-01 03:35:17 +00:00
Either::B(
c.get_composite_fields(relid)
.map(|(f, c)| (Kind::Composite(f), c))
2017-09-30 21:56:15 +00:00
.boxed2(),
2017-07-01 03:35:17 +00:00
)
2016-12-25 17:51:12 +00:00
} else if let Some(rngsubtype) = rngsubtype {
2017-07-01 03:35:17 +00:00
Either::B(
c.get_type(rngsubtype)
.map(|(t, c)| (Kind::Range(t), c))
2017-09-30 21:56:15 +00:00
.boxed2(),
2017-07-01 03:35:17 +00:00
)
2016-12-25 17:51:12 +00:00
} else {
Either::A(Ok((Kind::Simple, c)).into_future())
};
2018-04-15 21:38:01 +00:00
Either::B(kind.map(move |(k, c)| (Type::_new(name, oid, k, schema), c)))
2016-12-25 17:51:12 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-25 17:51:12 +00:00
}
2017-09-30 22:14:02 +00:00
fn setup_typeinfo_query(
self,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2016-12-25 17:51:12 +00:00
if self.0.has_typeinfo_query {
2017-09-30 21:56:15 +00:00
return Ok(self).into_future().boxed2();
2016-12-25 17:51:12 +00:00
}
2017-07-01 03:35:17 +00:00
self.raw_prepare(
TYPEINFO_QUERY,
"SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, \
2018-01-10 05:15:35 +00:00
t.typbasetype, n.nspname, t.typrelid \
FROM pg_catalog.pg_type t \
LEFT OUTER JOIN pg_catalog.pg_range r ON \
r.rngtypid = t.oid \
INNER JOIN pg_catalog.pg_namespace n ON \
t.typnamespace = n.oid \
WHERE t.oid = $1",
2017-07-20 04:22:27 +00:00
).or_else(|(e, c)| {
// Range types weren't added until Postgres 9.2, so pg_range may not exist
2018-04-22 21:55:20 +00:00
if e.code() == Some(&SqlState::UNDEFINED_TABLE) {
2017-07-20 04:22:27 +00:00
Either::A(c.raw_prepare(
TYPEINFO_QUERY,
"SELECT t.typname, t.typtype, t.typelem, \
2018-01-10 05:15:35 +00:00
NULL::OID, t.typbasetype, n.nspname, \
t.typrelid \
FROM pg_catalog.pg_type t \
INNER JOIN pg_catalog.pg_namespace n \
ON t.typnamespace = n.oid \
WHERE t.oid = $1",
2017-07-20 04:22:27 +00:00
))
} else {
Either::B(Err((e, c)).into_future())
2016-12-25 17:51:12 +00:00
}
})
.map(|(_, _, mut c)| {
c.0.has_typeinfo_query = true;
c
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-25 17:51:12 +00:00
}
2017-07-20 04:22:27 +00:00
fn get_enum_variants(
self,
oid: Oid,
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (Vec<String>, Connection), Error = (Error, Connection)> + Send> {
2016-12-25 17:51:12 +00:00
self.setup_typeinfo_enum_query()
2018-04-22 21:55:20 +00:00
.and_then(move |c| c.raw_execute(TYPEINFO_ENUM_QUERY, "", &[Type::OID], &[&oid]))
2016-12-25 17:51:12 +00:00
.and_then(|c| c.read_rows().collect())
.and_then(|(r, c)| {
let mut variants = vec![];
for row in r {
2018-04-22 21:55:20 +00:00
let variant = match String::from_sql_nullable(&Type::NAME, row.get(0)) {
2016-12-25 17:51:12 +00:00
Ok(v) => v,
2017-07-20 04:22:27 +00:00
Err(e) => return Err((error::conversion(e), c)),
2016-12-25 17:51:12 +00:00
};
variants.push(variant);
}
Ok((variants, c))
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-25 17:51:12 +00:00
}
2017-09-30 22:14:02 +00:00
fn setup_typeinfo_enum_query(
self,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2016-12-25 17:51:12 +00:00
if self.0.has_typeinfo_enum_query {
2017-09-30 21:56:15 +00:00
return Ok(self).into_future().boxed2();
2016-12-25 17:51:12 +00:00
}
2017-07-01 03:35:17 +00:00
self.raw_prepare(
TYPEINFO_ENUM_QUERY,
"SELECT enumlabel \
2018-01-10 05:15:35 +00:00
FROM pg_catalog.pg_enum \
WHERE enumtypid = $1 \
ORDER BY enumsortorder",
).or_else(|(e, c)| {
2018-04-22 21:55:20 +00:00
if e.code() == Some(&SqlState::UNDEFINED_COLUMN) {
2018-01-10 05:15:35 +00:00
Either::A(c.raw_prepare(
TYPEINFO_ENUM_QUERY,
"SELECT enumlabel FROM pg_catalog.pg_enum WHERE \
enumtypid = $1 ORDER BY oid",
))
} else {
Either::B(Err((e, c)).into_future())
}
2016-12-25 17:51:12 +00:00
})
.map(|(_, _, mut c)| {
c.0.has_typeinfo_enum_query = true;
c
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-25 17:51:12 +00:00
}
2017-09-30 21:56:15 +00:00
fn get_composite_fields(
self,
oid: Oid,
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (Vec<Field>, Connection), Error = (Error, Connection)> + Send> {
2016-12-25 17:51:12 +00:00
self.setup_typeinfo_composite_query()
2018-04-22 21:55:20 +00:00
.and_then(move |c| c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[Type::OID], &[&oid]))
2016-12-25 17:51:12 +00:00
.and_then(|c| c.read_rows().collect())
.and_then(|(r, c)| {
2017-09-30 22:14:02 +00:00
futures::stream::iter_ok(r).fold((vec![], c), |(mut fields, c), row| {
2018-04-22 21:55:20 +00:00
let name = match String::from_sql_nullable(&Type::NAME, row.get(0)) {
2017-09-30 22:14:02 +00:00
Ok(name) => name,
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
};
2018-04-22 21:55:20 +00:00
let oid = match Oid::from_sql_nullable(&Type::OID, row.get(1)) {
2017-09-30 22:14:02 +00:00
Ok(oid) => oid,
Err(e) => return Either::A(Err((error::conversion(e), c)).into_future()),
};
Either::B(c.get_type(oid).map(move |(ty, c)| {
fields.push(Field::new(name, ty));
(fields, c)
}))
})
2016-12-25 17:51:12 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-25 17:51:12 +00:00
}
2017-09-30 22:14:02 +00:00
fn setup_typeinfo_composite_query(
self,
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2016-12-25 17:51:12 +00:00
if self.0.has_typeinfo_composite_query {
2017-09-30 21:56:15 +00:00
return Ok(self).into_future().boxed2();
2016-12-25 17:51:12 +00:00
}
2017-07-01 03:35:17 +00:00
self.raw_prepare(
TYPEINFO_COMPOSITE_QUERY,
"SELECT attname, atttypid \
2018-01-10 05:15:35 +00:00
FROM pg_catalog.pg_attribute \
WHERE attrelid = $1 \
AND NOT attisdropped \
AND attnum > 0 \
ORDER BY attnum",
2017-07-01 03:35:17 +00:00
).map(|(_, _, mut c)| {
2016-12-25 17:51:12 +00:00
c.0.has_typeinfo_composite_query = true;
c
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 19:16:43 +00:00
}
2017-07-01 03:35:17 +00:00
fn raw_execute(
self,
stmt: &str,
portal: &str,
param_types: &[Type],
params: &[&ToSql],
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = Connection, Error = (Error, Connection)> + Send> {
2017-07-01 03:35:17 +00:00
assert!(
param_types.len() == params.len(),
"expected {} parameters but got {}",
param_types.len(),
params.len()
);
2016-12-21 20:16:47 +00:00
let mut bind = vec![];
let mut execute = vec![];
let mut sync = vec![];
2016-12-22 02:51:47 +00:00
frontend::sync(&mut sync);
2017-07-01 03:35:17 +00:00
let r = frontend::bind(
portal,
stmt,
Some(1),
params.iter().zip(param_types),
|(param, ty), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
Err(e) => Err(e),
},
Some(1),
&mut bind,
);
2016-12-21 20:16:47 +00:00
let r = match r {
Ok(()) => Ok(self),
2017-07-20 04:22:27 +00:00
Err(frontend::BindError::Conversion(e)) => Err((error::conversion(e), self)),
Err(frontend::BindError::Serialization(e)) => Err((error::io(e), self)),
2016-12-21 20:16:47 +00:00
};
2017-09-30 21:56:15 +00:00
r.and_then(|s| match frontend::execute(portal, 0, &mut execute) {
Ok(()) => Ok(s),
Err(e) => Err((error::io(e), s)),
2017-07-01 03:35:17 +00:00
}).into_future()
2016-12-22 01:34:19 +00:00
.and_then(|s| {
2018-01-10 05:15:35 +00:00
let it = Some(bind)
.into_iter()
.chain(Some(execute))
.chain(Some(sync));
2017-09-30 22:14:02 +00:00
s.0
.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
2017-09-30 21:56:15 +00:00
})
2018-04-15 21:38:01 +00:00
.and_then(|s| s.0.read().map_err(|(e, s)| (error::io(e), Connection(s))))
2017-02-24 04:27:29 +00:00
.and_then(|(m, s)| match m {
backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()),
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
2017-07-20 04:22:27 +00:00
_ => Either::A(Err((bad_message(), Connection(s))).into_future()),
2016-12-21 20:16:47 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 20:16:47 +00:00
}
2017-09-30 22:14:02 +00:00
fn finish_execute(
self,
) -> Box<Future<Item = (u64, Connection), Error = (Error, Connection)> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, s)| (error::io(e), Connection(s)))
2017-07-01 03:35:17 +00:00
.and_then(|(m, s)| match m {
2017-09-30 21:56:15 +00:00
backend::Message::DataRow(_) => Connection(s).finish_execute().boxed2(),
2017-07-01 03:35:17 +00:00
backend::Message::CommandComplete(body) => {
2018-04-15 21:38:01 +00:00
let r = body.tag()
.map(|tag| tag.split_whitespace().last().unwrap().parse().unwrap_or(0));
2017-07-20 04:22:27 +00:00
match r {
2017-09-30 21:56:15 +00:00
Ok(n) => Connection(s).ready(n).boxed2(),
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(),
2017-07-20 04:22:27 +00:00
}
2016-12-21 20:16:47 +00:00
}
2017-09-30 21:56:15 +00:00
backend::Message::EmptyQueryResponse => Connection(s).ready(0).boxed2(),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body).boxed2(),
_ => Err((bad_message(), Connection(s))).into_future().boxed2(),
2016-12-21 20:16:47 +00:00
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 20:16:47 +00:00
}
2017-09-30 21:56:15 +00:00
fn read_rows(
self,
) -> Box<StateStream<Item = RowData, State = Connection, Error = Error> + Send> {
2016-12-25 17:51:12 +00:00
futures_state_stream::unfold(self, |c| {
2017-07-01 03:35:17 +00:00
c.read_row().and_then(|(r, c)| match r {
Some(data) => {
let event = StreamEvent::Next((data, c));
Either::A(Ok(event).into_future())
}
None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))),
2017-02-24 04:27:29 +00:00
})
2017-09-30 21:56:15 +00:00
}).boxed2()
2016-12-25 17:51:12 +00:00
}
2017-09-30 22:14:02 +00:00
fn read_row(
self,
) -> Box<Future<Item = (Option<RowData>, Connection), Error = (Error, Connection)> + Send> {
2017-02-24 04:27:29 +00:00
self.0
.read()
2017-07-20 04:22:27 +00:00
.map_err(|(e, s)| (error::io(e), Connection(s)))
2016-12-23 03:10:45 +00:00
.and_then(|(m, s)| {
let c = Connection(s);
match m {
backend::Message::DataRow(body) => {
2017-07-20 04:22:27 +00:00
let r = match RowData::new(body) {
Ok(r) => Ok((Some(r), c)),
2017-09-30 21:56:15 +00:00
Err(e) => Err((error::io(e), c)),
2017-07-20 04:22:27 +00:00
};
Either::A(r.into_future())
2016-12-23 03:10:45 +00:00
}
2018-01-10 05:15:35 +00:00
backend::Message::EmptyQueryResponse | backend::Message::CommandComplete(_) => {
Either::A(Ok((None, c)).into_future())
}
2017-02-24 04:27:29 +00:00
backend::Message::ErrorResponse(body) => Either::B(c.ready_err(body)),
2017-07-20 04:22:27 +00:00
_ => Either::A(Err((bad_message(), c)).into_future()),
2016-12-23 03:10:45 +00:00
}
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-23 03:10:45 +00:00
}
2016-12-26 21:21:20 +00:00
/// Creates a new prepared statement.
2017-09-30 22:14:02 +00:00
pub fn prepare(
self,
query: &str,
) -> Box<Future<Item = (Statement, Connection), Error = (Error, Connection)> + Send> {
let id = NEXT_STMT_ID.fetch_add(1, Ordering::SeqCst);
2016-12-21 20:45:54 +00:00
let name = format!("s{}", id);
self.raw_prepare(&name, query)
.map(|(params, columns, conn)| {
2017-02-24 04:27:29 +00:00
let stmt =
Statement::new(conn.0.close_sender.clone(), name, params, Arc::new(columns));
2016-12-21 20:45:54 +00:00
(stmt, conn)
})
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-21 20:45:54 +00:00
}
2016-12-26 21:21:20 +00:00
/// Executes a statement, returning the number of rows modified.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number
/// expected.
2017-07-01 03:35:17 +00:00
pub fn execute(
self,
statement: &Statement,
params: &[&ToSql],
2017-09-30 22:14:02 +00:00
) -> Box<Future<Item = (u64, Connection), Error = (Error, Connection)> + Send> {
2016-12-26 20:57:43 +00:00
self.raw_execute(statement.name(), "", statement.parameters(), params)
2016-12-22 23:47:05 +00:00
.and_then(|conn| conn.finish_execute())
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-22 23:47:05 +00:00
}
2016-12-26 21:21:20 +00:00
/// Executes a statement, returning a stream over the resulting rows.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number
/// expected.
2017-07-01 03:35:17 +00:00
pub fn query(
self,
statement: &Statement,
params: &[&ToSql],
2017-09-30 21:56:15 +00:00
) -> Box<StateStream<Item = Row, State = Connection, Error = Error> + Send> {
2016-12-26 20:57:43 +00:00
let columns = statement.columns_arc().clone();
self.raw_execute(statement.name(), "", statement.parameters(), params)
2016-12-26 20:50:36 +00:00
.map(|c| c.read_rows().map(move |r| Row::new(columns.clone(), r)))
2016-12-23 03:10:45 +00:00
.flatten_state_stream()
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-23 03:10:45 +00:00
}
2016-12-26 21:21:20 +00:00
/// Starts a new transaction.
2017-09-30 22:14:02 +00:00
pub fn transaction(
self,
) -> Box<Future<Item = Transaction, Error = (Error, Connection)> + Send> {
2016-12-23 19:09:05 +00:00
self.simple_query("BEGIN")
2016-12-26 20:43:08 +00:00
.map(|(_, c)| Transaction::new(c))
2017-09-30 21:56:15 +00:00
.boxed2()
2016-12-23 19:09:05 +00:00
}
/// Returns a stream of asynchronus notifications receieved from the server.
pub fn notifications(self) -> Notifications {
Notifications(self)
}
2016-12-26 21:21:20 +00:00
/// Returns information used to cancel pending queries.
///
/// Used with the `cancel_query` function. The object returned can be used
/// to cancel any query executed by the connection it was created from.
2016-12-20 23:42:28 +00:00
pub fn cancel_data(&self) -> CancelData {
2016-12-21 00:22:35 +00:00
self.0.cancel_data
2016-12-20 23:42:28 +00:00
}
2016-12-26 21:21:20 +00:00
/// Returns the value of the specified Postgres backend parameter, such as
/// `timezone` or `server_version`.
pub fn parameter(&self, param: &str) -> Option<&str> {
self.0.parameters.get(param).map(|s| &**s)
}
/// Returns whether or not the stream has been desynchronized due to an
/// error in the communication channel with the server.
///
/// If this has occurred, all further queries will immediately return an
/// error.
pub fn is_desynchronized(&self) -> bool {
self.0.desynchronized
}
2016-12-20 23:42:28 +00:00
}
/// A stream of asynchronous Postgres notifications.
2018-05-02 16:28:09 +00:00
#[derive(Debug)]
pub struct Notifications(Connection);
impl Notifications {
/// Consumes the `Notifications`, returning the inner `Connection`.
pub fn into_inner(self) -> Connection {
self.0
}
}
impl Stream for Notifications {
type Item = Notification;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Notification>, Error> {
if let Some(notification) = (self.0).0.notifications.pop_front() {
return Ok(Async::Ready(Some(notification)));
}
match try_ready!((self.0).0.poll()) {
Some(backend::Message::NotificationResponse(body)) => {
let notification = Notification {
process_id: body.process_id(),
channel: body.channel()?.to_owned(),
payload: body.message()?.to_owned(),
};
Ok(Async::Ready(Some(notification)))
}
Some(_) => Err(bad_message()),
None => Ok(Async::Ready(None)),
}
}
}
2017-07-20 04:22:27 +00:00
fn err(fields: &mut ErrorFields) -> Error {
2016-12-21 03:50:44 +00:00
match DbError::new(fields) {
2017-07-20 04:22:27 +00:00
Ok(err) => error::db(err),
Err(err) => error::io(err),
2016-12-21 03:50:44 +00:00
}
}
2016-12-20 23:42:28 +00:00
fn bad_message<T>() -> T
2017-07-01 03:35:17 +00:00
where
T: From<io::Error>,
2016-12-20 23:42:28 +00:00
{
io::Error::new(io::ErrorKind::InvalidInput, "unexpected message").into()
}
2017-09-30 21:56:15 +00:00
trait BoxedFuture<I, E> {
fn boxed2(self) -> Box<Future<Item = I, Error = E> + Send>;
}
impl<F, I, E> BoxedFuture<I, E> for F
where
F: Future<Item = I, Error = E> + Send + 'static,
{
fn boxed2(self) -> Box<Future<Item = I, Error = E> + Send> {
Box::new(self)
}
}
trait BoxedStateStream<I, S, E> {
fn boxed2(self) -> Box<StateStream<Item = I, State = S, Error = E> + Send>;
}
impl<T, I, S, E> BoxedStateStream<I, S, E> for T
where
2018-01-10 05:15:35 +00:00
T: StateStream<Item = I, State = S, Error = E> + Send + 'static,
2017-09-30 21:56:15 +00:00
{
fn boxed2(self) -> Box<StateStream<Item = I, State = S, Error = E> + Send> {
Box::new(self)
}
}