rust-postgres/lib.rs

1224 lines
37 KiB
Rust
Raw Normal View History

2013-09-30 05:03:11 +00:00
/*!
Rust-Postgres is a pure-Rust frontend for the popular PostgreSQL database. It
exposes a high-level interface in the vein of JDBC or Go's `database/sql`
package.
```rust
extern mod postgres = "github.com/sfackler/rust-postgres";
2013-10-01 07:01:54 +00:00
extern mod extra;
use extra::time;
use extra::time::Timespec;
2013-09-30 05:03:11 +00:00
2013-11-11 01:59:25 +00:00
use postgres::{PostgresConnection, PostgresStatement, NoSsl};
2013-09-30 05:03:11 +00:00
use postgres::types::ToSql;
struct Person {
id: i32,
name: ~str,
2013-10-01 07:01:54 +00:00
time_created: Timespec,
2013-09-30 05:03:11 +00:00
data: Option<~[u8]>
}
fn main() {
2013-11-11 01:59:25 +00:00
let conn = PostgresConnection::connect("postgres://postgres@localhost",
&NoSsl);
2013-09-30 05:03:11 +00:00
conn.update("CREATE TABLE person (
2013-10-01 07:01:54 +00:00
id SERIAL PRIMARY KEY,
name VARCHAR NOT NULL,
time_created TIMESTAMP NOT NULL,
data BYTEA
2013-09-30 05:03:11 +00:00
)", []);
let me = Person {
id: 0,
name: ~"Steven",
2013-10-01 07:01:54 +00:00
time_created: time::get_time(),
2013-09-30 05:03:11 +00:00
data: None
};
2013-10-01 07:01:54 +00:00
conn.update("INSERT INTO person (name, time_created, data)
2013-09-30 05:03:11 +00:00
VALUES ($1, $2, $3)",
2013-10-01 07:01:54 +00:00
[&me.name as &ToSql, &me.time_created as &ToSql,
2013-09-30 05:03:11 +00:00
&me.data as &ToSql]);
2013-10-01 07:01:54 +00:00
let stmt = conn.prepare("SELECT id, name, time_created, data FROM person");
2013-09-30 05:03:11 +00:00
for row in stmt.query([]) {
let person = Person {
id: row[0],
name: row[1],
2013-10-01 07:01:54 +00:00
time_created: row[2],
2013-09-30 05:03:11 +00:00
data: row[3]
};
2013-10-01 07:01:54 +00:00
println!("Found person {}", person.name);
2013-09-30 05:03:11 +00:00
}
}
```
*/
2013-09-29 03:36:04 +00:00
#[desc="A native PostgreSQL driver"];
#[license="MIT"];
2013-09-03 02:08:37 +00:00
2013-09-30 02:12:20 +00:00
// Needed for rustdoc-ng
#[link(name="rust-postgres", vers="0.1",
package_id="github.com/sfackler/rust-postgres")];
2013-09-30 02:12:20 +00:00
#[doc(html_root_url="http://sfackler.com/doc/rust-postgres/")];
2013-10-04 06:21:27 +00:00
2013-09-30 02:47:30 +00:00
#[warn(missing_doc)];
2013-10-29 05:35:52 +00:00
#[feature(macro_rules, struct_variant, globs)];
2013-10-08 04:11:54 +00:00
2013-08-22 05:52:15 +00:00
extern mod extra;
2013-11-10 03:01:06 +00:00
extern mod ssl = "github.com/sfackler/rust-ssl";
2013-08-04 02:17:32 +00:00
use extra::container::Deque;
use extra::ringbuf::RingBuf;
use extra::url::{UserInfo, Url};
2013-11-10 04:58:38 +00:00
use ssl::{SslStream, SslContext};
2013-11-24 21:12:46 +00:00
use std::cell::RefCell;
2013-11-13 05:33:52 +00:00
use std::io::io_error;
use std::io::buffered::BufferedStream;
use std::io::net;
use std::io::net::ip::{Port, SocketAddr};
use std::io::net::tcp::TcpStream;
use std::task;
2013-08-04 02:17:32 +00:00
2013-11-11 01:59:25 +00:00
use self::error::{PostgresDbError,
PostgresConnectError,
2013-11-11 01:41:15 +00:00
InvalidUrl,
DnsError,
SocketError,
NoSslSupport,
SslError,
MissingUser,
DbError,
UnsupportedAuthentication,
MissingPassword};
use self::message::{BackendMessage,
AuthenticationOk,
AuthenticationKerberosV5,
AuthenticationCleartextPassword,
AuthenticationMD5Password,
AuthenticationSCMCredential,
AuthenticationGSS,
AuthenticationSSPI,
BackendKeyData,
BindComplete,
CommandComplete,
DataRow,
EmptyQueryResponse,
ErrorResponse,
NoData,
NoticeResponse,
NotificationResponse,
ParameterDescription,
ParameterStatus,
ParseComplete,
PortalSuspended,
ReadyForQuery,
RowDescription};
use self::message::{FrontendMessage,
Bind,
CancelRequest,
Close,
Describe,
Execute,
Parse,
PasswordMessage,
Query,
StartupMessage,
2013-11-10 04:58:38 +00:00
SslRequest,
Sync,
Terminate};
use self::message::{RowDescriptionEntry, WriteMessage, ReadMessage};
use self::types::{PostgresType, ToSql, FromSql};
use self::util::digest::Digest;
use self::util::md5::Md5;
2013-07-25 07:10:18 +00:00
2013-09-17 05:48:34 +00:00
pub mod error;
pub mod pool;
2013-08-22 05:52:15 +00:00
mod message;
2013-09-17 05:48:34 +00:00
pub mod types;
mod util;
2013-07-25 07:10:18 +00:00
2013-10-21 00:32:14 +00:00
// Port used when the connection URL does not specify one.
static DEFAULT_PORT: Port = 5432;
2013-09-30 02:12:20 +00:00
/// Trait for types that can handle Postgres notice messages
pub trait PostgresNoticeHandler {
2013-09-30 02:12:20 +00:00
/// Handle a Postgres notice message
fn handle(&mut self, notice: PostgresDbError);
}
2013-09-30 05:22:10 +00:00
/// A notice handler which logs at the `info` level.
2013-09-30 02:12:20 +00:00
///
/// This is the default handler used by a `PostgresConnection`.
pub struct DefaultNoticeHandler;
impl PostgresNoticeHandler for DefaultNoticeHandler {
fn handle(&mut self, notice: PostgresDbError) {
2013-10-21 04:06:12 +00:00
info!("{}: {}", notice.severity, notice.message);
}
}
/// An asynchronous notification received from the backend.
pub struct PostgresNotification {
    /// The process ID of the backend process that raised the notification
    pid: i32,
    /// The name of the channel the notification was raised on
    channel: ~str,
    /// The "payload" string supplied by the notifying process
    payload: ~str,
}
/// An iterator over asynchronous notifications
2013-11-24 22:42:44 +00:00
pub struct PostgresNotificationIterator<'conn> {
priv conn: &'conn PostgresConnection
}
2013-11-24 22:42:44 +00:00
impl<'conn> Iterator<PostgresNotification>
        for PostgresNotificationIterator<'conn> {
    /// Pops the oldest queued notification, or returns `None` if the queue
    /// is empty.
    ///
    /// # Note
    ///
    /// A `None` result is not final: `next` may return `Some` again later if
    /// a new notification has been received in the meantime.
    fn next(&mut self) -> Option<PostgresNotification> {
        let mut inner = self.conn.conn.borrow_mut();
        inner.get().notifications.pop_front()
    }
}
2013-10-21 00:32:14 +00:00
/// The information needed to cancel queries running in a session.
///
/// Obtained from `PostgresConnection::cancel_data` and consumed by
/// `cancel_query`.
pub struct PostgresCancelData {
    /// The process ID of the session
    process_id: i32,
    /// The secret key for the session
    secret_key: i32,
}
/// Attempts to cancel an in-progress query.
///
/// The backend provides no information about whether a cancellation attempt
/// was successful or not. An error will only be returned if the driver was
/// unable to connect to the database.
///
/// A `PostgresCancelData` object can be created via
/// `PostgresConnection::cancel_data`. The object can cancel any query made on
/// that connection.
pub fn cancel_query(url: &str, ssl: &SslMode, data: PostgresCancelData)
2013-10-21 00:32:14 +00:00
-> Option<PostgresConnectError> {
let Url { host, port, .. }: Url = match FromStr::from_str(url) {
2013-10-21 00:32:14 +00:00
Some(url) => url,
None => return Some(InvalidUrl)
};
let port = match port {
Some(port) => FromStr::from_str(port).unwrap(),
None => DEFAULT_PORT
};
2013-11-10 04:58:38 +00:00
let mut socket = match initialize_stream(host, port, ssl) {
2013-10-21 00:32:14 +00:00
Ok(socket) => socket,
Err(err) => return Some(err)
};
socket.write_message(&CancelRequest {
code: message::CANCEL_CODE,
process_id: data.process_id,
secret_key: data.secret_key
});
2013-11-10 04:58:38 +00:00
socket.flush();
2013-10-21 00:32:14 +00:00
None
}
fn open_socket(host: &str, port: Port)
-> Result<TcpStream, PostgresConnectError> {
let addrs = io_error::cond.trap(|_| {}).inside(|| {
2013-10-21 00:32:14 +00:00
net::get_host_addresses(host)
});
2013-10-21 00:32:14 +00:00
let addrs = match addrs {
Some(addrs) => addrs,
None => return Err(DnsError)
};
for addr in addrs.iter() {
let socket = io_error::cond.trap(|_| {}).inside(|| {
2013-10-21 00:32:14 +00:00
TcpStream::connect(SocketAddr { ip: *addr, port: port })
});
2013-10-21 00:32:14 +00:00
match socket {
Some(socket) => return Ok(socket),
2013-10-25 05:30:34 +00:00
None => {}
2013-10-21 00:32:14 +00:00
}
}
Err(SocketError)
}
fn initialize_stream(host: &str, port: Port, ssl: &SslMode)
2013-11-10 04:58:38 +00:00
-> Result<InternalStream, PostgresConnectError> {
let mut socket = match open_socket(host, port) {
Ok(socket) => socket,
Err(err) => return Err(err)
};
let (ssl_required, ctx) = match ssl {
&NoSsl => return Ok(Normal(socket)),
&PreferSsl(ref ctx) => (false, ctx),
&RequireSsl(ref ctx) => (true, ctx)
2013-11-10 04:58:38 +00:00
};
socket.write_message(&SslRequest { code: message::SSL_CODE });
socket.flush();
2013-11-12 07:29:53 +00:00
if socket.read_u8() == 'N' as u8 {
if ssl_required {
return Err(NoSslSupport);
} else {
return Ok(Normal(socket));
}
2013-11-10 04:58:38 +00:00
}
match SslStream::try_new(ctx, socket) {
Ok(stream) => Ok(Ssl(stream)),
Err(err) => Err(SslError(err))
}
}
2013-11-10 03:01:06 +00:00
// The transport underneath a connection: either a bare TCP stream or a TCP
// stream wrapped in SSL.
enum InternalStream {
    Normal(TcpStream),
    Ssl(SslStream<TcpStream>)
}
// Reading simply forwards to whichever stream is wrapped.
impl Reader for InternalStream {
    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
        match *self {
            Ssl(ref mut secured) => secured.read(buf),
            Normal(ref mut plain) => plain.read(buf)
        }
    }

    fn eof(&mut self) -> bool {
        match *self {
            Ssl(ref mut secured) => secured.eof(),
            Normal(ref mut plain) => plain.eof()
        }
    }
}
// Writing simply forwards to whichever stream is wrapped.
impl Writer for InternalStream {
    fn write(&mut self, buf: &[u8]) {
        match *self {
            Ssl(ref mut secured) => secured.write(buf),
            Normal(ref mut plain) => plain.write(buf)
        }
    }
}
struct InnerPostgresConnection {
2013-11-10 03:01:06 +00:00
stream: BufferedStream<InternalStream>,
next_stmt_id: int,
notice_handler: ~PostgresNoticeHandler,
notifications: RingBuf<PostgresNotification>,
2013-10-21 00:32:14 +00:00
cancel_data: PostgresCancelData
2013-08-29 06:19:53 +00:00
}
impl Drop for InnerPostgresConnection {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
io_error::cond.trap(|_| {}).inside(|| {
2013-11-10 04:58:38 +00:00
self.write_messages([Terminate]);
})
2013-08-29 06:19:53 +00:00
}
}
impl InnerPostgresConnection {
fn try_connect(url: &str, ssl: &SslMode)
2013-11-10 04:58:38 +00:00
-> Result<InnerPostgresConnection, PostgresConnectError> {
let Url {
host,
port,
user,
path,
query: args,
..
}: Url = match FromStr::from_str(url) {
Some(url) => url,
None => return Err(InvalidUrl)
};
let user = match user {
Some(user) => user,
None => return Err(MissingUser)
};
let mut args = args;
2013-09-08 21:26:34 +00:00
let port = match port {
Some(port) => FromStr::from_str(port).unwrap(),
2013-10-21 00:32:14 +00:00
None => DEFAULT_PORT
2013-09-08 21:26:34 +00:00
};
2013-11-10 04:58:38 +00:00
let stream = match initialize_stream(host, port, ssl) {
2013-09-08 21:26:34 +00:00
Ok(stream) => stream,
Err(err) => return Err(err)
};
2013-08-22 05:52:15 +00:00
let mut conn = InnerPostgresConnection {
2013-11-10 04:58:38 +00:00
stream: BufferedStream::new(stream),
next_stmt_id: 0,
notice_handler: ~DefaultNoticeHandler as ~PostgresNoticeHandler,
notifications: RingBuf::new(),
2013-10-21 00:32:14 +00:00
cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 }
2013-08-18 03:42:40 +00:00
};
args.push((~"client_encoding", ~"UTF8"));
2013-09-09 04:33:41 +00:00
// Postgres uses the value of TimeZone as the time zone for TIMESTAMP
// WITH TIME ZONE values. Timespec converts to GMT internally.
args.push((~"TimeZone", ~"GMT"));
// We have to clone here since we need the user again for auth
args.push((~"user", user.user.clone()));
if !path.is_empty() {
2013-10-05 03:26:52 +00:00
// path contains the leading /
args.push((~"database", path.slice_from(1).to_owned()));
2013-08-04 02:17:32 +00:00
}
2013-11-10 04:58:38 +00:00
conn.write_messages([StartupMessage {
2013-09-09 05:40:08 +00:00
version: message::PROTOCOL_VERSION,
parameters: args.as_slice()
2013-09-14 20:03:38 +00:00
}]);
match conn.handle_auth(user) {
Some(err) => return Err(err),
2013-10-25 05:30:34 +00:00
None => {}
}
2013-08-04 02:17:32 +00:00
2013-08-22 05:52:15 +00:00
loop {
match conn.read_message() {
2013-10-21 00:32:14 +00:00
BackendKeyData { process_id, secret_key } => {
conn.cancel_data.process_id = process_id;
conn.cancel_data.secret_key = secret_key;
}
ReadyForQuery { .. } => break,
2013-10-05 03:26:52 +00:00
ErrorResponse { fields } =>
return Err(DbError(PostgresDbError::new(fields))),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2013-08-05 00:48:48 +00:00
}
2013-08-23 05:24:14 +00:00
Ok(conn)
2013-08-05 00:48:48 +00:00
}
2013-11-10 04:58:38 +00:00
fn write_messages(&mut self, messages: &[FrontendMessage]) {
for message in messages.iter() {
self.stream.write_message(message);
2013-07-25 07:10:18 +00:00
}
self.stream.flush();
2013-08-22 06:41:26 +00:00
}
2013-08-04 02:17:32 +00:00
fn read_message(&mut self) -> BackendMessage {
loop {
match self.stream.read_message() {
NoticeResponse { fields } =>
self.notice_handler.handle(PostgresDbError::new(fields)),
NotificationResponse { pid, channel, payload } =>
self.notifications.push_back(PostgresNotification {
pid: pid,
channel: channel,
payload: payload
}),
ParameterStatus { parameter, value } =>
2013-11-04 04:49:17 +00:00
info!("Parameter {} = {}", parameter, value),
msg => return msg
}
2013-08-04 05:21:16 +00:00
}
2013-08-22 06:41:26 +00:00
}
fn handle_auth(&mut self, user: UserInfo) -> Option<PostgresConnectError> {
match self.read_message() {
AuthenticationOk => return None,
AuthenticationCleartextPassword => {
let pass = match user.pass {
Some(pass) => pass,
None => return Some(MissingPassword)
};
2013-11-10 04:58:38 +00:00
self.write_messages([PasswordMessage { password: pass }]);
}
AuthenticationMD5Password { salt } => {
let UserInfo { user, pass } = user;
let pass = match pass {
Some(pass) => pass,
None => return Some(MissingPassword)
};
let input = pass + user;
let mut md5 = Md5::new();
md5.input_str(input);
let output = md5.result_str();
md5.reset();
md5.input_str(output);
md5.input(salt);
let output = "md5" + md5.result_str();
2013-11-10 04:58:38 +00:00
self.write_messages([PasswordMessage {
password: output.as_slice()
}]);
}
AuthenticationKerberosV5
2013-09-12 05:02:32 +00:00
| AuthenticationSCMCredential
| AuthenticationGSS
| AuthenticationSSPI => return Some(UnsupportedAuthentication),
2013-10-05 02:26:57 +00:00
ErrorResponse { fields } =>
return Some(DbError(PostgresDbError::new(fields))),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
match self.read_message() {
AuthenticationOk => None,
ErrorResponse { fields } =>
Some(DbError(PostgresDbError::new(fields))),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
}
fn set_notice_handler(&mut self, handler: ~PostgresNoticeHandler)
-> ~PostgresNoticeHandler {
::std::util::replace(&mut self.notice_handler, handler)
}
2013-09-30 02:12:20 +00:00
fn try_prepare<'a>(&mut self, query: &str, conn: &'a PostgresConnection)
2013-09-16 02:26:56 +00:00
-> Result<NormalPostgresStatement<'a>, PostgresDbError> {
let stmt_name = format!("statement_{}", self.next_stmt_id);
self.next_stmt_id += 1;
2013-08-05 00:48:48 +00:00
2013-08-22 06:41:26 +00:00
let types = [];
self.write_messages([
2013-11-10 04:58:38 +00:00
Parse {
2013-09-02 17:27:09 +00:00
name: stmt_name,
query: query,
param_types: types
},
2013-11-10 04:58:38 +00:00
Describe {
2013-09-02 17:27:09 +00:00
variant: 'S' as u8,
name: stmt_name
},
2013-11-10 04:58:38 +00:00
Sync]);
2013-08-22 06:41:26 +00:00
match self.read_message() {
2013-10-25 05:30:34 +00:00
ParseComplete => {}
ErrorResponse { fields } => {
self.wait_for_ready();
return Err(PostgresDbError::new(fields));
}
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2013-08-22 06:41:26 +00:00
let param_types = match self.read_message() {
ParameterDescription { types } =>
types.iter().map(|ty| { PostgresType::from_oid(*ty) })
.collect(),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
};
2013-08-22 06:41:26 +00:00
let result_desc = match self.read_message() {
RowDescription { descriptions } => {
let mut res: ~[ResultDescription] = descriptions
.move_rev_iter().map(|desc| {
ResultDescription::from_row_description_entry(desc)
}).collect();
res.reverse();
res
},
NoData => ~[],
2013-10-25 05:30:34 +00:00
_ => unreachable!()
};
2013-08-17 22:09:26 +00:00
2013-08-22 06:41:26 +00:00
self.wait_for_ready();
Ok(NormalPostgresStatement {
conn: conn,
2013-08-22 06:41:26 +00:00
name: stmt_name,
param_types: param_types,
result_desc: result_desc,
2013-11-24 21:12:46 +00:00
next_portal_id: RefCell::new(0)
})
2013-08-22 06:41:26 +00:00
}
fn wait_for_ready(&mut self) {
match self.read_message() {
ReadyForQuery { .. } => {}
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
}
}
2013-09-30 02:12:20 +00:00
/// A connection to a Postgres database.
pub struct PostgresConnection {
2013-11-24 21:12:46 +00:00
priv conn: RefCell<InnerPostgresConnection>
}
impl PostgresConnection {
2013-09-30 02:12:20 +00:00
/// Attempts to create a new connection to a Postgres database.
///
/// The URL should be provided in the normal format:
///
/// ```
/// postgres://user[:password]@host[:port][/database][?param1=val1[[&param2=val2]...]]
/// ```
///
/// The password may be omitted if not required. The default Postgres port
/// (5432) is used if none is specified. The database name defaults to the
/// username if not specified.
pub fn try_connect(url: &str, ssl: &SslMode)
2013-11-10 04:58:38 +00:00
-> Result<PostgresConnection, PostgresConnectError> {
InnerPostgresConnection::try_connect(url, ssl).map(|conn| {
PostgresConnection {
2013-11-24 21:12:46 +00:00
conn: RefCell::new(conn)
}
})
}
2013-09-30 02:12:20 +00:00
/// A convenience wrapper around `try_connect`.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there was an error connecting to the database.
pub fn connect(url: &str, ssl: &SslMode) -> PostgresConnection {
2013-11-10 04:58:38 +00:00
match PostgresConnection::try_connect(url, ssl) {
2013-09-30 02:12:20 +00:00
Ok(conn) => conn,
2013-10-21 04:06:12 +00:00
Err(err) => fail!("Failed to connect: {}", err.to_str())
2013-09-30 02:12:20 +00:00
}
}
/// Sets the notice handler for the connection, returning the old handler.
pub fn set_notice_handler(&self, handler: ~PostgresNoticeHandler)
-> ~PostgresNoticeHandler {
2013-11-24 21:12:46 +00:00
let mut conn = self.conn.borrow_mut();
conn.get().set_notice_handler(handler)
}
/// Returns an iterator over asynchronous notification messages.
///
/// Use the `LISTEN` command to register this connection for notifications.
pub fn notifications<'a>(&'a self) -> PostgresNotificationIterator<'a> {
PostgresNotificationIterator {
conn: self
}
}
2013-09-30 02:12:20 +00:00
/// Attempts to create a new prepared statement.
///
/// A statement may contain parameters, specified by `$n` where `n` is the
/// index of the parameter in the list provided at execution time,
/// 1-indexed.
///
/// The statement is associated with the connection that created it and may
/// not outlive that connection.
pub fn try_prepare<'a>(&'a self, query: &str)
-> Result<NormalPostgresStatement<'a>, PostgresDbError> {
self.conn.with_mut(|conn| conn.try_prepare(query, self))
2013-09-30 02:12:20 +00:00
}
/// A convenience wrapper around `try_prepare`.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there was an error preparing the statement.
pub fn prepare<'a>(&'a self, query: &str) -> NormalPostgresStatement<'a> {
match self.try_prepare(query) {
Ok(stmt) => stmt,
2013-10-21 04:06:12 +00:00
Err(err) => fail!("Error preparing statement:\n{}",
err.pretty_error(query))
}
}
2013-10-14 01:58:31 +00:00
/// Begins a new transaction.
2013-09-30 02:12:20 +00:00
///
2013-10-14 01:58:31 +00:00
/// Returns a `PostgresTransaction` object which should be used instead of
/// the connection for the duration of the transaction. The transaction
2013-10-15 02:36:01 +00:00
/// is active until the `PostgresTransaction` object falls out of scope.
2013-10-14 01:58:31 +00:00
/// A transaction will commit by default unless the task fails or the
/// transaction is set to roll back.
pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> {
2013-08-23 07:13:42 +00:00
self.quick_query("BEGIN");
2013-10-14 01:58:31 +00:00
PostgresTransaction {
2013-08-27 05:40:23 +00:00
conn: self,
2013-11-24 21:12:46 +00:00
commit: RefCell::new(true),
nested: false
2013-10-14 01:58:31 +00:00
}
2013-08-23 05:24:14 +00:00
}
2013-09-30 02:12:20 +00:00
/// A convenience function for update queries that are only run once.
///
/// If an error is returned, it could have come from either the preparation
/// or execution of the statement.
///
/// On success, returns the number of rows modified or 0 if not applicable.
pub fn try_update(&self, query: &str, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
self.try_prepare(query).and_then(|stmt| stmt.try_update(params))
2013-09-30 02:12:20 +00:00
}
/// A convenience wrapper around `try_update`.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there was an error preparing or executing the statement.
2013-09-01 18:06:33 +00:00
pub fn update(&self, query: &str, params: &[&ToSql]) -> uint {
match self.try_update(query, params) {
Ok(res) => res,
2013-10-21 04:06:12 +00:00
Err(err) => fail!("Error running update:\n{}",
2013-09-15 05:56:52 +00:00
err.pretty_error(query))
2013-09-01 18:06:33 +00:00
}
}
2013-10-21 00:32:14 +00:00
/// Returns information used to cancel pending queries.
///
/// Used with the `cancel_query` function. The object returned can be used
/// to cancel any query executed by the connection it was created from.
pub fn cancel_data(&self) -> PostgresCancelData {
self.conn.with(|conn| conn.cancel_data)
2013-10-21 00:32:14 +00:00
}
2013-08-23 07:13:42 +00:00
fn quick_query(&self, query: &str) {
self.conn.with_mut(|conn| {
2013-11-10 04:58:38 +00:00
conn.write_messages([Query { query: query }]);
loop {
match conn.read_message() {
ReadyForQuery { .. } => break,
ErrorResponse { fields } =>
2013-10-21 04:06:12 +00:00
fail!("Error: {}",
PostgresDbError::new(fields).to_str()),
2013-10-25 05:30:34 +00:00
_ => {}
}
}
})
2013-08-23 07:13:42 +00:00
}
2013-08-22 06:41:26 +00:00
fn wait_for_ready(&self) {
self.conn.with_mut(|conn| conn.wait_for_ready())
}
fn read_message(&self) -> BackendMessage {
self.conn.with_mut(|conn| conn.read_message())
}
2013-11-10 04:58:38 +00:00
fn write_messages(&self, messages: &[FrontendMessage]) {
self.conn.with_mut(|conn| conn.write_messages(messages))
2013-08-17 22:09:26 +00:00
}
}
2013-11-10 04:58:38 +00:00
/// Specifies the SSL support requested for a new connection
pub enum SslMode {
2013-11-10 04:58:38 +00:00
/// The connection will not use SSL
NoSsl,
/// The connection will use SSL if the backend supports it
PreferSsl(SslContext),
2013-11-10 04:58:38 +00:00
/// The connection must use SSL
RequireSsl(SslContext)
2013-11-10 04:58:38 +00:00
}
2013-09-30 02:12:20 +00:00
/// Represents a transaction on a database connection
2013-11-24 22:42:44 +00:00
pub struct PostgresTransaction<'conn> {
priv conn: &'conn PostgresConnection,
2013-11-24 21:12:46 +00:00
priv commit: RefCell<bool>,
priv nested: bool
}
#[unsafe_destructor]
2013-11-24 22:42:44 +00:00
impl<'conn> Drop for PostgresTransaction<'conn> {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
io_error::cond.trap(|_| {}).inside(|| {
2013-11-24 21:12:46 +00:00
if task::failing() || !self.commit.with(|x| *x) {
if self.nested {
self.conn.quick_query("ROLLBACK TO sp");
} else {
self.conn.quick_query("ROLLBACK");
}
} else {
if self.nested {
self.conn.quick_query("RELEASE sp");
} else {
self.conn.quick_query("COMMIT");
}
}
})
}
2013-08-27 05:40:23 +00:00
}
2013-11-24 22:42:44 +00:00
impl<'conn> PostgresTransaction<'conn> {
2013-09-30 02:12:20 +00:00
/// Like `PostgresConnection::try_prepare`.
2013-08-27 05:40:23 +00:00
pub fn try_prepare<'a>(&'a self, query: &str)
-> Result<TransactionalPostgresStatement<'a>, PostgresDbError> {
self.conn.try_prepare(query).map(|stmt| {
TransactionalPostgresStatement {
stmt: stmt
}
})
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Like `PostgresConnection::prepare`.
pub fn prepare<'a>(&'a self, query: &str)
-> TransactionalPostgresStatement<'a> {
TransactionalPostgresStatement {
stmt: self.conn.prepare(query)
}
2013-09-01 18:06:33 +00:00
}
2013-09-30 02:12:20 +00:00
/// Like `PostgresConnection::try_update`.
2013-09-01 18:06:33 +00:00
pub fn try_update(&self, query: &str, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
self.conn.try_update(query, params)
}
2013-09-30 02:12:20 +00:00
/// Like `PostgresConnection::update`.
pub fn update(&self, query: &str, params: &[&ToSql]) -> uint {
self.conn.update(query, params)
}
2013-10-21 00:54:50 +00:00
/// Like `PostgresConnection::transaction`.
2013-11-24 22:42:44 +00:00
pub fn transaction<'a>(&self) -> PostgresTransaction<'conn> {
self.conn.quick_query("SAVEPOINT sp");
2013-10-14 01:58:31 +00:00
PostgresTransaction {
2013-09-05 06:28:44 +00:00
conn: self.conn,
2013-11-24 21:12:46 +00:00
commit: RefCell::new(true),
nested: true
2013-10-14 01:58:31 +00:00
}
2013-09-05 06:28:44 +00:00
}
/// Like `PostgresConnection::notifications`.
pub fn notifications<'a>(&'a self) -> PostgresNotificationIterator<'a> {
self.conn.notifications()
}
2013-09-30 02:12:20 +00:00
/// Determines if the transaction is currently set to commit or roll back.
2013-08-27 05:40:23 +00:00
pub fn will_commit(&self) -> bool {
2013-11-24 21:12:46 +00:00
self.commit.with(|x| *x)
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Sets the transaction to commit at its completion.
2013-08-27 05:40:23 +00:00
pub fn set_commit(&self) {
2013-11-24 21:12:46 +00:00
self.commit.with_mut(|x| *x = true);
2013-08-27 05:40:23 +00:00
}
2013-09-30 02:12:20 +00:00
/// Sets the transaction to roll back at its completion.
2013-08-27 05:40:23 +00:00
pub fn set_rollback(&self) {
2013-11-24 21:12:46 +00:00
self.commit.with_mut(|x| *x = false);
2013-08-27 05:40:23 +00:00
}
}
2013-09-30 02:12:20 +00:00
/// A trait containing methods that can be called on a prepared statement.
pub trait PostgresStatement {
2013-09-30 02:12:20 +00:00
/// Returns a slice containing the expected parameter types.
fn param_types<'a>(&'a self) -> &'a [PostgresType];
2013-09-30 02:12:20 +00:00
/// Returns a slice describing the columns of the result of the query.
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription];
2013-09-30 02:12:20 +00:00
/// Attempts to execute the prepared statement, returning the number of
/// rows modified.
///
/// If the statement does not modify any rows (e.g. SELECT), 0 is returned.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if the number or types of the provided parameters do not match
/// the parameters of the statement.
fn try_update(&self, params: &[&ToSql]) -> Result<uint, PostgresDbError>;
2013-09-30 02:12:20 +00:00
2013-09-30 05:22:10 +00:00
/// A convenience function wrapping `try_update`.
2013-09-30 02:12:20 +00:00
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there was an error executing the statement.
fn update(&self, params: &[&ToSql]) -> uint {
match self.try_update(params) {
Ok(count) => count,
2013-10-21 04:06:12 +00:00
Err(err) => fail!("Error running update\n{}", err.to_str())
}
}
2013-09-30 02:12:20 +00:00
/// Attempts to execute the prepared statement, returning an iterator over
/// the resulting rows.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if the number or types of the provided parameters do not match
/// the parameters of the statement.
fn try_query<'a>(&'a self, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError>;
2013-09-30 02:12:20 +00:00
/// A convenience function wrapping `try_query`.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there was an error executing the statement.
fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a> {
match self.try_query(params) {
Ok(result) => result,
2013-10-21 04:06:12 +00:00
Err(err) => fail!("Error executing query:\n{}", err.to_str())
}
}
}
2013-09-30 02:12:20 +00:00
/// A statement prepared outside of a transaction.
2013-11-24 22:42:44 +00:00
pub struct NormalPostgresStatement<'conn> {
priv conn: &'conn PostgresConnection,
2013-08-22 06:41:26 +00:00
priv name: ~str,
priv param_types: ~[PostgresType],
priv result_desc: ~[ResultDescription],
2013-11-24 21:12:46 +00:00
priv next_portal_id: RefCell<uint>
2013-08-22 07:12:35 +00:00
}
#[unsafe_destructor]
2013-11-24 22:42:44 +00:00
impl<'conn> Drop for NormalPostgresStatement<'conn> {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
io_error::cond.trap(|_| {}).inside(|| {
self.conn.write_messages([
2013-11-10 04:58:38 +00:00
Close {
2013-09-01 05:48:49 +00:00
variant: 'S' as u8,
name: self.name.as_slice()
},
2013-11-10 04:58:38 +00:00
Sync]);
loop {
match self.conn.read_message() {
ReadyForQuery { .. } => break,
2013-10-25 05:30:34 +00:00
_ => {}
}
2013-08-23 05:24:14 +00:00
}
})
2013-08-22 07:12:35 +00:00
}
}
2013-11-24 22:42:44 +00:00
impl<'conn> NormalPostgresStatement<'conn> {
fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql])
2013-09-01 05:48:49 +00:00
-> Option<PostgresDbError> {
let mut formats = ~[];
let mut values = ~[];
2013-09-12 05:33:19 +00:00
assert!(self.param_types.len() == params.len(),
2013-10-02 02:55:17 +00:00
"Expected {} parameters but found {}",
2013-09-12 05:33:19 +00:00
self.param_types.len(), params.len());
for (&param, &ty) in params.iter().zip(self.param_types.iter()) {
let (format, value) = param.to_sql(ty);
formats.push(format as i16);
values.push(value);
};
let result_formats: ~[i16] = self.result_desc.iter().map(|desc| {
desc.ty.result_format() as i16
}).collect();
2013-08-22 07:12:35 +00:00
self.conn.write_messages([
2013-11-10 04:58:38 +00:00
Bind {
2013-09-01 05:48:49 +00:00
portal: portal_name,
statement: self.name.as_slice(),
formats: formats,
values: values,
result_formats: result_formats
},
2013-11-10 04:58:38 +00:00
Execute {
portal: portal_name,
max_rows: row_limit as i32
},
2013-11-10 04:58:38 +00:00
Sync]);
2013-08-22 07:12:35 +00:00
match self.conn.read_message() {
BindComplete => None,
ErrorResponse { fields } => {
self.conn.wait_for_ready();
Some(PostgresDbError::new(fields))
}
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2013-08-23 05:24:14 +00:00
}
fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
2013-11-24 21:12:46 +00:00
let id = self.next_portal_id.with_mut(|x| { *x += 1; *x - 1 });
let portal_name = format!("{}_portal_{}", self.name, id);
match self.execute(portal_name, row_limit, params) {
Some(err) => {
return Err(err);
}
2013-10-25 05:30:34 +00:00
None => {}
}
let mut result = PostgresResult {
stmt: self,
name: portal_name,
data: RingBuf::new(),
row_limit: row_limit,
more_rows: true
};
result.read_rows();
Ok(result)
}
}
2013-08-23 05:24:14 +00:00
2013-11-24 22:42:44 +00:00
impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> {
fn param_types<'a>(&'a self) -> &'a [PostgresType] {
self.param_types.as_slice()
}
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
self.result_desc.as_slice()
}
fn try_update(&self, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
match self.execute("", 0, params) {
Some(err) => {
return Err(err);
}
2013-10-25 05:30:34 +00:00
None => {}
}
2013-08-22 07:12:35 +00:00
2013-08-29 05:44:34 +00:00
let num;
2013-08-23 05:24:14 +00:00
loop {
match self.conn.read_message() {
DataRow { .. } => {}
2013-10-25 05:30:34 +00:00
ErrorResponse { fields } => {
self.conn.wait_for_ready();
return Err(PostgresDbError::new(fields));
}
CommandComplete { tag } => {
let s = tag.split(' ').last().unwrap();
2013-08-29 05:44:34 +00:00
num = match FromStr::from_str(s) {
None => 0,
Some(n) => n
};
2013-08-23 05:24:14 +00:00
break;
}
2013-08-29 05:44:34 +00:00
EmptyQueryResponse => {
num = 0;
break;
}
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
2013-08-23 05:24:14 +00:00
}
2013-08-22 07:12:35 +00:00
self.conn.wait_for_ready();
2013-08-23 05:24:14 +00:00
Ok(num)
2013-08-22 07:12:35 +00:00
}
2013-08-23 07:13:42 +00:00
fn try_query<'a>(&'a self, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
self.try_lazy_query(0, params)
2013-08-23 07:13:42 +00:00
}
}
2013-09-30 02:12:20 +00:00
/// Information about a column of the result of a query.
2013-09-29 06:02:21 +00:00
#[deriving(Eq)]
pub struct ResultDescription {
2013-09-30 02:12:20 +00:00
/// The name of the column
2013-09-29 06:02:21 +00:00
name: ~str,
2013-09-30 02:12:20 +00:00
/// The type of the data in the column
2013-09-29 06:02:21 +00:00
ty: PostgresType
}
impl ResultDescription {
fn from_row_description_entry(row: RowDescriptionEntry)
-> ResultDescription {
let RowDescriptionEntry { name, type_oid, .. } = row;
2013-09-29 06:02:21 +00:00
ResultDescription {
name: name,
ty: PostgresType::from_oid(type_oid)
}
}
}
2013-09-30 02:12:20 +00:00
/// A statement prepared inside of a transaction.
///
/// Provides additional functionality over a `NormalPostgresStatement`.
2013-11-24 22:42:44 +00:00
pub struct TransactionalPostgresStatement<'conn> {
priv stmt: NormalPostgresStatement<'conn>
}
2013-11-24 22:42:44 +00:00
// Every method here forwards unchanged to the wrapped statement.
impl<'conn> PostgresStatement for TransactionalPostgresStatement<'conn> {
    /// Forwards to the wrapped statement's `param_types`.
    fn param_types<'a>(&'a self) -> &'a [PostgresType] {
        self.stmt.param_types()
    }

    /// Forwards to the wrapped statement's `result_descriptions`.
    fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
        self.stmt.result_descriptions()
    }

    /// Forwards to the wrapped statement's `try_update`.
    fn try_update(&self, params: &[&ToSql])
            -> Result<uint, PostgresDbError> {
        self.stmt.try_update(params)
    }

    /// Forwards to the wrapped statement's `try_query`.
    fn try_query<'a>(&'a self, params: &[&ToSql])
            -> Result<PostgresResult<'a>, PostgresDbError> {
        self.stmt.try_query(params)
    }
}
2013-11-24 22:42:44 +00:00
impl<'conn> TransactionalPostgresStatement<'conn> {
2013-09-30 02:12:20 +00:00
/// Attempts to execute the prepared statement, returning a lazily loaded
/// iterator over the resulting rows.
///
/// No more than `row_limit` rows will be stored in memory at a time. Rows
/// will be pulled from the database in batches of `row_limit` as needed.
/// If `row_limit` is 0, `try_lazy_query` is equivalent to `try_query`.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if the number or types of the provided parameters do not match
/// the parameters of the statement.
pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
self.stmt.try_lazy_query(row_limit, params)
}
2013-09-30 02:12:20 +00:00
/// A convenience wrapper around `try_lazy_query`.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there was an error executing the statement.
pub fn lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> PostgresResult<'a> {
2013-09-30 15:35:03 +00:00
match self.try_lazy_query(row_limit, params) {
Ok(result) => result,
2013-10-21 04:06:12 +00:00
Err(err) => fail!("Error executing query:\n{}", err.to_str())
2013-09-30 15:35:03 +00:00
}
2013-09-30 02:12:20 +00:00
}
}
2013-09-30 02:12:20 +00:00
/// An iterator over the resulting rows of a query.
2013-11-24 22:42:44 +00:00
pub struct PostgresResult<'stmt> {
priv stmt: &'stmt NormalPostgresStatement<'stmt>,
priv name: ~str,
priv data: RingBuf<~[Option<~[u8]>]>,
priv row_limit: uint,
priv more_rows: bool
}
#[unsafe_destructor]
2013-11-24 22:42:44 +00:00
impl<'stmt> Drop for PostgresResult<'stmt> {
2013-09-18 05:06:47 +00:00
fn drop(&mut self) {
io_error::cond.trap(|_| {}).inside(|| {
self.stmt.conn.write_messages([
2013-11-10 04:58:38 +00:00
Close {
variant: 'P' as u8,
name: self.name.as_slice()
},
2013-11-10 04:58:38 +00:00
Sync]);
loop {
match self.stmt.conn.read_message() {
ReadyForQuery { .. } => break,
2013-10-25 05:30:34 +00:00
_ => {}
}
}
})
}
}
2013-11-24 22:42:44 +00:00
impl<'stmt> PostgresResult<'stmt> {
fn read_rows(&mut self) {
loop {
match self.stmt.conn.read_message() {
EmptyQueryResponse |
CommandComplete { .. } => {
self.more_rows = false;
break;
},
PortalSuspended => {
self.more_rows = true;
break;
},
DataRow { row } => self.data.push_back(row),
2013-10-25 05:30:34 +00:00
_ => unreachable!()
}
}
self.stmt.conn.wait_for_ready();
}
fn execute(&mut self) {
self.stmt.conn.write_messages([
2013-11-10 04:58:38 +00:00
Execute {
portal: self.name,
max_rows: self.row_limit as i32
},
2013-11-10 04:58:38 +00:00
Sync]);
self.read_rows();
}
2013-08-23 07:13:42 +00:00
}
2013-11-24 22:42:44 +00:00
impl<'stmt> Iterator<PostgresRow<'stmt>> for PostgresResult<'stmt> {
fn next(&mut self) -> Option<PostgresRow<'stmt>> {
if self.data.is_empty() && self.more_rows {
self.execute();
}
self.data.pop_front().map(|row| {
PostgresRow {
stmt: self.stmt,
data: row
}
})
2013-08-23 07:13:42 +00:00
}
}
2013-09-30 02:12:20 +00:00
/// A single result row of a query.
2013-09-30 05:22:10 +00:00
///
/// A value can be accessed by the name or index of its column, though access
/// by index is more efficient.
///
/// ```rust
/// let foo: i32 = row[0];
/// let bar: ~str = row["bar"];
/// ```
2013-11-24 22:42:44 +00:00
pub struct PostgresRow<'stmt> {
priv stmt: &'stmt NormalPostgresStatement<'stmt>,
priv data: ~[Option<~[u8]>]
2013-08-23 07:13:42 +00:00
}
2013-11-24 22:42:44 +00:00
impl<'stmt> Container for PostgresRow<'stmt> {
2013-09-30 05:22:10 +00:00
#[inline]
2013-08-23 07:13:42 +00:00
fn len(&self) -> uint {
self.data.len()
2013-08-23 07:13:42 +00:00
}
}
2013-11-24 22:42:44 +00:00
impl<'stmt, I: RowIndex, T: FromSql> Index<I, T> for PostgresRow<'stmt> {
2013-09-29 17:53:44 +00:00
#[inline]
fn index(&self, idx: &I) -> T {
let idx = idx.idx(self.stmt);
FromSql::from_sql(self.stmt.result_desc[idx].ty, &self.data[idx])
2013-08-23 07:13:42 +00:00
}
}
2013-09-30 02:12:20 +00:00
/// A trait implemented by types that can index into columns of a row.
pub trait RowIndex {
2013-09-30 02:12:20 +00:00
/// Returns the index of the appropriate column.
///
2013-10-13 06:19:57 +00:00
/// # Failure
///
2013-09-30 02:12:20 +00:00
/// Fails if there is no corresponding column.
fn idx(&self, stmt: &NormalPostgresStatement) -> uint;
}
impl RowIndex for uint {
2013-09-29 17:53:44 +00:00
#[inline]
fn idx(&self, _stmt: &NormalPostgresStatement) -> uint {
*self
2013-08-23 07:13:42 +00:00
}
}
// This is a convenience as the 0 in get[0] resolves to int :(
impl RowIndex for int {
2013-09-29 17:53:44 +00:00
#[inline]
fn idx(&self, _stmt: &NormalPostgresStatement) -> uint {
assert!(*self >= 0);
*self as uint
}
}
2013-09-03 00:07:08 +00:00
2013-11-24 22:42:44 +00:00
impl<'a> RowIndex for &'a str {
fn idx(&self, stmt: &NormalPostgresStatement) -> uint {
for (i, desc) in stmt.result_descriptions().iter().enumerate() {
if desc.name.as_slice() == *self {
return i;
}
}
2013-11-09 06:17:43 +00:00
fail!("There is no column with name {}", *self);
2013-09-03 00:07:08 +00:00
}
2013-08-23 07:13:42 +00:00
}