rust-postgres/src/lib.rs

1547 lines
48 KiB
Rust
Raw Normal View History

2013-09-30 05:03:11 +00:00
/*!
Rust-Postgres is a pure-Rust frontend for the popular PostgreSQL database. It
exposes a high level interface in the vein of JDBC or Go's `database/sql`
package.
2014-03-10 17:06:40 +00:00
```rust,no_run
2014-02-15 23:03:35 +00:00
extern crate postgres;
2014-03-09 06:01:24 +00:00
extern crate time;
2013-10-01 07:01:54 +00:00
2014-03-09 06:01:24 +00:00
use time::Timespec;
2013-09-30 05:03:11 +00:00
use postgres::{PostgresConnection, NoSsl};
2013-09-30 05:03:11 +00:00
use postgres::types::ToSql;
struct Person {
id: i32,
2014-05-26 03:38:40 +00:00
name: String,
2013-10-01 07:01:54 +00:00
time_created: Timespec,
2014-04-08 03:02:05 +00:00
data: Option<Vec<u8>>
2013-09-30 05:03:11 +00:00
}
fn main() {
2014-04-21 05:53:54 +00:00
let conn = PostgresConnection::connect("postgresql://postgres@localhost",
&NoSsl).unwrap();
2013-09-30 05:03:11 +00:00
conn.execute("CREATE TABLE person (
2013-10-01 07:01:54 +00:00
id SERIAL PRIMARY KEY,
name VARCHAR NOT NULL,
time_created TIMESTAMP NOT NULL,
data BYTEA
)", []).unwrap();
2013-09-30 05:03:11 +00:00
let me = Person {
id: 0,
2014-06-06 06:32:49 +00:00
name: "Steven".to_str(),
2013-10-01 07:01:54 +00:00
time_created: time::get_time(),
2013-09-30 05:03:11 +00:00
data: None
};
conn.execute("INSERT INTO person (name, time_created, data)
2013-09-30 05:03:11 +00:00
VALUES ($1, $2, $3)",
2014-05-26 22:21:15 +00:00
[&me.name as &ToSql, &me.time_created as &ToSql,
&me.data as &ToSql]).unwrap();
2013-09-30 05:03:11 +00:00
let stmt = conn.prepare("SELECT id, name, time_created, data FROM person")
.unwrap();
for row in stmt.query([]).unwrap() {
2013-09-30 05:03:11 +00:00
let person = Person {
2013-12-03 03:53:40 +00:00
id: row[1],
name: row[2],
time_created: row[3],
data: row[4]
2013-09-30 05:03:11 +00:00
};
2013-10-01 07:01:54 +00:00
println!("Found person {}", person.name);
2013-09-30 05:03:11 +00:00
}
}
```
*/
2014-03-29 21:33:11 +00:00
#![crate_id="github.com/sfackler/rust-postgres#postgres:0.0"]
#![crate_type="rlib"]
#![crate_type="dylib"]
2014-04-28 06:16:59 +00:00
#![doc(html_root_url="http://sfackler.github.io/rust-postgres/doc")]
2013-10-04 06:21:27 +00:00
2014-03-29 21:33:11 +00:00
#![warn(missing_doc)]
2013-09-30 02:47:30 +00:00
2014-03-29 21:33:11 +00:00
#![feature(macro_rules, struct_variant, phase)]
2013-10-08 04:11:54 +00:00
2014-02-15 23:03:35 +00:00
extern crate collections;
extern crate openssl;
extern crate serialize;
extern crate sync;
2014-02-22 07:18:39 +00:00
extern crate time;
extern crate phf;
2014-04-23 05:10:06 +00:00
#[phase(syntax)]
extern crate phf_mac;
2014-03-16 04:35:44 +00:00
extern crate url;
2014-03-18 04:05:04 +00:00
#[phase(syntax, link)]
extern crate log;
extern crate uuid;
2013-08-04 02:17:32 +00:00
2014-06-06 03:50:23 +00:00
use collections::{Deque, RingBuf};
2014-03-16 04:35:44 +00:00
use url::{UserInfo, Url};
2014-01-03 01:13:52 +00:00
use openssl::crypto::hash::{MD5, Hasher};
use openssl::ssl::SslContext;
2014-02-14 03:26:52 +00:00
use serialize::hex::ToHex;
2014-02-08 02:17:40 +00:00
use std::cell::{Cell, RefCell};
2014-06-06 03:50:23 +00:00
use std::collections::HashMap;
2014-02-26 08:40:57 +00:00
use std::from_str::FromStr;
use std::io::{BufferedStream, IoResult};
use std::io::net::ip::Port;
2014-02-12 07:42:46 +00:00
use std::mem;
use std::task;
use std::fmt;
2013-08-04 02:17:32 +00:00
use error::{InvalidUrl,
2014-02-12 07:42:46 +00:00
MissingPassword,
MissingUser,
PgConnectDbError,
PgConnectStreamError,
2014-02-12 07:42:46 +00:00
PgDbError,
PgInvalidColumn,
2014-02-12 07:42:46 +00:00
PgStreamDesynchronized,
PgStreamError,
2014-03-28 04:39:03 +00:00
PgWrongParamCount,
PostgresConnectError,
2014-02-12 07:42:46 +00:00
PostgresDbError,
PostgresError,
UnsupportedAuthentication,
PgWrongConnection,
PgWrongTransaction};
2014-05-28 04:07:58 +00:00
use io::{MaybeSslStream, InternalStream};
2014-02-12 07:42:46 +00:00
use message::{AuthenticationCleartextPassword,
AuthenticationGSS,
AuthenticationKerberosV5,
AuthenticationMD5Password,
2014-02-12 07:42:46 +00:00
AuthenticationOk,
AuthenticationSCMCredential,
AuthenticationSSPI,
BackendKeyData,
2014-02-12 07:42:46 +00:00
BackendMessage,
BindComplete,
CommandComplete,
DataRow,
EmptyQueryResponse,
ErrorResponse,
NoData,
NoticeResponse,
NotificationResponse,
ParameterDescription,
ParameterStatus,
ParseComplete,
PortalSuspended,
ReadyForQuery,
RowDescription,
RowDescriptionEntry};
use message::{Bind,
CancelRequest,
Close,
Describe,
Execute,
2014-02-12 07:42:46 +00:00
FrontendMessage,
Parse,
PasswordMessage,
Query,
2014-02-12 07:42:46 +00:00
StartupMessage,
Sync,
Terminate};
use message::{WriteMessage, ReadMessage};
use types::{Oid, PostgresType, ToSql, FromSql, PgUnknownType};
2013-07-25 07:10:18 +00:00
2014-03-24 00:34:50 +00:00
macro_rules! try_pg_conn(
2014-02-07 06:59:33 +00:00
($e:expr) => (
match $e {
Ok(ok) => ok,
Err(err) => return Err(PgConnectStreamError(err))
}
)
)
2014-03-24 00:34:50 +00:00
macro_rules! try_pg(
2014-02-07 06:59:33 +00:00
($e:expr) => (
match $e {
Ok(ok) => ok,
Err(err) => return Err(PgStreamError(err))
}
)
)
2014-03-24 00:34:50 +00:00
macro_rules! try_desync(
($e:expr) => (
match $e {
Ok(ok) => ok,
Err(err) => {
self.desynchronized = true;
return Err(err);
}
}
)
)
macro_rules! check_desync(
($e:expr) => ({
if $e.canary() != CANARY {
fail!("PostgresConnection use after free. See mozilla/rust#13246.");
}
if $e.is_desynchronized() {
return Err(PgStreamDesynchronized);
}
})
)
pub mod error;
mod io;
pub mod pool;
mod message;
pub mod types;
#[cfg(test)]
mod test;
static CANARY: u32 = 0xdeadbeef;
2013-10-21 00:32:14 +00:00
2014-04-03 04:26:41 +00:00
/// A typedef of the result returned by many methods.
pub type PostgresResult<T> = Result<T, PostgresError>;
/// Specifies the target server to connect to.
#[deriving(Clone)]
pub enum PostgresConnectTarget {
/// Connect via TCP to the specified host.
2014-05-26 03:38:40 +00:00
TargetTcp(String),
/// Connect via a Unix domain socket in the specified directory.
TargetUnix(Path)
}
/// Information necessary to open a new connection to a Postgres server.
#[deriving(Clone)]
pub struct PostgresConnectParams {
/// The target server
pub target: PostgresConnectTarget,
/// The target port.
///
/// Defaults to 5432 if not specified.
pub port: Option<Port>,
/// The user to login as.
///
/// `PostgresConnection::connect` requires a user but `cancel_query` does
/// not.
2014-05-26 03:38:40 +00:00
pub user: Option<String>,
/// An optional password used for authentication
2014-05-26 03:38:40 +00:00
pub password: Option<String>,
/// The database to connect to. Defaults the value of `user`.
2014-05-26 03:38:40 +00:00
pub database: Option<String>,
/// Runtime parameters to be passed to the Postgres backend.
2014-05-26 03:38:40 +00:00
pub options: Vec<(String, String)>,
}
/// A trait implemented by types that can be converted into a
/// `PostgresConnectParams`.
pub trait IntoConnectParams {
/// Converts the value of `self` into a `PostgresConnectParams`.
fn into_connect_params(self) -> Result<PostgresConnectParams,
PostgresConnectError>;
}
impl IntoConnectParams for PostgresConnectParams {
fn into_connect_params(self) -> Result<PostgresConnectParams,
PostgresConnectError> {
Ok(self)
}
}
impl<'a> IntoConnectParams for &'a str {
2014-05-17 03:47:03 +00:00
fn into_connect_params(self) -> Result<PostgresConnectParams,
PostgresConnectError> {
match url::from_str(self) {
Ok(url) => url.into_connect_params(),
Err(err) => return Err(InvalidUrl(err)),
}
}
}
impl IntoConnectParams for Url {
fn into_connect_params(self) -> Result<PostgresConnectParams,
PostgresConnectError> {
let Url {
host,
port,
user,
path,
query: options,
..
2014-05-17 03:47:03 +00:00
} = self;
2014-05-16 02:11:23 +00:00
let maybe_path = url::decode_component(host.as_slice());
let target = if maybe_path.as_slice().starts_with("/") {
TargetUnix(Path::new(maybe_path))
} else {
TargetTcp(host)
};
let (user, pass) = match user {
Some(UserInfo { user, pass }) => (Some(user), pass),
None => (None, None),
};
let port = match port {
2014-05-16 02:11:23 +00:00
Some(port) => match FromStr::from_str(port.as_slice()) {
Some(port) => Some(port),
2014-06-06 06:32:49 +00:00
None => return Err(InvalidUrl("invalid port".to_str())),
},
None => None,
};
let database = if !path.is_empty() {
// path contains the leading /
2014-05-16 02:11:23 +00:00
let (_, path) = path.as_slice().slice_shift_char();
2014-06-06 06:32:49 +00:00
Some(path.to_str())
} else {
None
};
Ok(PostgresConnectParams {
target: target,
port: port,
user: user,
password: pass,
database: database,
options: options,
})
}
}
2013-09-30 02:12:20 +00:00
/// Trait for types that can handle Postgres notice messages
pub trait PostgresNoticeHandler {
2013-09-30 02:12:20 +00:00
/// Handle a Postgres notice message
fn handle(&mut self, notice: PostgresDbError);
}
2013-09-30 05:22:10 +00:00
/// A notice handler which logs at the `info` level.
2013-09-30 02:12:20 +00:00
///
/// This is the default handler used by a `PostgresConnection`.
pub struct DefaultNoticeHandler;
impl PostgresNoticeHandler for DefaultNoticeHandler {
fn handle(&mut self, notice: PostgresDbError) {
2013-10-21 04:06:12 +00:00
info!("{}: {}", notice.severity, notice.message);
}
}
/// An asynchronous notification
2014-05-29 04:54:02 +00:00
#[deriving(Show)]
pub struct PostgresNotification {
/// The process ID of the notifying backend process
2014-04-03 05:56:16 +00:00
pub pid: i32,
/// The name of the channel that the notify has been raised on
2014-05-26 03:38:40 +00:00
pub channel: String,
/// The "payload" string passed from the notifying process
2014-05-26 03:38:40 +00:00
pub payload: String,
}
/// An iterator over asynchronous notifications
2014-02-03 05:09:48 +00:00
pub struct PostgresNotifications<'conn> {
conn: &'conn PostgresConnection
}
2014-03-26 04:50:59 +00:00
impl<'conn> Iterator<PostgresNotification> for PostgresNotifications<'conn> {
/// Returns the oldest pending notification or `None` if there are none.
///
/// # Note
///
/// `next` may return `Some` notification after returning `None` if a new
/// notification was received.
fn next(&mut self) -> Option<PostgresNotification> {
2014-03-23 04:20:22 +00:00
self.conn.conn.borrow_mut().notifications.pop_front()
}
}
2013-10-21 00:32:14 +00:00
/// Contains information necessary to cancel queries for a session
pub struct PostgresCancelData {
/// The process ID of the session
2014-04-03 05:56:16 +00:00
pub process_id: i32,
2013-10-21 00:32:14 +00:00
/// The secret key for the session
2014-04-03 05:56:16 +00:00
pub secret_key: i32,
2013-10-21 00:32:14 +00:00
}
/// Attempts to cancel an in-progress query.
///
/// The backend provides no information about whether a cancellation attempt
/// was successful or not. An error will only be returned if the driver was
/// unable to connect to the database.
///
/// A `PostgresCancelData` object can be created via
/// `PostgresConnection::cancel_data`. The object can cancel any query made on
/// that connection.
2014-03-09 22:22:20 +00:00
///
2014-04-24 05:54:09 +00:00
/// Only the host and port of the connetion info are used. See
/// `PostgresConnection::connect` for details of the `params` argument.
2014-03-31 02:21:51 +00:00
///
2014-03-09 22:22:20 +00:00
/// # Example
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-03-09 22:22:20 +00:00
/// # use postgres::{PostgresConnection, NoSsl};
/// # let url = "";
/// let conn = PostgresConnection::connect(url, &NoSsl).unwrap();
2014-03-09 22:22:20 +00:00
/// let cancel_data = conn.cancel_data();
/// spawn(proc() {
/// conn.execute("SOME EXPENSIVE QUERY", []).unwrap();
2014-03-09 22:22:20 +00:00
/// });
/// # let _ =
/// postgres::cancel_query(url, &NoSsl, cancel_data);
/// ```
pub fn cancel_query<T: IntoConnectParams>(params: T, ssl: &SslMode,
data: PostgresCancelData)
-> Result<(), PostgresConnectError> {
let params = try!(params.into_connect_params());
2013-10-21 00:32:14 +00:00
let mut socket = match io::initialize_stream(&params, ssl) {
2013-10-21 00:32:14 +00:00
Ok(socket) => socket,
2013-12-06 07:20:50 +00:00
Err(err) => return Err(err)
2013-10-21 00:32:14 +00:00
};
2014-03-24 00:34:50 +00:00
try_pg_conn!(socket.write_message(&CancelRequest {
2013-10-21 00:32:14 +00:00
code: message::CANCEL_CODE,
process_id: data.process_id,
secret_key: data.secret_key
2014-02-07 06:59:33 +00:00
}));
2014-03-24 00:34:50 +00:00
try_pg_conn!(socket.flush());
2013-10-21 00:32:14 +00:00
2013-12-06 07:20:50 +00:00
Ok(())
2013-10-21 00:32:14 +00:00
}
struct InnerPostgresConnection {
stream: BufferedStream<MaybeSslStream<InternalStream>>,
2013-12-06 07:20:50 +00:00
next_stmt_id: uint,
2014-05-04 05:38:50 +00:00
notice_handler: Box<PostgresNoticeHandler:Send>,
notifications: RingBuf<PostgresNotification>,
2013-12-04 08:18:28 +00:00
cancel_data: PostgresCancelData,
2014-05-26 03:38:40 +00:00
unknown_types: HashMap<Oid, String>,
desynchronized: bool,
finished: bool,
trans_depth: u32,
canary: u32,
2013-08-29 06:19:53 +00:00
}
impl Drop for InnerPostgresConnection {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
if !self.finished {
let _ = self.finish_inner();
}
2013-08-29 06:19:53 +00:00
}
}
impl InnerPostgresConnection {
fn connect<T: IntoConnectParams>(params: T, ssl: &SslMode)
-> Result<InnerPostgresConnection,
PostgresConnectError> {
let params = try!(params.into_connect_params());
let stream = try!(io::initialize_stream(&params, ssl));
let PostgresConnectParams {
user,
password,
database,
mut options,
..
} = params;
let user = match user {
Some(user) => user,
None => return Err(MissingUser),
};
2014-05-28 04:07:58 +00:00
let mut conn = InnerPostgresConnection {
stream: BufferedStream::new(stream),
next_stmt_id: 0,
notice_handler: box DefaultNoticeHandler,
notifications: RingBuf::new(),
cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 },
unknown_types: HashMap::new(),
desynchronized: false,
finished: false,
trans_depth: 0,
canary: CANARY,
};
2014-06-06 06:32:49 +00:00
options.push(("client_encoding".to_str(), "UTF8".to_str()));
2013-09-09 04:33:41 +00:00
// Postgres uses the value of TimeZone as the time zone for TIMESTAMP
// WITH TIME ZONE values. Timespec converts to GMT internally.
2014-06-06 06:32:49 +00:00
options.push(("TimeZone".to_str(), "GMT".to_str()));
// We have to clone here since we need the user again for auth
2014-06-06 06:32:49 +00:00
options.push(("user".to_str(), user.clone()));
match database {
2014-06-06 06:32:49 +00:00
Some(database) => options.push(("database".to_str(), database)),
None => {}
}
2014-03-24 00:34:50 +00:00
try_pg_conn!(conn.write_messages([StartupMessage {
2013-09-09 05:40:08 +00:00
version: message::PROTOCOL_VERSION,
parameters: options.as_slice()
}]));
try!(conn.handle_auth(user, password));
2013-08-04 02:17:32 +00:00
2013-08-22 05:52:15 +00:00
loop {
2014-03-24 00:34:50 +00:00
match try_pg_conn!(conn.read_message()) {
2013-10-21 00:32:14 +00:00
BackendKeyData { process_id, secret_key } => {
conn.cancel_data.process_id = process_id;
conn.cancel_data.secret_key = secret_key;
}
ReadyForQuery { .. } => break,
2013-10-05 03:26:52 +00:00
ErrorResponse { fields } =>
return Err(PgConnectDbError(PostgresDbError::new(fields))),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2013-08-05 00:48:48 +00:00
}
2013-08-23 05:24:14 +00:00
Ok(conn)
2013-08-05 00:48:48 +00:00
}
fn write_messages(&mut self, messages: &[FrontendMessage])
-> IoResult<()> {
assert!(!self.desynchronized);
2013-11-10 04:58:38 +00:00
for message in messages.iter() {
2014-03-24 00:34:50 +00:00
try_desync!(self.stream.write_message(message));
2013-07-25 07:10:18 +00:00
}
2014-03-24 00:34:50 +00:00
Ok(try_desync!(self.stream.flush()))
2013-08-22 06:41:26 +00:00
}
2013-08-04 02:17:32 +00:00
fn read_message(&mut self) -> IoResult<BackendMessage> {
assert!(!self.desynchronized);
loop {
2014-03-24 00:34:50 +00:00
match try_desync!(self.stream.read_message()) {
NoticeResponse { fields } =>
self.notice_handler.handle(PostgresDbError::new(fields)),
NotificationResponse { pid, channel, payload } =>
self.notifications.push_back(PostgresNotification {
pid: pid,
channel: channel,
payload: payload
}),
ParameterStatus { parameter, value } =>
2014-03-19 04:00:06 +00:00
debug!("Parameter {} = {}", parameter, value),
val => return Ok(val)
}
2013-08-04 05:21:16 +00:00
}
2013-08-22 06:41:26 +00:00
}
2014-05-26 03:38:40 +00:00
fn handle_auth(&mut self, user: String, pass: Option<String>)
2014-04-03 04:26:41 +00:00
-> Result<(), PostgresConnectError> {
2014-03-24 00:34:50 +00:00
match try_pg_conn!(self.read_message()) {
AuthenticationOk => return Ok(()),
AuthenticationCleartextPassword => {
let pass = match pass {
Some(pass) => pass,
None => return Err(MissingPassword)
};
try_pg_conn!(self.write_messages([PasswordMessage {
2014-05-16 02:11:23 +00:00
password: pass.as_slice(),
}]));
}
AuthenticationMD5Password { salt } => {
let pass = match pass {
Some(pass) => pass,
None => return Err(MissingPassword)
};
2014-01-03 01:13:52 +00:00
let hasher = Hasher::new(MD5);
hasher.update(pass.as_bytes());
hasher.update(user.as_bytes());
2014-04-19 07:01:20 +00:00
let output = hasher.final().as_slice().to_hex();
2014-01-03 01:13:52 +00:00
let hasher = Hasher::new(MD5);
hasher.update(output.as_bytes());
hasher.update(salt);
2014-05-24 03:08:14 +00:00
let output = format!("md5{}",
hasher.final().as_slice().to_hex());
2014-03-24 00:34:50 +00:00
try_pg_conn!(self.write_messages([PasswordMessage {
password: output.as_slice()
}]));
}
AuthenticationKerberosV5
2013-09-12 05:02:32 +00:00
| AuthenticationSCMCredential
| AuthenticationGSS
| AuthenticationSSPI => return Err(UnsupportedAuthentication),
2013-10-05 02:26:57 +00:00
ErrorResponse { fields } =>
return Err(PgConnectDbError(PostgresDbError::new(fields))),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2014-03-24 00:34:50 +00:00
match try_pg_conn!(self.read_message()) {
AuthenticationOk => Ok(()),
ErrorResponse { fields } =>
Err(PgConnectDbError(PostgresDbError::new(fields))),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
}
2014-05-04 05:38:50 +00:00
fn set_notice_handler(&mut self, handler: Box<PostgresNoticeHandler:Send>)
-> Box<PostgresNoticeHandler:Send> {
2014-02-11 03:20:53 +00:00
mem::replace(&mut self.notice_handler, handler)
}
fn prepare<'a>(&mut self, query: &str, conn: &'a PostgresConnection)
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresStatement<'a>> {
2014-05-24 03:08:14 +00:00
let stmt_name = format!("s{}", self.next_stmt_id);
self.next_stmt_id += 1;
2013-08-05 00:48:48 +00:00
2014-03-24 00:34:50 +00:00
try_pg!(self.write_messages([
2013-11-10 04:58:38 +00:00
Parse {
2014-05-16 02:59:01 +00:00
name: stmt_name.as_slice(),
2013-09-02 17:27:09 +00:00
query: query,
2014-03-30 23:19:04 +00:00
param_types: []
2013-09-02 17:27:09 +00:00
},
2013-11-10 04:58:38 +00:00
Describe {
2013-09-02 17:27:09 +00:00
variant: 'S' as u8,
2014-05-16 02:59:01 +00:00
name: stmt_name.as_slice(),
2013-09-02 17:27:09 +00:00
},
Sync]));
2013-08-22 06:41:26 +00:00
2014-03-24 00:34:50 +00:00
match try_pg!(self.read_message()) {
2013-10-25 05:30:34 +00:00
ParseComplete => {}
ErrorResponse { fields } => {
2014-02-22 07:18:39 +00:00
try!(self.wait_for_ready());
return Err(PgDbError(PostgresDbError::new(fields)));
}
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2013-08-22 06:41:26 +00:00
2014-03-24 00:34:50 +00:00
let mut param_types: Vec<PostgresType> = match try_pg!(self.read_message()) {
ParameterDescription { types } =>
2013-12-07 23:39:44 +00:00
types.iter().map(|ty| PostgresType::from_oid(*ty)).collect(),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
};
2013-08-22 06:41:26 +00:00
2014-03-24 00:34:50 +00:00
let mut result_desc: Vec<ResultDescription> = match try_pg!(self.read_message()) {
RowDescription { descriptions } =>
2013-12-03 07:04:44 +00:00
descriptions.move_iter().map(|desc| {
2014-04-03 05:56:16 +00:00
let RowDescriptionEntry { name, type_oid, .. } = desc;
ResultDescription {
name: name,
ty: PostgresType::from_oid(type_oid)
}
}).collect(),
2014-05-26 18:41:18 +00:00
NoData => vec![],
2013-10-25 05:30:34 +00:00
_ => unreachable!()
};
2013-08-17 22:09:26 +00:00
2014-02-22 07:18:39 +00:00
try!(self.wait_for_ready());
2013-08-22 06:41:26 +00:00
2013-12-04 08:18:28 +00:00
// now that the connection is ready again, get unknown type names
2014-03-30 23:19:04 +00:00
try!(self.set_type_names(param_types.mut_iter()));
try!(self.set_type_names(result_desc.mut_iter().map(|d| &mut d.ty)));
2013-12-04 08:18:28 +00:00
Ok(PostgresStatement {
conn: conn,
name: stmt_name,
param_types: param_types,
result_desc: result_desc,
next_portal_id: Cell::new(0),
2014-04-26 06:14:55 +00:00
finished: false,
})
2013-08-22 06:41:26 +00:00
}
2014-03-30 23:19:04 +00:00
fn set_type_names<'a, I: Iterator<&'a mut PostgresType>>(&mut self, mut it: I)
2014-04-03 04:26:41 +00:00
-> PostgresResult<()> {
2014-03-30 23:19:04 +00:00
for ty in it {
match *ty {
PgUnknownType { oid, ref mut name } =>
*name = try!(self.get_type_name(oid)),
_ => {}
}
}
Ok(())
}
2014-05-26 03:38:40 +00:00
fn get_type_name(&mut self, oid: Oid) -> PostgresResult<String> {
2013-12-04 08:18:28 +00:00
match self.unknown_types.find(&oid) {
Some(name) => return Ok(name.clone()),
2013-12-04 08:18:28 +00:00
None => {}
}
2014-04-03 05:56:16 +00:00
let name = try!(self.quick_query(format!("SELECT typname FROM pg_type \
2014-05-24 03:08:14 +00:00
WHERE oid={}", oid).as_slice()))
2014-03-15 05:33:24 +00:00
.move_iter().next().unwrap().move_iter().next().unwrap().unwrap();
2013-12-04 08:18:28 +00:00
self.unknown_types.insert(oid, name.clone());
Ok(name)
2013-12-04 08:18:28 +00:00
}
2014-03-30 23:19:04 +00:00
fn is_desynchronized(&self) -> bool {
self.desynchronized
}
fn canary(&self) -> u32 {
self.canary
}
2014-04-03 04:26:41 +00:00
fn wait_for_ready(&mut self) -> PostgresResult<()> {
2014-03-24 00:34:50 +00:00
match try_pg!(self.read_message()) {
ReadyForQuery { .. } => Ok(()),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
}
2013-12-04 07:48:46 +00:00
fn quick_query(&mut self, query: &str)
2014-05-26 03:38:40 +00:00
-> PostgresResult<Vec<Vec<Option<String>>>> {
check_desync!(self);
2014-03-24 00:34:50 +00:00
try_pg!(self.write_messages([Query { query: query }]));
2013-12-04 07:48:46 +00:00
2014-05-26 18:41:18 +00:00
let mut result = vec![];
2013-12-04 07:48:46 +00:00
loop {
2014-03-24 00:34:50 +00:00
match try_pg!(self.read_message()) {
2013-12-04 07:48:46 +00:00
ReadyForQuery { .. } => break,
2014-04-26 21:46:38 +00:00
DataRow { row } => {
result.push(row.move_iter().map(|opt| {
2014-05-26 03:38:40 +00:00
opt.map(|b| String::from_utf8(b).unwrap())
2014-05-16 02:27:19 +00:00
}).collect());
2014-04-26 21:46:38 +00:00
}
ErrorResponse { fields } => {
2014-02-22 07:18:39 +00:00
try!(self.wait_for_ready());
return Err(PgDbError(PostgresDbError::new(fields)));
}
2013-12-04 07:48:46 +00:00
_ => {}
}
}
Ok(result)
2013-12-04 07:48:46 +00:00
}
2014-04-03 04:26:41 +00:00
fn finish_inner(&mut self) -> PostgresResult<()> {
check_desync!(self);
self.canary = 0;
2014-04-26 06:14:55 +00:00
try_pg!(self.write_messages([Terminate]));
Ok(())
}
}
2013-09-30 02:12:20 +00:00
/// A connection to a Postgres database.
pub struct PostgresConnection {
conn: RefCell<InnerPostgresConnection>
}
impl PostgresConnection {
/// Creates a new connection to a Postgres database.
2013-09-30 02:12:20 +00:00
///
/// Most applications can use a URL string in the normal format:
2013-09-30 02:12:20 +00:00
///
2014-03-09 06:01:24 +00:00
/// ```notrust
/// postgresql://user[:password]@host[:port][/database][?param1=val1[[&param2=val2]...]]
2013-09-30 02:12:20 +00:00
/// ```
///
/// The password may be omitted if not required. The default Postgres port
/// (5432) is used if none is specified. The database name defaults to the
/// username if not specified.
2014-03-09 22:22:20 +00:00
///
/// To connect to the server via Unix sockets, `host` should be set to the
2014-04-21 05:53:54 +00:00
/// absolute path of the directory containing the socket file. Since `/` is
/// a reserved character in URLs, the path should be URL encoded. If the
/// path contains non-UTF 8 characters, a `PostgresConnectParams` struct
2014-04-24 05:54:09 +00:00
/// should be created manually and passed in. Note that Postgres does not
/// support SSL over Unix sockets.
///
/// # Examples
2014-03-09 22:22:20 +00:00
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-03-09 22:22:20 +00:00
/// # use postgres::{PostgresConnection, NoSsl};
/// let url = "postgresql://postgres:hunter2@localhost:2994/foodb";
/// let maybe_conn = PostgresConnection::connect(url, &NoSsl);
2014-03-09 22:22:20 +00:00
/// let conn = match maybe_conn {
/// Ok(conn) => conn,
/// Err(err) => fail!("Error connecting: {}", err)
/// };
/// ```
///
2014-04-19 18:10:27 +00:00
/// ```rust,no_run
/// # use postgres::{PostgresConnection, NoSsl};
/// let url = "postgresql://postgres@%2Frun%2Fpostgres";
/// let maybe_conn = PostgresConnection::connect(url, &NoSsl);
/// ```
///
/// ```rust,no_run
/// # use postgres::{PostgresConnection, PostgresConnectParams, NoSsl, TargetUnix};
/// # let some_crazy_path = Path::new("");
/// let params = PostgresConnectParams {
/// target: TargetUnix(some_crazy_path),
/// port: None,
2014-06-06 06:32:49 +00:00
/// user: Some("postgres".to_str()),
/// password: None,
/// database: None,
2014-05-26 18:41:18 +00:00
/// options: vec![],
/// };
/// let maybe_conn = PostgresConnection::connect(params, &NoSsl);
/// ```
pub fn connect<T: IntoConnectParams>(params: T, ssl: &SslMode)
-> Result<PostgresConnection,
PostgresConnectError> {
InnerPostgresConnection::connect(params, ssl).map(|conn| {
PostgresConnection { conn: RefCell::new(conn) }
})
}
2013-09-30 02:12:20 +00:00
/// Sets the notice handler for the connection, returning the old handler.
2014-05-04 05:38:50 +00:00
pub fn set_notice_handler(&self, handler: Box<PostgresNoticeHandler:Send>)
-> Box<PostgresNoticeHandler:Send> {
2014-03-19 04:00:06 +00:00
self.conn.borrow_mut().set_notice_handler(handler)
}
/// Returns an iterator over asynchronous notification messages.
///
/// Use the `LISTEN` command to register this connection for notifications.
2014-02-03 05:09:48 +00:00
pub fn notifications<'a>(&'a self) -> PostgresNotifications<'a> {
PostgresNotifications {
conn: self
}
}
/// Creates a new prepared statement.
2013-09-30 02:12:20 +00:00
///
/// A statement may contain parameters, specified by `$n` where `n` is the
/// index of the parameter in the list provided at execution time,
/// 1-indexed.
///
/// The statement is associated with the connection that created it and may
/// not outlive that connection.
2014-03-09 22:22:20 +00:00
///
/// # Example
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-03-09 22:22:20 +00:00
/// # use postgres::{PostgresConnection, NoSsl};
/// # let conn = PostgresConnection::connect("", &NoSsl).unwrap();
/// let maybe_stmt = conn.prepare("SELECT foo FROM bar WHERE baz = $1");
2014-03-09 22:22:20 +00:00
/// let stmt = match maybe_stmt {
/// Ok(stmt) => stmt,
/// Err(err) => fail!("Error preparing statement: {}", err)
/// };
pub fn prepare<'a>(&'a self, query: &str)
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresStatement<'a>> {
let mut conn = self.conn.borrow_mut();
if conn.trans_depth != 0 {
return Err(PgWrongTransaction);
}
conn.prepare(query, self)
}
/// Begins a new transaction.
2013-09-30 02:12:20 +00:00
///
2013-10-14 01:58:31 +00:00
/// Returns a `PostgresTransaction` object which should be used instead of
/// the connection for the duration of the transaction. The transaction
2013-10-15 02:36:01 +00:00
/// is active until the `PostgresTransaction` object falls out of scope.
2013-10-14 01:58:31 +00:00
/// A transaction will commit by default unless the task fails or the
/// transaction is set to roll back.
2014-03-09 22:22:20 +00:00
///
/// # Example
///
2014-03-10 17:06:40 +00:00
/// ```rust,no_run
2014-03-09 22:22:20 +00:00
/// # use postgres::{PostgresConnection, NoSsl};
/// # fn foo() -> Result<(), postgres::error::PostgresError> {
/// # let conn = PostgresConnection::connect("", &NoSsl).unwrap();
/// let trans = try!(conn.transaction());
/// try!(trans.execute("UPDATE foo SET bar = 10", []));
2014-03-09 22:22:20 +00:00
///
/// # let something_bad_happened = true;
/// if something_bad_happened {
/// trans.set_rollback();
/// }
///
/// drop(trans);
/// # Ok(())
/// # }
/// ```
pub fn transaction<'a>(&'a self)
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresTransaction<'a>> {
check_desync!(self);
if self.conn.borrow().trans_depth != 0 {
return Err(PgWrongTransaction);
}
2014-02-22 07:18:39 +00:00
try!(self.quick_query("BEGIN"));
self.conn.borrow_mut().trans_depth += 1;
Ok(PostgresTransaction {
2013-08-27 05:40:23 +00:00
conn: self,
commit: Cell::new(true),
depth: 1,
finished: false,
})
}
/// A convenience function for queries that are only run once.
2013-09-30 02:12:20 +00:00
///
/// If an error is returned, it could have come from either the preparation
/// or execution of the statement.
///
/// On success, returns the number of rows modified or 0 if not applicable.
pub fn execute(&self, query: &str, params: &[&ToSql])
2014-04-03 04:26:41 +00:00
-> PostgresResult<uint> {
self.prepare(query).and_then(|stmt| stmt.execute(params))
2013-09-01 18:06:33 +00:00
}
2014-06-08 23:07:15 +00:00
/// Execute a sequence of SQL statements.
///
/// Statements should be separated by `;` characters. If an error occurs,
/// execution of the sequence will stop at that point. This is intended for
/// execution of batches of non-dynamic statements - for example, creation
/// of a schema for a fresh database.
///
/// # Warning
///
/// Prepared statements should be used for any SQL statement which contains
/// user-specified data, as it provides functionality to safely embed that
/// data in the statment. Do not form statements via string concatenation
/// and feed them into this method.
///
/// # Example
///
/// ```rust
/// # use postgres::{PostgresConnection, PostgresResult};
///
/// fn init_db(conn: &PostgresConnection) -> PostgresResult<()> {
/// static INIT_DB: &'static str = "
/// CREATE TABLE person (
/// id SERIAL PRIMARY KEY,
/// name NOT NULL
/// );
///
/// CREATE TABLE purchase (
/// id SERIAL PRIMARY KEY,
/// person INT NOT NULL REFERENCES person (id),
/// time TIMESTAMPTZ NOT NULL,
/// );
///
/// CREATE INDEX ON purchase (time);
/// ";
/// conn.batch_execute(INIT_DB)
/// }
/// ```
pub fn batch_execute(&self, query: &str) -> PostgresResult<()> {
let mut conn = self.conn.borrow_mut();
if conn.trans_depth != 0 {
return Err(PgWrongTransaction);
}
conn.quick_query(query).map(|_| ())
}
2013-10-21 00:32:14 +00:00
/// Returns information used to cancel pending queries.
///
/// Used with the `cancel_query` function. The object returned can be used
/// to cancel any query executed by the connection it was created from.
pub fn cancel_data(&self) -> PostgresCancelData {
2014-03-19 04:00:06 +00:00
self.conn.borrow().cancel_data
2013-10-21 00:32:14 +00:00
}
/// Returns whether or not the stream has been desynchronized due to an
/// error in the communication channel with the server.
///
/// If this has occurred, all further queries will immediately return an
/// error.
pub fn is_desynchronized(&self) -> bool {
2014-03-19 04:00:06 +00:00
self.conn.borrow().is_desynchronized()
}
/// Consumes the connection, closing it.
///
/// Functionally equivalent to the `Drop` implementation for
/// `PostgresConnection` except that it returns any error encountered to
/// the caller.
2014-04-03 04:26:41 +00:00
pub fn finish(self) -> PostgresResult<()> {
2014-03-19 04:00:06 +00:00
let mut conn = self.conn.borrow_mut();
conn.finished = true;
conn.finish_inner()
}
fn canary(&self) -> u32 {
2014-04-10 04:28:20 +00:00
self.conn.borrow().canary()
}
fn quick_query(&self, query: &str)
2014-05-26 03:38:40 +00:00
-> PostgresResult<Vec<Vec<Option<String>>>> {
2014-03-19 04:00:06 +00:00
self.conn.borrow_mut().quick_query(query)
2013-08-23 07:13:42 +00:00
}
2014-04-03 04:26:41 +00:00
fn wait_for_ready(&self) -> PostgresResult<()> {
2014-03-19 04:00:06 +00:00
self.conn.borrow_mut().wait_for_ready()
}
fn read_message(&self) -> IoResult<BackendMessage> {
2014-03-19 04:00:06 +00:00
self.conn.borrow_mut().read_message()
}
fn write_messages(&self, messages: &[FrontendMessage]) -> IoResult<()> {
2014-03-19 04:00:06 +00:00
self.conn.borrow_mut().write_messages(messages)
2013-08-17 22:09:26 +00:00
}
}
2013-11-10 04:58:38 +00:00
/// Specifies the SSL support requested for a new connection
pub enum SslMode {
2013-11-10 04:58:38 +00:00
/// The connection will not use SSL
NoSsl,
/// The connection will use SSL if the backend supports it
PreferSsl(SslContext),
2013-11-10 04:58:38 +00:00
/// The connection must use SSL
RequireSsl(SslContext)
2013-11-10 04:58:38 +00:00
}
2014-05-19 02:46:21 +00:00
/// Represents a transaction on a database connection.
///
/// The transaction will commit by default.
2013-11-24 22:42:44 +00:00
pub struct PostgresTransaction<'conn> {
conn: &'conn PostgresConnection,
commit: Cell<bool>,
depth: u32,
finished: bool,
}
#[unsafe_destructor]
2013-11-24 22:42:44 +00:00
impl<'conn> Drop for PostgresTransaction<'conn> {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
if !self.finished {
let _ = self.finish_inner();
}
}
}
impl<'conn> PostgresTransaction<'conn> {
2014-04-03 04:26:41 +00:00
fn finish_inner(&mut self) -> PostgresResult<()> {
debug_assert!(self.depth == self.conn.conn.borrow().trans_depth);
2014-05-18 18:37:52 +00:00
let rollback = task::failing() || !self.commit.get();
let query = match (rollback, self.depth != 1) {
2014-05-18 18:37:52 +00:00
(true, true) => "ROLLBACK TO sp",
(true, false) => "ROLLBACK",
(false, true) => "RELEASE sp",
(false, false) => "COMMIT",
};
self.conn.conn.borrow_mut().trans_depth -= 1;
2014-05-18 18:37:52 +00:00
self.conn.quick_query(query).map(|_| ())
}
2013-08-27 05:40:23 +00:00
2013-09-30 02:12:20 +00:00
/// Like `PostgresConnection::prepare`.
pub fn prepare<'a>(&'a self, query: &str)
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresStatement<'a>> {
if self.conn.conn.borrow().trans_depth != self.depth {
return Err(PgWrongTransaction);
}
self.conn.conn.borrow_mut().prepare(query, self.conn)
2013-09-01 18:06:33 +00:00
}
/// Like `PostgresConnection::execute`.
pub fn execute(&self, query: &str, params: &[&ToSql])
2014-04-03 04:26:41 +00:00
-> PostgresResult<uint> {
self.prepare(query).and_then(|s| s.execute(params))
2013-09-30 02:12:20 +00:00
}
2014-06-08 23:07:15 +00:00
/// Like `PostgresConnection::batch_execute`.
pub fn batch_execute(&self, query: &str) -> PostgresResult<()> {
if self.conn.conn.borrow().trans_depth != self.depth {
return Err(PgWrongTransaction);
}
self.conn.batch_execute(query)
}
/// Like `PostgresConnection::transaction`.
pub fn transaction<'a>(&'a self)
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresTransaction<'a>> {
check_desync!(self.conn);
if self.conn.conn.borrow().trans_depth != self.depth {
return Err(PgWrongTransaction);
}
2014-02-22 07:18:39 +00:00
try!(self.conn.quick_query("SAVEPOINT sp"));
self.conn.conn.borrow_mut().trans_depth += 1;
Ok(PostgresTransaction {
2013-09-05 06:28:44 +00:00
conn: self.conn,
commit: Cell::new(true),
depth: self.depth + 1,
finished: false,
})
}
2013-09-30 02:12:20 +00:00
/// Determines if the transaction is currently set to commit or roll back.
2013-08-27 05:40:23 +00:00
pub fn will_commit(&self) -> bool {
self.commit.get()
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Sets the transaction to commit at its completion.
2013-08-27 05:40:23 +00:00
pub fn set_commit(&self) {
self.commit.set(true);
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Sets the transaction to roll back at its completion.
2013-08-27 05:40:23 +00:00
pub fn set_rollback(&self) {
self.commit.set(false);
2013-08-27 05:40:23 +00:00
}
/// Consumes the transaction, commiting or rolling it back as appropriate.
///
/// Functionally equivalent to the `Drop` implementation of
/// `PostgresTransaction` except that it returns any error to the caller.
2014-04-03 04:26:41 +00:00
pub fn finish(mut self) -> PostgresResult<()> {
self.finished = true;
self.finish_inner()
}
/// Executes a prepared statement, returning a lazily loaded iterator over
/// the resulting rows.
///
/// No more than `row_limit` rows will be stored in memory at a time. Rows
/// will be pulled from the database in batches of `row_limit` as needed.
/// If `row_limit` is less than or equal to 0, `lazy_query` is equivalent
/// to `query`.
pub fn lazy_query<'trans, 'stmt>(&'trans self,
stmt: &'stmt PostgresStatement,
params: &[&ToSql],
row_limit: i32)
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresLazyRows
<'trans, 'stmt>> {
2014-04-26 06:14:55 +00:00
if self.conn as *_ != stmt.conn as *_ {
return Err(PgWrongConnection);
}
check_desync!(self.conn);
stmt.lazy_query(row_limit, params).map(|result| {
PostgresLazyRows {
trans: self,
result: result
}
})
}
2013-08-27 05:40:23 +00:00
}
/// A prepared statement
pub struct PostgresStatement<'conn> {
conn: &'conn PostgresConnection,
2014-05-26 03:38:40 +00:00
name: String,
param_types: Vec<PostgresType>,
result_desc: Vec<ResultDescription>,
next_portal_id: Cell<uint>,
2014-04-26 06:14:55 +00:00
finished: bool,
}
#[unsafe_destructor]
impl<'conn> Drop for PostgresStatement<'conn> {
fn drop(&mut self) {
2014-04-26 06:14:55 +00:00
if !self.finished {
let _ = self.finish_inner();
}
}
}
impl<'conn> PostgresStatement<'conn> {
2014-04-03 04:26:41 +00:00
fn finish_inner(&mut self) -> PostgresResult<()> {
check_desync!(self.conn);
try_pg!(self.conn.write_messages([
Close {
variant: 'S' as u8,
name: self.name.as_slice()
},
Sync]));
loop {
match try_pg!(self.conn.read_message()) {
ReadyForQuery { .. } => break,
ErrorResponse { fields } => {
try!(self.conn.wait_for_ready());
return Err(PgDbError(PostgresDbError::new(fields)));
}
_ => {}
}
}
Ok(())
}
fn inner_execute(&self, portal_name: &str, row_limit: i32, params: &[&ToSql])
2014-04-03 04:26:41 +00:00
-> PostgresResult<()> {
2014-03-28 04:39:03 +00:00
if self.param_types.len() != params.len() {
return Err(PgWrongParamCount {
expected: self.param_types.len(),
actual: params.len(),
});
}
2014-05-26 18:41:18 +00:00
let mut formats = vec![];
let mut values = vec![];
for (&param, ty) in params.iter().zip(self.param_types.iter()) {
2014-03-28 05:43:10 +00:00
let (format, value) = try!(param.to_sql(ty));
formats.push(format as i16);
values.push(value);
};
let result_formats: Vec<i16> = self.result_desc.iter().map(|desc| {
desc.ty.result_format() as i16
}).collect();
try_pg!(self.conn.write_messages([
Bind {
portal: portal_name,
statement: self.name.as_slice(),
formats: formats.as_slice(),
values: values.as_slice(),
result_formats: result_formats.as_slice()
},
Execute {
portal: portal_name,
max_rows: row_limit
},
Sync]));
match try_pg!(self.conn.read_message()) {
BindComplete => Ok(()),
ErrorResponse { fields } => {
try!(self.conn.wait_for_ready());
Err(PgDbError(PostgresDbError::new(fields)))
}
_ => unreachable!()
}
}
fn lazy_query<'a>(&'a self, row_limit: i32, params: &[&ToSql])
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresRows<'a>> {
let id = self.next_portal_id.get();
self.next_portal_id.set(id + 1);
2014-05-24 03:08:14 +00:00
let portal_name = format!("{}p{}", self.name, id);
2014-05-16 02:59:01 +00:00
try!(self.inner_execute(portal_name.as_slice(), row_limit, params));
let mut result = PostgresRows {
stmt: self,
name: portal_name,
data: RingBuf::new(),
row_limit: row_limit,
more_rows: true,
finished: false,
};
try!(result.read_rows())
Ok(result)
}
/// Returns a slice containing the expected parameter types.
pub fn param_types<'a>(&'a self) -> &'a [PostgresType] {
self.param_types.as_slice()
}
/// Returns a slice describing the columns of the result of the query.
pub fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
self.result_desc.as_slice()
}
/// Executes the prepared statement, returning the number of rows modified.
///
/// If the statement does not modify any rows (e.g. SELECT), 0 is returned.
///
/// # Example
///
/// ```rust,no_run
/// # use postgres::{PostgresConnection, NoSsl};
/// # use postgres::types::ToSql;
/// # let conn = PostgresConnection::connect("", &NoSsl).unwrap();
/// # let bar = 1i32;
/// # let baz = true;
/// let stmt = conn.prepare("UPDATE foo SET bar = $1 WHERE baz = $2").unwrap();
/// match stmt.execute([&bar as &ToSql, &baz as &ToSql]) {
/// Ok(count) => println!("{} row(s) updated", count),
/// Err(err) => println!("Error executing query: {}", err)
/// }
2014-04-03 04:26:41 +00:00
pub fn execute(&self, params: &[&ToSql]) -> PostgresResult<uint> {
check_desync!(self.conn);
try!(self.inner_execute("", 0, params));
let num;
loop {
match try_pg!(self.conn.read_message()) {
DataRow { .. } => {}
ErrorResponse { fields } => {
try!(self.conn.wait_for_ready());
return Err(PgDbError(PostgresDbError::new(fields)));
}
CommandComplete { tag } => {
2014-05-13 05:05:37 +00:00
let s = tag.as_slice().split(' ').last().unwrap();
2014-05-12 06:28:54 +00:00
num = FromStr::from_str(s).unwrap_or(0);
break;
}
EmptyQueryResponse => {
num = 0;
break;
}
_ => unreachable!()
}
}
try!(self.conn.wait_for_ready());
Ok(num)
}
/// Executes the prepared statement, returning an iterator over the
/// resulting rows.
///
/// # Example
///
/// ```rust,no_run
/// # use postgres::{PostgresConnection, NoSsl};
/// # use postgres::types::ToSql;
/// # let conn = PostgresConnection::connect("", &NoSsl).unwrap();
/// let stmt = conn.prepare("SELECT foo FROM bar WHERE baz = $1").unwrap();
/// # let baz = true;
/// let mut rows = match stmt.query([&baz as &ToSql]) {
/// Ok(rows) => rows,
/// Err(err) => fail!("Error running query: {}", err)
/// };
/// for row in rows {
/// let foo: i32 = row["foo"];
/// println!("foo: {}", foo);
/// }
/// ```
pub fn query<'a>(&'a self, params: &[&ToSql])
2014-04-03 04:26:41 +00:00
-> PostgresResult<PostgresRows<'a>> {
check_desync!(self.conn);
self.lazy_query(0, params)
}
/// Consumes the statement, clearing it from the Postgres session.
///
/// Functionally identical to the `Drop` implementation of the
/// `PostgresStatement` except that it returns any error to the caller.
2014-04-03 04:26:41 +00:00
pub fn finish(mut self) -> PostgresResult<()> {
2014-04-26 06:14:55 +00:00
self.finished = true;
self.finish_inner()
}
}
/// Information about a column of the result of a query.
2014-06-02 02:57:27 +00:00
#[deriving(PartialEq, Eq)]
pub struct ResultDescription {
/// The name of the column
2014-05-26 03:38:40 +00:00
pub name: String,
/// The type of the data in the column
2014-04-20 21:38:11 +00:00
pub ty: PostgresType
}
/// An iterator over the resulting rows of a query.
pub struct PostgresRows<'stmt> {
stmt: &'stmt PostgresStatement<'stmt>,
2014-05-26 03:38:40 +00:00
name: String,
2014-04-08 03:02:05 +00:00
data: RingBuf<Vec<Option<Vec<u8>>>>,
row_limit: i32,
more_rows: bool,
finished: bool,
}
#[unsafe_destructor]
impl<'stmt> Drop for PostgresRows<'stmt> {
fn drop(&mut self) {
if !self.finished {
let _ = self.finish_inner();
}
}
}
impl<'stmt> PostgresRows<'stmt> {
2014-04-03 04:26:41 +00:00
fn finish_inner(&mut self) -> PostgresResult<()> {
check_desync!(self.stmt.conn);
try_pg!(self.stmt.conn.write_messages([
Close {
variant: 'P' as u8,
name: self.name.as_slice()
},
Sync]));
loop {
match try_pg!(self.stmt.conn.read_message()) {
ReadyForQuery { .. } => break,
ErrorResponse { fields } => {
try!(self.stmt.conn.wait_for_ready());
return Err(PgDbError(PostgresDbError::new(fields)));
}
_ => {}
}
}
Ok(())
}
2014-04-03 04:26:41 +00:00
fn read_rows(&mut self) -> PostgresResult<()> {
loop {
match try_pg!(self.stmt.conn.read_message()) {
EmptyQueryResponse |
CommandComplete { .. } => {
self.more_rows = false;
break;
},
PortalSuspended => {
self.more_rows = true;
break;
},
DataRow { row } => self.data.push_back(row),
_ => unreachable!()
}
}
self.stmt.conn.wait_for_ready()
}
2014-04-03 04:26:41 +00:00
fn execute(&mut self) -> PostgresResult<()> {
try_pg!(self.stmt.conn.write_messages([
Execute {
2014-05-16 02:59:01 +00:00
portal: self.name.as_slice(),
max_rows: self.row_limit
},
Sync]));
self.read_rows()
}
/// Consumes the `PostgresRows`, cleaning up associated state.
///
/// Functionally identical to the `Drop` implementation on `PostgresRows`
/// except that it returns any error to the caller.
2014-04-03 04:26:41 +00:00
pub fn finish(mut self) -> PostgresResult<()> {
self.finished = true;
self.finish_inner()
}
2014-04-03 04:26:41 +00:00
fn try_next(&mut self) -> Option<PostgresResult<PostgresRow<'stmt>>> {
if self.data.is_empty() && self.more_rows {
2014-03-28 04:20:04 +00:00
match self.execute() {
Ok(()) => {}
Err(err) => return Some(Err(err))
}
}
2014-03-28 04:20:04 +00:00
self.data.pop_front().map(|row| {
Ok(PostgresRow {
stmt: self.stmt,
data: row
2014-03-28 04:20:04 +00:00
})
})
}
}
impl<'stmt> Iterator<PostgresRow<'stmt>> for PostgresRows<'stmt> {
2014-04-03 05:56:16 +00:00
#[inline]
fn next(&mut self) -> Option<PostgresRow<'stmt>> {
2014-03-28 04:20:04 +00:00
// we'll never hit the network on a non-lazy result
self.try_next().map(|r| r.unwrap())
}
2014-04-03 05:56:16 +00:00
#[inline]
fn size_hint(&self) -> (uint, Option<uint>) {
let lower = self.data.len();
let upper = if self.more_rows {
None
} else {
Some(lower)
};
(lower, upper)
}
}
/// A single result row of a query.
pub struct PostgresRow<'stmt> {
stmt: &'stmt PostgresStatement<'stmt>,
2014-04-08 03:02:05 +00:00
data: Vec<Option<Vec<u8>>>
}
impl<'stmt> PostgresRow<'stmt> {
/// Retrieves the contents of a field of the row.
///
/// A field can be accessed by the name or index of its column, though
/// access by index is more efficient. Rows are 1-indexed.
///
/// Returns an `Error` value if the index does not reference a column or
/// the return type is not compatible with the Postgres type.
2014-04-03 04:26:41 +00:00
pub fn get<I: RowIndex, T: FromSql>(&self, idx: I) -> PostgresResult<T> {
let idx = match idx.idx(self.stmt) {
Some(idx) => idx,
None => return Err(PgInvalidColumn)
};
FromSql::from_sql(&self.stmt.result_desc.get(idx).ty,
self.data.get(idx))
}
}
impl<'stmt> Container for PostgresRow<'stmt> {
#[inline]
fn len(&self) -> uint {
self.data.len()
}
}
impl<'stmt, I: RowIndex+Clone+fmt::Show, T: FromSql> Index<I, T>
for PostgresRow<'stmt> {
/// Retreives the contents of a field of the row.
///
/// A field can be accessed by the name or index of its column, though
/// access by index is more efficient. Rows are 1-indexed.
///
/// # Failure
///
/// Fails if the index does not reference a column or the return type is
/// not compatible with the Postgres type.
///
/// # Example
///
/// ```rust,no_run
/// # use postgres::{PostgresConnection, NoSsl};
/// # let conn = PostgresConnection::connect("", &NoSsl).unwrap();
/// # let stmt = conn.prepare("").unwrap();
/// # let mut result = stmt.query([]).unwrap();
/// # let row = result.next().unwrap();
/// let foo: i32 = row[1];
2014-05-26 03:38:40 +00:00
/// let bar: String = row["bar"];
/// ```
fn index(&self, idx: &I) -> T {
match self.get(idx.clone()) {
Ok(ok) => ok,
2014-04-02 16:19:38 +00:00
Err(err) => fail!("error retrieving column {}: {}", idx, err)
}
}
}
/// A trait implemented by types that can index into columns of a row.
2014-03-31 02:21:51 +00:00
pub trait RowIndex {
/// Returns the index of the appropriate column, or `None` if no such
/// column exists.
fn idx(&self, stmt: &PostgresStatement) -> Option<uint>;
}
impl RowIndex for uint {
#[inline]
fn idx(&self, stmt: &PostgresStatement) -> Option<uint> {
2014-03-30 23:19:04 +00:00
if *self == 0 || *self > stmt.result_desc.len() {
None
} else {
2014-03-30 23:19:04 +00:00
Some(*self - 1)
}
}
}
// This is a convenience as the 1 in get[1] resolves to int :(
impl RowIndex for int {
#[inline]
fn idx(&self, stmt: &PostgresStatement) -> Option<uint> {
if *self < 0 {
2014-05-26 22:21:15 +00:00
None
} else {
(*self as uint).idx(stmt)
}
}
}
impl<'a> RowIndex for &'a str {
2014-05-18 18:37:52 +00:00
#[inline]
fn idx(&self, stmt: &PostgresStatement) -> Option<uint> {
stmt.result_descriptions().iter()
.position(|d| d.name.as_slice() == *self)
}
}
/// A lazily-loaded iterator over the resulting rows of a query
pub struct PostgresLazyRows<'trans, 'stmt> {
result: PostgresRows<'stmt>,
trans: &'trans PostgresTransaction<'trans>,
}
impl<'trans, 'stmt> PostgresLazyRows<'trans, 'stmt> {
/// Like `PostgresRows::finish`.
2014-04-03 05:56:16 +00:00
#[inline]
2014-04-03 04:26:41 +00:00
pub fn finish(self) -> PostgresResult<()> {
self.result.finish()
}
}
2014-04-03 04:26:41 +00:00
impl<'trans, 'stmt> Iterator<PostgresResult<PostgresRow<'stmt>>>
for PostgresLazyRows<'trans, 'stmt> {
2014-04-03 05:56:16 +00:00
#[inline]
2014-04-03 04:26:41 +00:00
fn next(&mut self) -> Option<PostgresResult<PostgresRow<'stmt>>> {
2014-03-28 04:20:04 +00:00
self.result.try_next()
}
2014-04-03 05:56:16 +00:00
#[inline]
fn size_hint(&self) -> (uint, Option<uint>) {
self.result.size_hint()
}
}