rust-postgres/src/lib.rs

1448 lines
48 KiB
Rust
Raw Normal View History

2014-07-03 05:23:17 +00:00
//! Rust-Postgres is a pure-Rust frontend for the popular PostgreSQL database. It
//! exposes a high level interface in the vein of JDBC or Go's `database/sql`
//! package.
//!
//! ```rust,no_run
//! extern crate postgres;
//!
2014-11-17 21:46:33 +00:00
//! use postgres::{Connection, SslMode};
2014-07-03 05:23:17 +00:00
//!
//! struct Person {
//! id: i32,
//! name: String,
//! data: Option<Vec<u8>>
//! }
//!
//! fn main() {
2014-11-17 21:46:33 +00:00
//! let conn = Connection::connect("postgresql://postgres@localhost", &SslMode::None)
2014-11-01 23:38:52 +00:00
//! .unwrap();
2014-07-03 05:23:17 +00:00
//!
//! conn.execute("CREATE TABLE person (
//! id SERIAL PRIMARY KEY,
//! name VARCHAR NOT NULL,
//! data BYTEA
2014-11-25 20:59:31 +00:00
//! )", &[]).unwrap();
2014-07-03 05:23:17 +00:00
//! let me = Person {
//! id: 0,
2014-12-23 17:10:16 +00:00
//! name: "Steven".to_string(),
2014-07-03 05:23:17 +00:00
//! data: None
//! };
2015-02-08 03:52:45 +00:00
//! conn.execute("INSERT INTO person (name, data) VALUES ($1, $2)",
//! &[&me.name, &me.data]).unwrap();
2014-07-03 05:23:17 +00:00
//!
2015-02-08 03:52:45 +00:00
//! let stmt = conn.prepare("SELECT id, name, data FROM person").unwrap();
2015-02-20 08:04:14 +00:00
//! for row in stmt.query(&[]).unwrap() {
2014-07-03 05:23:17 +00:00
//! let person = Person {
//! id: row.get(0),
//! name: row.get(1),
2015-02-08 03:52:45 +00:00
//! data: row.get(2)
2014-07-03 05:23:17 +00:00
//! };
//! println!("Found person {}", person.name);
//! }
//! }
//! ```
2015-08-29 00:01:46 +00:00
#![doc(html_root_url="https://sfackler.github.io/rust-postgres/doc/v0.9.6")]
2014-10-31 15:51:27 +00:00
#![warn(missing_docs)]
2013-10-08 04:11:54 +00:00
2015-05-05 18:40:14 +00:00
extern crate bufstream;
2015-02-05 04:39:30 +00:00
extern crate byteorder;
2015-01-07 16:23:40 +00:00
#[macro_use]
2014-12-10 05:35:52 +00:00
extern crate log;
extern crate phf;
2015-03-26 02:11:40 +00:00
extern crate rustc_serialize as serialize;
2015-02-26 17:02:32 +00:00
#[cfg(feature = "unix_socket")]
extern crate unix_socket;
2015-03-31 04:03:31 +00:00
extern crate debug_builders;
2015-02-26 17:02:32 +00:00
2015-05-05 18:40:14 +00:00
use bufstream::BufStream;
use md5::Md5;
2015-03-31 04:03:31 +00:00
use debug_builders::DebugStruct;
use std::ascii::AsciiExt;
2015-05-30 05:54:10 +00:00
use std::borrow::ToOwned;
2014-02-08 02:17:40 +00:00
use std::cell::{Cell, RefCell};
2015-02-20 21:56:03 +00:00
use std::collections::{VecDeque, HashMap};
2015-06-16 05:20:09 +00:00
use std::error::Error as StdError;
2014-12-10 05:35:52 +00:00
use std::fmt;
use std::iter::IntoIterator;
2015-04-28 05:26:20 +00:00
use std::io as std_io;
2015-02-26 17:02:32 +00:00
use std::io::prelude::*;
2015-06-16 05:20:09 +00:00
use std::marker::Sync as StdSync;
2014-02-12 07:42:46 +00:00
use std::mem;
2014-11-01 23:12:05 +00:00
use std::result;
2015-02-26 17:02:32 +00:00
#[cfg(feature = "unix_socket")]
use std::path::PathBuf;
2013-08-04 02:17:32 +00:00
2015-08-16 04:39:46 +00:00
pub use stmt::{Statement, Column};
2015-05-26 05:47:25 +00:00
use error::{Error, ConnectError, SqlState, DbError};
2015-05-30 05:54:10 +00:00
use types::{ToSql, FromSql};
use io::{StreamWrapper, NegotiateSsl};
2015-08-16 04:39:46 +00:00
use types::{IsNull, Kind, Type, SessionInfo, Oid, Other};
2014-11-04 05:47:53 +00:00
use message::BackendMessage::*;
2014-12-10 05:35:52 +00:00
use message::FrontendMessage::*;
use message::{FrontendMessage, BackendMessage, RowDescriptionEntry};
use message::{WriteMessage, ReadMessage};
2015-02-26 17:02:32 +00:00
use url::Url;
2015-05-30 05:54:10 +00:00
use rows::{Rows, LazyRows};
2013-07-25 07:10:18 +00:00
2015-01-07 16:23:40 +00:00
#[macro_use]
2014-07-07 02:02:22 +00:00
mod macros;
2015-08-16 04:39:46 +00:00
mod md5;
mod message;
2015-04-28 05:26:20 +00:00
mod priv_io;
mod url;
2014-09-30 05:56:43 +00:00
mod util;
2015-08-16 04:39:46 +00:00
pub mod error;
pub mod io;
2015-05-30 05:54:10 +00:00
pub mod rows;
2015-08-16 06:21:39 +00:00
pub mod stmt;
2015-08-16 04:39:46 +00:00
pub mod types;
const TYPEINFO_QUERY: &'static str = "t";
2013-10-21 00:32:14 +00:00
/// A type alias of the result returned by many methods.
2014-11-04 06:24:11 +00:00
pub type Result<T> = result::Result<T, Error>;
2014-04-03 04:26:41 +00:00
/// Specifies the target server to connect to.
2015-01-24 07:31:17 +00:00
#[derive(Clone, Debug)]
2014-11-01 23:13:01 +00:00
pub enum ConnectTarget {
/// Connect via TCP to the specified host.
Tcp(String),
/// Connect via a Unix domain socket in the specified directory.
2015-02-26 17:02:32 +00:00
///
/// Only available on Unix platforms with the `unix_socket` feature.
#[cfg(feature = "unix_socket")]
Unix(PathBuf)
}
2015-04-08 06:57:22 +00:00
/// Authentication information.
2015-01-24 07:31:17 +00:00
#[derive(Clone, Debug)]
2014-11-01 23:14:08 +00:00
pub struct UserInfo {
/// The username
pub user: String,
/// An optional password
pub password: Option<String>,
}
/// Information necessary to open a new connection to a Postgres server.
2015-01-24 07:31:17 +00:00
#[derive(Clone, Debug)]
2014-11-01 23:15:30 +00:00
pub struct ConnectParams {
/// The target server
2014-11-01 23:13:01 +00:00
pub target: ConnectTarget,
/// The target port.
///
/// Defaults to 5432 if not specified.
2015-02-26 17:02:32 +00:00
pub port: Option<u16>,
/// The user to login as.
///
2014-11-02 18:38:45 +00:00
/// `Connection::connect` requires a user but `cancel_query` does not.
2014-11-01 23:14:08 +00:00
pub user: Option<UserInfo>,
/// The database to connect to. Defaults the value of `user`.
2014-05-26 03:38:40 +00:00
pub database: Option<String>,
/// Runtime parameters to be passed to the Postgres backend.
2014-05-26 03:38:40 +00:00
pub options: Vec<(String, String)>,
}
2014-11-02 18:38:45 +00:00
/// A trait implemented by types that can be converted into a `ConnectParams`.
pub trait IntoConnectParams {
2014-11-01 23:15:30 +00:00
/// Converts the value of `self` into a `ConnectParams`.
2015-06-16 05:20:09 +00:00
fn into_connect_params(self) -> result::Result<ConnectParams, Box<StdError+StdSync+Send>>;
}
2014-11-01 23:15:30 +00:00
impl IntoConnectParams for ConnectParams {
2015-06-16 05:20:09 +00:00
fn into_connect_params(self) -> result::Result<ConnectParams, Box<StdError+StdSync+Send>> {
Ok(self)
}
}
impl<'a> IntoConnectParams for &'a str {
2015-06-16 05:20:09 +00:00
fn into_connect_params(self) -> result::Result<ConnectParams, Box<StdError+StdSync+Send>> {
2014-07-07 13:41:43 +00:00
match Url::parse(self) {
2014-05-17 03:47:03 +00:00
Ok(url) => url.into_connect_params(),
2015-06-16 05:20:09 +00:00
Err(err) => return Err(err.into()),
2014-05-17 03:47:03 +00:00
}
}
}
impl IntoConnectParams for Url {
2015-06-16 05:20:09 +00:00
fn into_connect_params(self) -> result::Result<ConnectParams, Box<StdError+StdSync+Send>> {
let Url {
host,
port,
user,
2015-03-29 22:23:38 +00:00
path: url::Path { mut path, query: options, .. },
..
2014-05-17 03:47:03 +00:00
} = self;
2015-02-26 17:02:32 +00:00
#[cfg(feature = "unix_socket")]
2015-06-16 05:20:09 +00:00
fn make_unix(maybe_path: String)
-> result::Result<ConnectTarget, Box<StdError+StdSync+Send>> {
2015-05-30 22:32:10 +00:00
Ok(ConnectTarget::Unix(PathBuf::from(maybe_path)))
2015-02-26 17:02:32 +00:00
}
#[cfg(not(feature = "unix_socket"))]
2015-06-16 05:20:09 +00:00
fn make_unix(_: String) -> result::Result<ConnectTarget, Box<StdError+StdSync+Send>> {
Err("unix socket support requires the `unix_socket` feature".into())
2015-02-26 17:02:32 +00:00
}
2015-06-16 05:20:09 +00:00
let maybe_path = try!(url::decode_component(&host));
2014-12-03 05:44:34 +00:00
let target = if maybe_path.starts_with("/") {
2015-02-26 17:02:32 +00:00
try!(make_unix(maybe_path))
} else {
ConnectTarget::Tcp(host)
};
2014-11-02 18:38:45 +00:00
let user = user.map(|url::UserInfo { user, pass }| {
UserInfo { user: user, password: pass }
});
2015-03-29 22:23:38 +00:00
let database = if path.is_empty() {
None
} else {
// path contains the leading /
path.remove(0);
Some(path)
};
2014-11-01 23:15:30 +00:00
Ok(ConnectParams {
target: target,
port: port,
user: user,
database: database,
options: options,
})
}
}
2013-09-30 02:12:20 +00:00
/// Trait for types that can handle Postgres notice messages
2015-02-22 23:05:19 +00:00
pub trait HandleNotice: Send {
2013-09-30 02:12:20 +00:00
/// Handle a Postgres notice message
2015-02-22 23:05:19 +00:00
fn handle_notice(&mut self, notice: DbError);
}
2013-09-30 05:22:10 +00:00
/// A notice handler which logs at the `info` level.
2013-09-30 02:12:20 +00:00
///
2014-11-01 23:21:47 +00:00
/// This is the default handler used by a `Connection`.
2015-04-03 15:46:02 +00:00
#[derive(Copy, Clone, Debug)]
2015-02-22 23:05:19 +00:00
pub struct LoggingNoticeHandler;
2015-02-22 23:05:19 +00:00
impl HandleNotice for LoggingNoticeHandler {
fn handle_notice(&mut self, notice: DbError) {
2015-02-06 06:24:57 +00:00
info!("{}: {}", notice.severity(), notice.message());
}
}
2015-05-02 22:55:53 +00:00
/// An asynchronous notification.
2015-01-24 07:31:17 +00:00
#[derive(Clone, Debug)]
2014-11-01 23:18:09 +00:00
pub struct Notification {
2015-05-02 22:55:53 +00:00
/// The process ID of the notifying backend process.
2014-08-16 03:14:46 +00:00
pub pid: u32,
2015-05-02 22:55:53 +00:00
/// The name of the channel that the notify has been raised on.
2014-05-26 03:38:40 +00:00
pub channel: String,
2015-05-02 22:55:53 +00:00
/// The "payload" string passed from the notifying process.
2014-05-26 03:38:40 +00:00
pub payload: String,
}
2015-05-02 22:55:53 +00:00
/// An iterator over asynchronous notifications.
2014-11-01 23:18:09 +00:00
pub struct Notifications<'conn> {
2014-11-01 23:21:47 +00:00
conn: &'conn Connection
}
2015-01-23 18:44:15 +00:00
impl<'a> fmt::Debug for Notifications<'a> {
2015-01-10 04:48:47 +00:00
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
2015-03-31 04:03:31 +00:00
DebugStruct::new(fmt, "Notifications")
2015-03-14 18:11:44 +00:00
.field("pending", &self.conn.conn.borrow().notifications.len())
.finish()
2015-01-10 04:48:47 +00:00
}
}
2015-01-04 16:08:53 +00:00
impl<'conn> Iterator for Notifications<'conn> {
type Item = Notification;
/// Returns the oldest pending notification or `None` if there are none.
///
2014-07-30 02:48:56 +00:00
/// ## Note
///
/// `next` may return `Some` notification after returning `None` if a new
/// notification was received.
2014-11-01 23:18:09 +00:00
fn next(&mut self) -> Option<Notification> {
2014-03-23 04:20:22 +00:00
self.conn.conn.borrow_mut().notifications.pop_front()
}
}
impl<'conn> Notifications<'conn> {
/// Returns the oldest pending notification.
///
/// If no notifications are pending, blocks until one arrives.
pub fn next_block(&mut self) -> Result<Notification> {
if let Some(notification) = self.next() {
return Ok(notification);
}
let mut conn = self.conn.conn.borrow_mut();
check_desync!(conn);
match try!(conn.read_message_with_notification()) {
NotificationResponse { pid, channel, payload } => {
Ok(Notification {
pid: pid,
channel: channel,
payload: payload
})
}
_ => unreachable!()
}
}
}
2015-05-02 22:55:53 +00:00
/// Contains information necessary to cancel queries for a session.
2015-01-24 07:31:17 +00:00
#[derive(Copy, Clone, Debug)]
2014-11-01 23:19:02 +00:00
pub struct CancelData {
2015-05-02 22:55:53 +00:00
/// The process ID of the session.
2014-08-16 03:14:46 +00:00
pub process_id: u32,
2015-05-02 22:55:53 +00:00
/// The secret key for the session.
2014-08-16 03:14:46 +00:00
pub secret_key: u32,
2013-10-21 00:32:14 +00:00
}
/// Attempts to cancel an in-progress query.
///
/// The backend provides no information about whether a cancellation attempt
/// was successful or not. An error will only be returned if the driver was
/// unable to connect to the database.
///
2014-11-03 00:48:38 +00:00
/// A `CancelData` object can be created via `Connection::cancel_data`. The
/// object can cancel any query made on that connection.
2014-03-09 22:22:20 +00:00
///
2014-08-21 05:40:11 +00:00
/// Only the host and port of the connection info are used. See
2014-11-01 23:21:47 +00:00
/// `Connection::connect` for details of the `params` argument.
2014-03-31 02:21:51 +00:00
///
2014-07-30 02:48:56 +00:00
/// ## Example
2014-03-09 22:22:20 +00:00
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-11-17 21:46:33 +00:00
/// # use postgres::{Connection, SslMode};
2015-03-26 02:11:40 +00:00
/// # use std::thread;
2014-03-09 22:22:20 +00:00
/// # let url = "";
2014-11-17 21:46:33 +00:00
/// let conn = Connection::connect(url, &SslMode::None).unwrap();
2014-03-09 22:22:20 +00:00
/// let cancel_data = conn.cancel_data();
2015-03-26 02:11:40 +00:00
/// thread::spawn(move || {
/// conn.execute("SOME EXPENSIVE QUERY", &[]).unwrap();
2015-01-09 19:20:43 +00:00
/// });
2014-03-09 22:22:20 +00:00
/// # let _ =
2014-11-17 21:46:33 +00:00
/// postgres::cancel_query(url, &SslMode::None, cancel_data);
2014-03-09 22:22:20 +00:00
/// ```
2014-11-01 23:19:02 +00:00
pub fn cancel_query<T>(params: T, ssl: &SslMode, data: CancelData)
-> result::Result<(), ConnectError>
where T: IntoConnectParams {
2015-06-16 05:20:09 +00:00
let params = try!(params.into_connect_params().map_err(ConnectError::BadConnectParams));
2015-04-28 05:26:20 +00:00
let mut socket = try!(priv_io::initialize_stream(&params, ssl));
2013-10-21 00:32:14 +00:00
2014-11-17 06:54:57 +00:00
try!(socket.write_message(&CancelRequest {
2013-10-21 00:32:14 +00:00
code: message::CANCEL_CODE,
process_id: data.process_id,
secret_key: data.secret_key
2014-02-07 06:59:33 +00:00
}));
2014-11-17 06:54:57 +00:00
try!(socket.flush());
2013-10-21 00:32:14 +00:00
2013-12-06 07:20:50 +00:00
Ok(())
2013-10-21 00:32:14 +00:00
}
fn bad_response() -> std_io::Error {
std_io::Error::new(std_io::ErrorKind::InvalidInput,
"the server returned an unexpected response")
}
2015-05-26 06:27:12 +00:00
fn desynchronized() -> std_io::Error {
std_io::Error::new(std_io::ErrorKind::Other,
"communication with the server has desynchronized due to an earlier IO error")
}
/// An enumeration of transaction isolation levels.
///
2015-04-23 05:20:21 +00:00
/// See the [Postgres documentation](http://www.postgresql.org/docs/9.4/static/transaction-iso.html)
/// for full details on the semantics of each level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
/// The "read uncommitted" level.
///
/// In current versions of Postgres, this behaves identically to
/// `ReadCommitted`.
ReadUncommitted,
/// The "read committed" level.
///
/// This is the default isolation level in Postgres.
ReadCommitted,
/// The "repeatable read" level.
RepeatableRead,
/// The "serializable" level.
Serializable,
}
impl IsolationLevel {
fn to_set_query(&self) -> &'static str {
match *self {
IsolationLevel::ReadUncommitted => {
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"
}
IsolationLevel::ReadCommitted => {
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"
}
IsolationLevel::RepeatableRead => {
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ"
}
IsolationLevel::Serializable => {
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE"
}
}
}
fn parse(raw: &str) -> Result<IsolationLevel> {
if raw.eq_ignore_ascii_case("READ UNCOMMITTED") {
Ok(IsolationLevel::ReadUncommitted)
} else if raw.eq_ignore_ascii_case("READ COMMITTED") {
Ok(IsolationLevel::ReadCommitted)
} else if raw.eq_ignore_ascii_case("REPEATABLE READ") {
Ok(IsolationLevel::RepeatableRead)
} else if raw.eq_ignore_ascii_case("SERIALIZABLE") {
Ok(IsolationLevel::Serializable)
} else {
Err(Error::IoError(bad_response()))
}
}
}
2015-04-28 05:26:20 +00:00
/// Specifies the SSL support requested for a new connection.
pub enum SslMode {
2015-04-28 05:26:20 +00:00
/// The connection will not use SSL.
None,
/// The connection will use SSL if the backend supports it.
2015-05-30 19:38:42 +00:00
Prefer(Box<NegotiateSsl+std::marker::Sync+Send>),
2015-04-28 05:26:20 +00:00
/// The connection must use SSL.
2015-05-30 19:38:42 +00:00
Require(Box<NegotiateSsl+std::marker::Sync+Send>),
2015-04-28 05:26:20 +00:00
}
2015-05-30 19:42:01 +00:00
impl fmt::Debug for SslMode {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
SslMode::None => fmt.write_str("None"),
SslMode::Prefer(..) => fmt.write_str("Prefer"),
SslMode::Require(..) => fmt.write_str("Require"),
}
}
}
#[derive(Clone)]
struct CachedStatement {
name: String,
param_types: Vec<Type>,
columns: Vec<Column>,
}
2014-11-01 23:21:47 +00:00
struct InnerConnection {
stream: BufStream<Box<StreamWrapper>>,
2015-02-22 23:05:19 +00:00
notice_handler: Box<HandleNotice>,
2015-02-20 21:56:03 +00:00
notifications: VecDeque<Notification>,
2014-11-01 23:19:02 +00:00
cancel_data: CancelData,
2015-01-22 06:11:43 +00:00
unknown_types: HashMap<Oid, Type>,
cached_statements: HashMap<String, CachedStatement>,
2015-02-16 04:22:56 +00:00
parameters: HashMap<String, String>,
2015-01-29 04:35:17 +00:00
next_stmt_id: u32,
trans_depth: u32,
2015-01-29 04:35:17 +00:00
desynchronized: bool,
finished: bool,
2013-08-29 06:19:53 +00:00
}
2014-11-01 23:21:47 +00:00
impl Drop for InnerConnection {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
if !self.finished {
let _ = self.finish_inner();
}
2013-08-29 06:19:53 +00:00
}
}
2014-11-01 23:21:47 +00:00
impl InnerConnection {
2014-11-18 02:20:48 +00:00
fn connect<T>(params: T, ssl: &SslMode) -> result::Result<InnerConnection, ConnectError>
2014-08-17 20:57:58 +00:00
where T: IntoConnectParams {
2015-06-16 05:20:09 +00:00
let params = try!(params.into_connect_params().map_err(ConnectError::BadConnectParams));
2015-04-28 05:26:20 +00:00
let stream = try!(priv_io::initialize_stream(&params, ssl));
2014-12-02 05:28:58 +00:00
let ConnectParams { user, database, mut options, .. } = params;
2014-11-17 21:46:33 +00:00
let user = try!(user.ok_or(ConnectError::MissingUser));
2014-11-01 23:21:47 +00:00
let mut conn = InnerConnection {
2015-02-26 17:02:32 +00:00
stream: BufStream::new(stream),
2014-05-28 04:07:58 +00:00
next_stmt_id: 0,
2015-02-22 23:05:19 +00:00
notice_handler: Box::new(LoggingNoticeHandler),
2015-02-20 21:56:03 +00:00
notifications: VecDeque::new(),
2014-11-01 23:19:02 +00:00
cancel_data: CancelData { process_id: 0, secret_key: 0 },
2014-05-28 04:07:58 +00:00
unknown_types: HashMap::new(),
cached_statements: HashMap::new(),
2015-02-16 04:22:56 +00:00
parameters: HashMap::new(),
2014-05-28 04:07:58 +00:00
desynchronized: false,
finished: false,
trans_depth: 0,
};
2014-12-23 17:10:16 +00:00
options.push(("client_encoding".to_owned(), "UTF8".to_owned()));
2013-09-09 04:33:41 +00:00
// Postgres uses the value of TimeZone as the time zone for TIMESTAMP
// WITH TIME ZONE values. Timespec converts to GMT internally.
options.push(("timezone".to_owned(), "GMT".to_owned()));
// We have to clone here since we need the user again for auth
2014-12-23 17:10:16 +00:00
options.push(("user".to_owned(), user.user.clone()));
2014-12-02 05:28:58 +00:00
if let Some(database) = database {
2014-12-23 17:10:16 +00:00
options.push(("database".to_owned(), database));
}
2014-11-19 17:58:30 +00:00
try!(conn.write_messages(&[StartupMessage {
2013-09-09 05:40:08 +00:00
version: message::PROTOCOL_VERSION,
2015-02-03 07:41:03 +00:00
parameters: &options
}]));
try!(conn.handle_auth(user));
2013-08-04 02:17:32 +00:00
2013-08-22 05:52:15 +00:00
loop {
2014-11-17 06:54:57 +00:00
match try!(conn.read_message()) {
2013-10-21 00:32:14 +00:00
BackendKeyData { process_id, secret_key } => {
conn.cancel_data.process_id = process_id;
conn.cancel_data.secret_key = secret_key;
}
ReadyForQuery { .. } => break,
ErrorResponse { fields } => return DbError::new_connect(fields),
_ => return Err(ConnectError::IoError(bad_response())),
}
2013-08-05 00:48:48 +00:00
}
2013-08-23 05:24:14 +00:00
try!(conn.setup_typeinfo_query());
Ok(conn)
}
fn setup_typeinfo_query(&mut self) -> result::Result<(), ConnectError> {
match self.raw_prepare(TYPEINFO_QUERY,
"SELECT t.typname, t.typelem, r.rngsubtype \
FROM pg_catalog.pg_type t \
LEFT OUTER JOIN pg_catalog.pg_range r \
ON r.rngtypid = t.oid \
WHERE t.oid = $1") {
Ok(..) => return Ok(()),
Err(Error::IoError(e)) => return Err(ConnectError::IoError(e)),
// Range types weren't added until Postgres 9.2, so pg_range may not exist
2015-02-06 06:24:57 +00:00
Err(Error::DbError(ref e)) if e.code() == &SqlState::UndefinedTable => {}
Err(Error::DbError(e)) => return Err(ConnectError::DbError(e)),
_ => unreachable!()
}
match self.raw_prepare(TYPEINFO_QUERY,
2015-04-08 06:57:22 +00:00
"SELECT typname, typelem, NULL::OID \
FROM pg_catalog.pg_type \
WHERE oid = $1") {
Ok(..) => Ok(()),
Err(Error::IoError(e)) => Err(ConnectError::IoError(e)),
Err(Error::DbError(e)) => Err(ConnectError::DbError(e)),
_ => unreachable!()
}
2013-08-05 00:48:48 +00:00
}
2015-04-28 05:26:20 +00:00
fn write_messages(&mut self, messages: &[FrontendMessage]) -> std_io::Result<()> {
2014-07-26 03:15:24 +00:00
debug_assert!(!self.desynchronized);
2015-02-03 17:06:52 +00:00
for message in messages {
2014-07-10 19:35:57 +00:00
try_desync!(self, self.stream.write_message(message));
2013-07-25 07:10:18 +00:00
}
2014-07-10 19:35:57 +00:00
Ok(try_desync!(self, self.stream.flush()))
2013-08-22 06:41:26 +00:00
}
2013-08-04 02:17:32 +00:00
2015-04-28 05:26:20 +00:00
fn read_message_with_notification(&mut self) -> std_io::Result<BackendMessage> {
debug_assert!(!self.desynchronized);
loop {
match try_desync!(self, self.stream.read_message()) {
NoticeResponse { fields } => {
if let Ok(err) = DbError::new_raw(fields) {
self.notice_handler.handle_notice(err);
}
}
ParameterStatus { parameter, value } => {
self.parameters.insert(parameter, value);
}
val => return Ok(val)
}
}
}
2015-04-28 05:26:20 +00:00
fn read_message(&mut self) -> std_io::Result<BackendMessage> {
loop {
match try!(self.read_message_with_notification()) {
2014-08-16 02:50:11 +00:00
NotificationResponse { pid, channel, payload } => {
2014-11-07 16:54:10 +00:00
self.notifications.push_back(Notification {
pid: pid,
channel: channel,
payload: payload
2014-08-16 02:50:11 +00:00
})
}
val => return Ok(val)
}
2013-08-04 05:21:16 +00:00
}
2013-08-22 06:41:26 +00:00
}
2014-11-04 06:24:11 +00:00
fn handle_auth(&mut self, user: UserInfo) -> result::Result<(), ConnectError> {
2014-11-17 06:54:57 +00:00
match try!(self.read_message()) {
AuthenticationOk => return Ok(()),
AuthenticationCleartextPassword => {
2014-11-17 21:46:33 +00:00
let pass = try!(user.password.ok_or(ConnectError::MissingPassword));
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[PasswordMessage {
2015-02-03 07:41:03 +00:00
password: &pass,
}]));
}
AuthenticationMD5Password { salt } => {
2014-11-17 21:46:33 +00:00
let pass = try!(user.password.ok_or(ConnectError::MissingPassword));
2015-04-28 05:52:06 +00:00
let mut hasher = Md5::new();
let _ = hasher.input(pass.as_bytes());
let _ = hasher.input(user.user.as_bytes());
let output = hasher.result_str();
hasher.reset();
let _ = hasher.input(output.as_bytes());
let _ = hasher.input(&salt);
let output = format!("md5{}", hasher.result_str());
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[PasswordMessage {
2015-02-03 07:41:03 +00:00
password: &output
}]));
}
AuthenticationKerberosV5
2013-09-12 05:02:32 +00:00
| AuthenticationSCMCredential
| AuthenticationGSS
2014-11-17 21:46:33 +00:00
| AuthenticationSSPI => return Err(ConnectError::UnsupportedAuthentication),
ErrorResponse { fields } => return DbError::new_connect(fields),
_ => return Err(ConnectError::IoError(bad_response()))
}
2014-11-17 06:54:57 +00:00
match try!(self.read_message()) {
AuthenticationOk => Ok(()),
ErrorResponse { fields } => return DbError::new_connect(fields),
_ => return Err(ConnectError::IoError(bad_response()))
}
}
2015-02-22 23:05:19 +00:00
fn set_notice_handler(&mut self, handler: Box<HandleNotice>) -> Box<HandleNotice> {
2014-02-11 03:20:53 +00:00
mem::replace(&mut self.notice_handler, handler)
}
2015-05-18 04:04:44 +00:00
fn raw_prepare(&mut self, stmt_name: &str, query: &str) -> Result<(Vec<Type>, Vec<Column>)> {
2015-03-31 03:18:47 +00:00
debug!("preparing query with name `{}`: {}", stmt_name, query);
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[
2013-11-10 04:58:38 +00:00
Parse {
2014-12-03 06:56:56 +00:00
name: stmt_name,
2013-09-02 17:27:09 +00:00
query: query,
2014-11-19 17:58:30 +00:00
param_types: &[]
2013-09-02 17:27:09 +00:00
},
2013-11-10 04:58:38 +00:00
Describe {
2014-09-06 23:33:43 +00:00
variant: b'S',
2014-12-03 06:56:56 +00:00
name: stmt_name,
2013-09-02 17:27:09 +00:00
},
Sync]));
2013-08-22 06:41:26 +00:00
2014-11-17 06:54:57 +00:00
match try!(self.read_message()) {
2013-10-25 05:30:34 +00:00
ParseComplete => {}
ErrorResponse { fields } => {
2014-02-22 07:18:39 +00:00
try!(self.wait_for_ready());
return DbError::new(fields);
}
2014-07-10 19:35:57 +00:00
_ => bad_response!(self),
}
2013-08-22 06:41:26 +00:00
2015-01-22 06:11:43 +00:00
let raw_param_types = match try!(self.read_message()) {
ParameterDescription { types } => types,
2014-07-10 19:35:57 +00:00
_ => bad_response!(self),
};
2013-08-22 06:41:26 +00:00
let raw_columns = match try!(self.read_message()) {
2015-01-22 06:11:43 +00:00
RowDescription { descriptions } => descriptions,
2014-05-26 18:41:18 +00:00
NoData => vec![],
2014-07-10 19:35:57 +00:00
_ => bad_response!(self)
};
2013-08-17 22:09:26 +00:00
2014-02-22 07:18:39 +00:00
try!(self.wait_for_ready());
2013-08-22 06:41:26 +00:00
2015-01-22 06:11:43 +00:00
let mut param_types = vec![];
2015-02-03 17:06:52 +00:00
for oid in raw_param_types {
2015-01-22 06:11:43 +00:00
param_types.push(try!(self.get_type(oid)));
}
let mut columns = vec![];
for RowDescriptionEntry { name, type_oid, .. } in raw_columns {
2015-08-16 04:39:46 +00:00
columns.push(Column::new(name, try!(self.get_type(type_oid))));
}
2013-12-04 08:18:28 +00:00
Ok((param_types, columns))
2014-12-02 05:28:58 +00:00
}
fn make_stmt_name(&mut self) -> String {
let stmt_name = format!("s{}", self.next_stmt_id);
self.next_stmt_id += 1;
stmt_name
2014-09-30 05:56:43 +00:00
}
2014-11-03 00:48:38 +00:00
fn prepare<'a>(&mut self, query: &str, conn: &'a Connection) -> Result<Statement<'a>> {
2014-12-02 05:28:58 +00:00
let stmt_name = self.make_stmt_name();
let (param_types, columns) = try!(self.raw_prepare(&stmt_name, query));
2015-08-16 04:39:46 +00:00
Ok(Statement::new(conn, stmt_name, param_types, columns, Cell::new(0), false))
2013-08-22 06:41:26 +00:00
}
fn prepare_cached<'a>(&mut self, query: &str, conn: &'a Connection) -> Result<Statement<'a>> {
2015-02-23 00:46:11 +00:00
let stmt = self.cached_statements.get(query).cloned();
let CachedStatement { name, param_types, columns } = match stmt {
Some(stmt) => stmt,
None => {
let stmt_name = self.make_stmt_name();
let (param_types, columns) = try!(self.raw_prepare(&stmt_name, query));
let stmt = CachedStatement {
name: stmt_name,
param_types: param_types,
columns: columns,
};
self.cached_statements.insert(query.to_owned(), stmt.clone());
stmt
}
};
2015-08-16 04:39:46 +00:00
Ok(Statement::new(conn, name, param_types, columns, Cell::new(0), true))
}
fn close_statement(&mut self, name: &str, type_: u8) -> Result<()> {
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[
2014-09-30 05:56:43 +00:00
Close {
variant: type_,
name: name,
2014-09-30 05:56:43 +00:00
},
Sync]));
let resp = match try!(self.read_message()) {
CloseComplete => Ok(()),
ErrorResponse { fields } => DbError::new(fields),
2014-11-29 01:35:37 +00:00
_ => bad_response!(self)
};
try!(self.wait_for_ready());
resp
2014-09-30 05:56:43 +00:00
}
2015-01-22 06:11:43 +00:00
fn get_type(&mut self, oid: Oid) -> Result<Type> {
2015-05-27 04:47:42 +00:00
if let Some(ty) = Type::new(oid) {
2015-01-22 06:11:43 +00:00
return Ok(ty);
2014-03-30 23:19:04 +00:00
}
2014-08-16 02:50:11 +00:00
2015-01-22 06:11:43 +00:00
if let Some(ty) = self.unknown_types.get(&oid) {
return Ok(ty.clone());
2013-12-04 08:18:28 +00:00
}
2015-01-22 06:11:43 +00:00
// Ew @ doing this manually :(
2015-02-15 23:11:15 +00:00
let mut buf = vec![];
let value = match try!(oid.to_sql_checked(&Type::Oid, &mut buf, &SessionInfo::new(self))) {
2015-02-15 23:11:15 +00:00
IsNull::Yes => None,
IsNull::No => Some(buf),
};
try!(self.write_messages(&[
Bind {
portal: "",
statement: TYPEINFO_QUERY,
formats: &[1],
2015-02-15 23:11:15 +00:00
values: &[value],
result_formats: &[1]
},
Execute {
portal: "",
max_rows: 0,
},
Sync]));
match try!(self.read_message()) {
BindComplete => {}
ErrorResponse { fields } => {
try!(self.wait_for_ready());
return DbError::new(fields);
}
_ => bad_response!(self)
}
let (name, elem_oid, rngsubtype): (String, Oid, Option<Oid>) =
match try!(self.read_message()) {
2015-01-22 06:11:43 +00:00
DataRow { row } => {
let ctx = SessionInfo::new(self);
(try!(FromSql::from_sql_nullable(&Type::Name,
row[0].as_ref().map(|r| &**r).as_mut(),
&ctx)),
try!(FromSql::from_sql_nullable(&Type::Oid,
row[1].as_ref().map(|r| &**r).as_mut(),
&ctx)),
try!(FromSql::from_sql_nullable(&Type::Oid,
row[2].as_ref().map(|r| &**r).as_mut(),
&ctx)))
2015-01-22 06:11:43 +00:00
}
ErrorResponse { fields } => {
try!(self.wait_for_ready());
return DbError::new(fields);
}
_ => bad_response!(self)
};
match try!(self.read_message()) {
CommandComplete { .. } => {}
ErrorResponse { fields } => {
try!(self.wait_for_ready());
return DbError::new(fields);
}
_ => bad_response!(self)
}
try!(self.wait_for_ready());
2015-01-22 06:11:43 +00:00
2015-02-14 20:46:18 +00:00
let kind = if elem_oid != 0 {
Kind::Array(try!(self.get_type(elem_oid)))
2015-01-22 06:11:43 +00:00
} else {
2015-02-14 20:46:18 +00:00
match rngsubtype {
Some(oid) => Kind::Range(try!(self.get_type(oid))),
None => Kind::Simple
}
2015-01-22 06:11:43 +00:00
};
2015-02-14 20:46:18 +00:00
let type_ = Type::Other(Box::new(Other::new(name, oid, kind)));
2015-01-22 06:11:43 +00:00
self.unknown_types.insert(oid, type_.clone());
Ok(type_)
2013-12-04 08:18:28 +00:00
}
2014-03-30 23:19:04 +00:00
fn is_desynchronized(&self) -> bool {
self.desynchronized
}
2014-11-01 23:12:05 +00:00
fn wait_for_ready(&mut self) -> Result<()> {
2014-11-17 06:54:57 +00:00
match try!(self.read_message()) {
ReadyForQuery { .. } => Ok(()),
2014-07-10 19:35:57 +00:00
_ => bad_response!(self)
}
}
2013-12-04 07:48:46 +00:00
2014-11-01 23:12:05 +00:00
fn quick_query(&mut self, query: &str) -> Result<Vec<Vec<Option<String>>>> {
check_desync!(self);
2015-03-31 03:18:47 +00:00
debug!("executing query: {}", query);
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[Query { query: query }]));
2013-12-04 07:48:46 +00:00
2014-05-26 18:41:18 +00:00
let mut result = vec![];
2013-12-04 07:48:46 +00:00
loop {
2014-11-17 06:54:57 +00:00
match try!(self.read_message()) {
2013-12-04 07:48:46 +00:00
ReadyForQuery { .. } => break,
2014-04-26 21:46:38 +00:00
DataRow { row } => {
2014-09-18 05:57:23 +00:00
result.push(row.into_iter().map(|opt| {
2015-02-03 07:41:03 +00:00
opt.map(|b| String::from_utf8_lossy(&b).into_owned())
2014-05-16 02:27:19 +00:00
}).collect());
2014-04-26 21:46:38 +00:00
}
2014-10-06 00:31:25 +00:00
CopyInResponse { .. } => {
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[
2014-10-06 00:31:25 +00:00
CopyFail {
message: "COPY queries cannot be directly executed",
},
Sync]));
}
ErrorResponse { fields } => {
2014-02-22 07:18:39 +00:00
try!(self.wait_for_ready());
return DbError::new(fields);
}
2013-12-04 07:48:46 +00:00
_ => {}
}
}
Ok(result)
2013-12-04 07:48:46 +00:00
}
2014-11-01 23:12:05 +00:00
fn finish_inner(&mut self) -> Result<()> {
check_desync!(self);
2014-11-19 17:58:30 +00:00
try!(self.write_messages(&[Terminate]));
2014-04-26 06:14:55 +00:00
Ok(())
}
}
2015-07-22 14:41:40 +00:00
fn _ensure_send() {
fn _is_send<T: Send>() {}
_is_send::<Connection>();
}
2013-09-30 02:12:20 +00:00
/// A connection to a Postgres database.
2014-11-01 23:21:47 +00:00
pub struct Connection {
conn: RefCell<InnerConnection>
}
2015-01-23 18:44:15 +00:00
impl fmt::Debug for Connection {
2015-01-10 04:48:47 +00:00
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let conn = self.conn.borrow();
2015-03-31 04:03:31 +00:00
DebugStruct::new(fmt, "Connection")
2015-03-14 18:11:44 +00:00
.field("cancel_data", &conn.cancel_data)
.field("notifications", &conn.notifications.len())
.field("transaction_depth", &conn.trans_depth)
.field("desynchronized", &conn.desynchronized)
.field("cached_statements", &conn.cached_statements.len())
.finish()
2015-01-10 04:48:47 +00:00
}
}
2014-11-01 23:21:47 +00:00
impl Connection {
/// Creates a new connection to a Postgres database.
2013-09-30 02:12:20 +00:00
///
/// Most applications can use a URL string in the normal format:
2013-09-30 02:12:20 +00:00
///
2014-03-09 06:01:24 +00:00
/// ```notrust
/// postgresql://user[:password]@host[:port][/database][?param1=val1[[&param2=val2]...]]
2013-09-30 02:12:20 +00:00
/// ```
///
/// The password may be omitted if not required. The default Postgres port
/// (5432) is used if none is specified. The database name defaults to the
/// username if not specified.
2014-03-09 22:22:20 +00:00
///
2015-05-27 05:07:13 +00:00
/// Connection via Unix sockets is supported with the `unix_socket`
2015-02-26 17:02:32 +00:00
/// feature. To connect to the server via Unix sockets, `host` should be
/// set to the absolute path of the directory containing the socket file.
/// Since `/` is a reserved character in URLs, the path should be URL
/// encoded. If the path contains non-UTF 8 characters, a `ConnectParams`
/// struct should be created manually and passed in. Note that Postgres
/// does not support SSL over Unix sockets.
///
2014-07-30 02:48:56 +00:00
/// ## Examples
2014-03-09 22:22:20 +00:00
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2015-05-26 05:47:25 +00:00
/// # use postgres::{Connection, SslMode};
/// # fn f() -> Result<(), ::postgres::error::ConnectError> {
/// let url = "postgresql://postgres:hunter2@localhost:2994/foodb";
2014-11-17 21:46:33 +00:00
/// let conn = try!(Connection::connect(url, &SslMode::None));
2014-08-20 04:49:27 +00:00
/// # Ok(()) };
2014-03-09 22:22:20 +00:00
/// ```
///
2014-04-19 18:10:27 +00:00
/// ```rust,no_run
2015-05-26 05:47:25 +00:00
/// # use postgres::{Connection, SslMode};
/// # fn f() -> Result<(), ::postgres::error::ConnectError> {
/// let url = "postgresql://postgres@%2Frun%2Fpostgres";
2014-11-17 21:46:33 +00:00
/// let conn = try!(Connection::connect(url, &SslMode::None));
2014-08-20 04:49:27 +00:00
/// # Ok(()) };
/// ```
///
/// ```rust,no_run
2015-05-26 05:47:25 +00:00
/// # use postgres::{Connection, UserInfo, ConnectParams, SslMode, ConnectTarget};
2015-02-26 17:02:32 +00:00
/// # #[cfg(feature = "unix_socket")]
2015-05-26 05:47:25 +00:00
/// # fn f() -> Result<(), ::postgres::error::ConnectError> {
/// # let some_crazy_path = Path::new("");
2014-11-01 23:15:30 +00:00
/// let params = ConnectParams {
/// target: ConnectTarget::Unix(some_crazy_path),
/// port: None,
2014-11-01 23:14:08 +00:00
/// user: Some(UserInfo {
2014-12-23 17:10:16 +00:00
/// user: "postgres".to_string(),
/// password: None
/// }),
/// database: None,
2014-05-26 18:41:18 +00:00
/// options: vec![],
/// };
2014-11-17 21:46:33 +00:00
/// let conn = try!(Connection::connect(params, &SslMode::None));
2014-08-20 04:49:27 +00:00
/// # Ok(()) };
/// ```
2014-11-04 06:24:11 +00:00
pub fn connect<T>(params: T, ssl: &SslMode) -> result::Result<Connection, ConnectError>
2014-08-17 20:57:58 +00:00
where T: IntoConnectParams {
2014-11-01 23:21:47 +00:00
InnerConnection::connect(params, ssl).map(|conn| {
Connection { conn: RefCell::new(conn) }
})
}
2013-09-30 02:12:20 +00:00
/// Sets the notice handler for the connection, returning the old handler.
2015-02-22 23:05:19 +00:00
pub fn set_notice_handler(&self, handler: Box<HandleNotice>) -> Box<HandleNotice> {
2014-03-19 04:00:06 +00:00
self.conn.borrow_mut().set_notice_handler(handler)
}
/// Returns an iterator over asynchronous notification messages.
///
/// Use the `LISTEN` command to register this connection for notifications.
2014-11-01 23:18:09 +00:00
pub fn notifications<'a>(&'a self) -> Notifications<'a> {
Notifications { conn: self }
}
/// Creates a new prepared statement.
2013-09-30 02:12:20 +00:00
///
/// A statement may contain parameters, specified by `$n` where `n` is the
/// index of the parameter in the list provided at execution time,
/// 1-indexed.
///
/// The statement is associated with the connection that created it and may
/// not outlive that connection.
2014-03-09 22:22:20 +00:00
///
2014-07-30 02:48:56 +00:00
/// ## Example
2014-03-09 22:22:20 +00:00
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-11-17 21:46:33 +00:00
/// # use postgres::{Connection, SslMode};
/// # let conn = Connection::connect("", &SslMode::None).unwrap();
/// let maybe_stmt = conn.prepare("SELECT foo FROM bar WHERE baz = $1");
2014-03-09 22:22:20 +00:00
/// let stmt = match maybe_stmt {
/// Ok(stmt) => stmt,
2015-01-09 19:20:43 +00:00
/// Err(err) => panic!("Error preparing statement: {:?}", err)
2014-03-09 22:22:20 +00:00
/// };
2014-11-01 23:24:24 +00:00
pub fn prepare<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.conn.borrow_mut().prepare(query, self)
}
/// Creates cached prepared statement.
///
2015-01-23 06:55:18 +00:00
/// Like `prepare`, except that the statement is only prepared once over
/// the lifetime of the connection and then cached. If the same statement
/// is going to be used frequently, caching it can improve performance by
/// reducing the number of round trips to the Postgres backend.
///
/// ## Example
///
/// ```rust,no_run
/// # use postgres::{Connection, SslMode};
/// # fn f() -> postgres::Result<()> {
/// # let x = 10i32;
/// # let conn = Connection::connect("", &SslMode::None).unwrap();
/// let stmt = try!(conn.prepare_cached("SELECT foo FROM bar WHERE baz = $1"));
2015-02-20 08:04:14 +00:00
/// for row in try!(stmt.query(&[&x])) {
/// println!("foo: {}", row.get::<_, String>(0));
/// }
/// # Ok(()) };
/// ```
pub fn prepare_cached<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.conn.borrow_mut().prepare_cached(query, self)
}
/// Begins a new transaction.
2013-09-30 02:12:20 +00:00
///
2014-11-01 23:25:11 +00:00
/// Returns a `Transaction` object which should be used instead of
2013-10-14 01:58:31 +00:00
/// the connection for the duration of the transaction. The transaction
2014-11-01 23:25:11 +00:00
/// is active until the `Transaction` object falls out of scope.
2014-07-26 05:41:10 +00:00
///
2014-07-30 02:48:56 +00:00
/// ## Note
2014-10-06 03:49:09 +00:00
/// A transaction will roll back by default. The `set_commit`,
/// `set_rollback`, and `commit` methods alter this behavior.
2014-03-09 22:22:20 +00:00
///
/// ## Panics
///
/// Panics if a transaction is already active.
///
2014-07-30 02:48:56 +00:00
/// ## Example
2014-03-09 22:22:20 +00:00
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-11-17 21:46:33 +00:00
/// # use postgres::{Connection, SslMode};
2015-05-26 05:47:25 +00:00
/// # fn foo() -> Result<(), postgres::error::Error> {
2014-11-17 21:46:33 +00:00
/// # let conn = Connection::connect("", &SslMode::None).unwrap();
/// let trans = try!(conn.transaction());
/// try!(trans.execute("UPDATE foo SET bar = 10", &[]));
2014-07-26 05:41:10 +00:00
/// // ...
2014-03-09 22:22:20 +00:00
///
2014-10-06 03:47:48 +00:00
/// try!(trans.commit());
2014-03-09 22:22:20 +00:00
/// # Ok(())
/// # }
/// ```
2014-11-01 23:25:11 +00:00
pub fn transaction<'a>(&'a self) -> Result<Transaction<'a>> {
let mut conn = self.conn.borrow_mut();
check_desync!(conn);
2015-01-27 05:13:10 +00:00
assert!(conn.trans_depth == 0, "`transaction` must be called on the active transaction");
try!(conn.quick_query("BEGIN"));
conn.trans_depth += 1;
2014-11-01 23:25:11 +00:00
Ok(Transaction {
2013-08-27 05:40:23 +00:00
conn: self,
2014-07-26 05:41:10 +00:00
commit: Cell::new(false),
depth: 1,
finished: false,
})
}
/// Sets the isolation level which will be used for future transactions.
///
/// ## Note
///
/// This will not change the behavior of an active transaction.
pub fn set_transaction_isolation(&self, level: IsolationLevel) -> Result<()> {
self.batch_execute(level.to_set_query())
}
/// Returns the isolation level which will be used for future transactions.
pub fn transaction_isolation(&self) -> Result<IsolationLevel> {
let mut conn = self.conn.borrow_mut();
check_desync!(conn);
let result = try!(conn.quick_query("SHOW TRANSACTION ISOLATION LEVEL"));
IsolationLevel::parse(result[0][0].as_ref().unwrap())
}
/// A convenience function for queries that are only run once.
2013-09-30 02:12:20 +00:00
///
/// If an error is returned, it could have come from either the preparation
/// or execution of the statement.
///
/// On success, returns the number of rows modified or 0 if not applicable.
2015-02-01 18:49:52 +00:00
///
/// ## Panics
///
/// Panics if the number of parameters provided does not match the number
/// expected.
pub fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
let (param_types, columns) = try!(self.conn.borrow_mut().raw_prepare("", query));
2015-08-16 04:39:46 +00:00
let stmt = Statement::new(self, "".to_owned(), param_types, columns, Cell::new(0), true);
stmt.execute(params)
2013-09-01 18:06:33 +00:00
}
2014-06-08 23:07:15 +00:00
/// Execute a sequence of SQL statements.
///
/// Statements should be separated by `;` characters. If an error occurs,
/// execution of the sequence will stop at that point. This is intended for
/// execution of batches of non-dynamic statements - for example, creation
/// of a schema for a fresh database.
///
2014-07-30 02:48:56 +00:00
/// ## Warning
2014-06-08 23:07:15 +00:00
///
/// Prepared statements should be used for any SQL statement which contains
/// user-specified data, as it provides functionality to safely embed that
2015-01-24 18:46:01 +00:00
/// data in the statement. Do not form statements via string concatenation
2014-06-08 23:07:15 +00:00
/// and feed them into this method.
///
2014-07-30 02:48:56 +00:00
/// ## Example
2014-06-08 23:07:15 +00:00
///
2014-06-08 23:12:27 +00:00
/// ```rust,no_run
2014-11-01 23:21:47 +00:00
/// # use postgres::{Connection, Result};
/// fn init_db(conn: &Connection) -> Result<()> {
2014-07-26 00:56:43 +00:00
/// conn.batch_execute("
2014-06-08 23:07:15 +00:00
/// CREATE TABLE person (
/// id SERIAL PRIMARY KEY,
/// name NOT NULL
/// );
///
/// CREATE TABLE purchase (
/// id SERIAL PRIMARY KEY,
/// person INT NOT NULL REFERENCES person (id),
/// time TIMESTAMPTZ NOT NULL,
/// );
///
/// CREATE INDEX ON purchase (time);
2014-07-26 00:56:43 +00:00
/// ")
2014-06-08 23:07:15 +00:00
/// }
/// ```
2014-11-01 23:12:05 +00:00
pub fn batch_execute(&self, query: &str) -> Result<()> {
self.conn.borrow_mut().quick_query(query).map(|_| ())
2014-06-08 23:07:15 +00:00
}
2013-10-21 00:32:14 +00:00
/// Returns information used to cancel pending queries.
///
/// Used with the `cancel_query` function. The object returned can be used
/// to cancel any query executed by the connection it was created from.
2014-11-01 23:19:02 +00:00
pub fn cancel_data(&self) -> CancelData {
2014-03-19 04:00:06 +00:00
self.conn.borrow().cancel_data
2013-10-21 00:32:14 +00:00
}
/// Returns the value of the specified Postgres backend parameter, such as
/// `timezone` or `server_version`.
2015-02-16 04:22:56 +00:00
pub fn parameter(&self, param: &str) -> Option<String> {
self.conn.borrow().parameters.get(param).cloned()
}
/// Returns whether or not the stream has been desynchronized due to an
/// error in the communication channel with the server.
///
/// If this has occurred, all further queries will immediately return an
/// error.
pub fn is_desynchronized(&self) -> bool {
2014-03-19 04:00:06 +00:00
self.conn.borrow().is_desynchronized()
}
/// Determines if the `Connection` is currently "active", that is, if there
/// are no active transactions.
///
/// The `transaction` method can only be called on the active `Connection`
/// or `Transaction`.
pub fn is_active(&self) -> bool {
self.conn.borrow().trans_depth == 0
}
/// Consumes the connection, closing it.
///
2014-11-03 00:48:38 +00:00
/// Functionally equivalent to the `Drop` implementation for `Connection`
/// except that it returns any error encountered to the caller.
2014-11-01 23:12:05 +00:00
pub fn finish(self) -> Result<()> {
2014-03-19 04:00:06 +00:00
let mut conn = self.conn.borrow_mut();
conn.finished = true;
conn.finish_inner()
}
2013-08-17 22:09:26 +00:00
}
2014-05-19 02:46:21 +00:00
/// Represents a transaction on a database connection.
///
2014-07-26 05:41:10 +00:00
/// The transaction will roll back by default.
2014-11-01 23:25:11 +00:00
pub struct Transaction<'conn> {
2014-11-01 23:21:47 +00:00
conn: &'conn Connection,
depth: u32,
2015-01-29 05:08:20 +00:00
commit: Cell<bool>,
finished: bool,
}
2015-01-23 18:44:15 +00:00
impl<'a> fmt::Debug for Transaction<'a> {
2015-01-10 04:48:47 +00:00
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
2015-03-31 04:03:31 +00:00
DebugStruct::new(fmt, "Transaction")
2015-03-14 18:11:44 +00:00
.field("commit", &self.commit.get())
.field("depth", &self.depth)
.finish()
2015-01-10 04:48:47 +00:00
}
}
2014-11-01 23:25:11 +00:00
impl<'conn> Drop for Transaction<'conn> {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
if !self.finished {
let _ = self.finish_inner();
}
}
}
2014-11-01 23:25:11 +00:00
impl<'conn> Transaction<'conn> {
2014-11-01 23:12:05 +00:00
fn finish_inner(&mut self) -> Result<()> {
let mut conn = self.conn.conn.borrow_mut();
debug_assert!(self.depth == conn.trans_depth);
2014-07-26 05:41:10 +00:00
let query = match (self.commit.get(), self.depth != 1) {
(false, true) => "ROLLBACK TO sp",
(false, false) => "ROLLBACK",
(true, true) => "RELEASE sp",
(true, false) => "COMMIT",
2014-05-18 18:37:52 +00:00
};
conn.trans_depth -= 1;
conn.quick_query(query).map(|_| ())
}
2013-08-27 05:40:23 +00:00
2014-11-01 23:21:47 +00:00
/// Like `Connection::prepare`.
pub fn prepare(&self, query: &str) -> Result<Statement<'conn>> {
self.conn.prepare(query)
}
/// Like `Connection::prepare_cached`.
///
/// Note that the statement will be cached for the duration of the
/// connection, not just the duration of this transaction.
pub fn prepare_cached(&self, query: &str) -> Result<Statement<'conn>> {
self.conn.prepare_cached(query)
2013-09-01 18:06:33 +00:00
}
2014-11-01 23:21:47 +00:00
/// Like `Connection::execute`.
pub fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
self.conn.execute(query, params)
2013-09-30 02:12:20 +00:00
}
2014-11-01 23:21:47 +00:00
/// Like `Connection::batch_execute`.
2014-11-01 23:12:05 +00:00
pub fn batch_execute(&self, query: &str) -> Result<()> {
self.conn.batch_execute(query)
2014-06-08 23:07:15 +00:00
}
2014-11-01 23:21:47 +00:00
/// Like `Connection::transaction`.
///
/// ## Panics
///
/// Panics if there is an active nested transaction.
2014-11-01 23:25:11 +00:00
pub fn transaction<'a>(&'a self) -> Result<Transaction<'a>> {
let mut conn = self.conn.conn.borrow_mut();
check_desync!(conn);
2015-05-02 20:40:06 +00:00
assert!(conn.trans_depth == self.depth,
"`transaction` may only be called on the active transaction");
try!(conn.quick_query("SAVEPOINT sp"));
conn.trans_depth += 1;
2014-11-01 23:25:11 +00:00
Ok(Transaction {
2013-09-05 06:28:44 +00:00
conn: self.conn,
2014-07-26 05:41:10 +00:00
commit: Cell::new(false),
depth: self.depth + 1,
finished: false,
})
}
2015-02-16 04:22:56 +00:00
/// Returns a reference to the `Transaction`'s `Connection`.
pub fn connection(&self) -> &'conn Connection {
self.conn
}
/// Like `Connection::is_active`.
pub fn is_active(&self) -> bool {
self.conn.conn.borrow().trans_depth == self.depth
}
2013-09-30 02:12:20 +00:00
/// Determines if the transaction is currently set to commit or roll back.
2013-08-27 05:40:23 +00:00
pub fn will_commit(&self) -> bool {
self.commit.get()
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Sets the transaction to commit at its completion.
2013-08-27 05:40:23 +00:00
pub fn set_commit(&self) {
self.commit.set(true);
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Sets the transaction to roll back at its completion.
2013-08-27 05:40:23 +00:00
pub fn set_rollback(&self) {
self.commit.set(false);
2013-08-27 05:40:23 +00:00
}
2014-07-26 05:41:10 +00:00
/// A convenience method which consumes and commits a transaction.
2014-11-01 23:12:05 +00:00
pub fn commit(self) -> Result<()> {
2014-07-26 05:41:10 +00:00
self.set_commit();
self.finish()
}
/// Consumes the transaction, commiting or rolling it back as appropriate.
///
2014-12-03 06:56:56 +00:00
/// Functionally equivalent to the `Drop` implementation of `Transaction`
/// except that it returns any error to the caller.
2014-11-01 23:12:05 +00:00
pub fn finish(mut self) -> Result<()> {
self.finished = true;
self.finish_inner()
}
2013-08-27 05:40:23 +00:00
}
fn read_rows(conn: &mut InnerConnection, buf: &mut VecDeque<Vec<Option<Vec<u8>>>>) -> Result<bool> {
let more_rows;
loop {
match try!(conn.read_message()) {
EmptyQueryResponse | CommandComplete { .. } => {
more_rows = false;
break;
}
PortalSuspended => {
more_rows = true;
break;
}
DataRow { row } => buf.push_back(row),
ErrorResponse { fields } => {
try!(conn.wait_for_ready());
return DbError::new(fields);
}
CopyInResponse { .. } => {
try!(conn.write_messages(&[
CopyFail {
message: "COPY queries cannot be directly executed",
},
Sync]));
}
2015-08-16 03:39:13 +00:00
CopyOutResponse { .. } => {
loop {
match try!(conn.read_message()) {
ReadyForQuery { .. } => break,
_ => {}
}
}
return Err(Error::IoError(std_io::Error::new(
std_io::ErrorKind::InvalidInput,
"COPY queries cannot be directly executed")));
}
_ => {
conn.desynchronized = true;
return Err(Error::IoError(bad_response()));
}
}
}
try!(conn.wait_for_ready());
Ok(more_rows)
}
/// A trait allowing abstraction over connections and transactions
pub trait GenericConnection {
2014-11-01 23:21:47 +00:00
/// Like `Connection::prepare`.
2014-11-01 23:24:24 +00:00
fn prepare<'a>(&'a self, query: &str) -> Result<Statement<'a>>;
/// Like `Connection::prepare_cached`.
fn prepare_cached<'a>(&'a self, query: &str) -> Result<Statement<'a>>;
2014-11-01 23:21:47 +00:00
/// Like `Connection::execute`.
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64>;
2014-11-01 23:21:47 +00:00
/// Like `Connection::transaction`.
2014-11-01 23:25:11 +00:00
fn transaction<'a>(&'a self) -> Result<Transaction<'a>>;
2014-11-01 23:21:47 +00:00
/// Like `Connection::batch_execute`.
2014-11-01 23:12:05 +00:00
fn batch_execute(&self, query: &str) -> Result<()>;
/// Like `Connection::is_active`.
fn is_active(&self) -> bool;
}
2014-11-01 23:21:47 +00:00
impl GenericConnection for Connection {
2014-11-01 23:24:24 +00:00
fn prepare<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.prepare(query)
}
fn prepare_cached<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.prepare_cached(query)
}
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
self.execute(query, params)
}
2014-11-01 23:25:11 +00:00
fn transaction<'a>(&'a self) -> Result<Transaction<'a>> {
self.transaction()
}
2014-11-01 23:12:05 +00:00
fn batch_execute(&self, query: &str) -> Result<()> {
self.batch_execute(query)
}
fn is_active(&self) -> bool {
self.is_active()
}
}
2014-11-01 23:25:11 +00:00
impl<'a> GenericConnection for Transaction<'a> {
fn prepare<'b>(&'b self, query: &str) -> Result<Statement<'b>> {
self.prepare(query)
}
fn prepare_cached<'b>(&'b self, query: &str) -> Result<Statement<'b>> {
self.prepare_cached(query)
}
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
self.execute(query, params)
}
fn transaction<'b>(&'b self) -> Result<Transaction<'b>> {
self.transaction()
}
2014-11-01 23:12:05 +00:00
fn batch_execute(&self, query: &str) -> Result<()> {
self.batch_execute(query)
}
fn is_active(&self) -> bool {
self.is_active()
}
}
trait OtherNew {
fn new(name: String, oid: Oid, kind: Kind) -> Other;
}
trait DbErrorNew {
fn new_raw(fields: Vec<(u8, String)>) -> result::Result<DbError, ()>;
fn new_connect<T>(fields: Vec<(u8, String)>) -> result::Result<T, ConnectError>;
fn new<T>(fields: Vec<(u8, String)>) -> Result<T>;
}
2015-05-27 04:47:42 +00:00
trait TypeNew {
fn new(oid: Oid) -> Option<Type>;
}
2015-05-30 05:54:10 +00:00
trait RowsNew<'a> {
fn new(stmt: &'a Statement<'a>, data: Vec<Vec<Option<Vec<u8>>>>) -> Rows<'a>;
}
trait LazyRowsNew<'trans, 'stmt> {
fn new(stmt: &'stmt Statement<'stmt>,
data: VecDeque<Vec<Option<Vec<u8>>>>,
name: String,
row_limit: i32,
more_rows: bool,
finished: bool,
trans: &'trans Transaction<'trans>) -> LazyRows<'trans, 'stmt>;
}
2015-06-02 05:39:04 +00:00
trait SessionInfoNew<'a> {
fn new(conn: &'a InnerConnection) -> SessionInfo<'a>;
}
2015-08-16 04:39:46 +00:00
trait StatementInternals<'conn> {
fn new(conn: &'conn Connection,
name: String,
param_types: Vec<Type>,
columns: Vec<Column>,
next_portal_id: Cell<u32>,
finished: bool) -> Statement<'conn>;
fn conn(&self) -> &'conn Connection;
}
trait ColumnNew {
fn new(name: String, type_: Type) -> Column;
}