rust-postgres/src/lib.rs

882 lines
25 KiB
Rust
Raw Normal View History

2013-09-03 02:08:37 +00:00
#[link(name = "postgres",
vers = "0.1",
url = "https://github.com/sfackler/rust-postgres")];
2013-08-22 05:52:15 +00:00
extern mod extra;
2013-08-04 02:17:32 +00:00
use extra::container::Deque;
use extra::digest::Digest;
use extra::ringbuf::RingBuf;
use extra::md5::Md5;
use extra::url::{UserInfo, Url};
2013-08-22 05:52:15 +00:00
use std::cell::Cell;
use std::hashmap::HashMap;
2013-09-09 05:40:08 +00:00
use std::rt::io::{Writer, io_error, Decorator};
2013-09-14 20:03:38 +00:00
use std::rt::io::buffered::BufferedStream;
2013-09-08 21:26:34 +00:00
use std::rt::io::net;
use std::rt::io::net::ip;
2013-08-22 05:52:15 +00:00
use std::rt::io::net::ip::SocketAddr;
use std::rt::io::net::tcp::TcpStream;
2013-08-04 02:17:32 +00:00
use error::hack::PostgresSqlState;
2013-09-09 05:40:08 +00:00
use message::{BackendMessage,
2013-09-12 05:02:32 +00:00
AuthenticationOk,
AuthenticationKerberosV5,
2013-09-09 05:40:08 +00:00
AuthenticationCleartextPassword,
AuthenticationMD5Password,
2013-09-12 05:02:32 +00:00
AuthenticationSCMCredential,
AuthenticationGSS,
AuthenticationSSPI,
2013-09-09 05:40:08 +00:00
BackendKeyData,
BindComplete,
CommandComplete,
DataRow,
EmptyQueryResponse,
ErrorResponse,
NoData,
NoticeResponse,
ParameterDescription,
ParameterStatus,
ParseComplete,
PortalSuspended,
ReadyForQuery,
RowDescription};
use message::{FrontendMessage,
Bind,
Close,
Describe,
Execute,
Parse,
PasswordMessage,
Query,
StartupMessage,
Sync,
Terminate};
use message::{RowDescriptionEntry, WriteMessage, ReadMessage};
use types::{PostgresType, ToSql, FromSql};
2013-07-25 07:10:18 +00:00
mod error;
2013-08-22 05:52:15 +00:00
mod message;
mod types;
2013-07-25 07:10:18 +00:00
pub trait PostgresNoticeHandler {
fn handle(&mut self, notice: PostgresDbError);
}
pub struct DefaultNoticeHandler;
impl PostgresNoticeHandler for DefaultNoticeHandler {
fn handle(&mut self, notice: PostgresDbError) {
info2!("{}: {}", notice.severity, notice.message);
}
}
#[deriving(ToStr)]
pub enum PostgresConnectError {
InvalidUrl,
MissingUser,
2013-09-08 21:26:34 +00:00
DnsError,
SocketError,
DbError(PostgresDbError),
MissingPassword,
UnsupportedAuthentication
}
#[deriving(ToStr)]
pub enum PostgresErrorPosition {
Position(uint),
InternalPosition {
position: uint,
query: ~str
}
}
#[deriving(ToStr)]
pub struct PostgresDbError {
// This could almost be an enum, except the values can be localized :(
severity: ~str,
code: PostgresSqlState,
message: ~str,
detail: Option<~str>,
hint: Option<~str>,
position: Option<PostgresErrorPosition>,
where: Option<~str>,
file: ~str,
line: uint,
routine: ~str
}
impl PostgresDbError {
fn new(fields: ~[(u8, ~str)]) -> PostgresDbError {
// move_rev_iter is more efficient than move_iter
let mut map: HashMap<u8, ~str> = fields.move_rev_iter().collect();
PostgresDbError {
severity: map.pop(&('S' as u8)).unwrap(),
code: FromStr::from_str(map.pop(&('C' as u8)).unwrap()).unwrap(),
message: map.pop(&('M' as u8)).unwrap(),
detail: map.pop(&('D' as u8)),
hint: map.pop(&('H' as u8)),
position: match map.pop(&('P' as u8)) {
Some(pos) => Some(Position(FromStr::from_str(pos).unwrap())),
None => match map.pop(&('p' as u8)) {
Some(pos) => Some(InternalPosition {
position: FromStr::from_str(pos).unwrap(),
query: map.pop(&('q' as u8)).unwrap()
}),
None => None
}
},
where: map.pop(&('W' as u8)),
file: map.pop(&('F' as u8)).unwrap(),
line: FromStr::from_str(map.pop(&('L' as u8)).unwrap()).unwrap(),
routine: map.pop(&('R' as u8)).unwrap()
}
}
}
2013-08-29 06:19:53 +00:00
pub struct PostgresConnection {
2013-09-14 20:03:38 +00:00
priv stream: Cell<BufferedStream<TcpStream>>,
priv next_stmt_id: Cell<int>,
priv notice_handler: Cell<~PostgresNoticeHandler>
2013-08-29 06:19:53 +00:00
}
impl Drop for PostgresConnection {
fn drop(&self) {
do io_error::cond.trap(|_| {}).inside {
2013-09-14 20:03:38 +00:00
self.write_messages([&Terminate]);
2013-08-29 06:19:53 +00:00
}
}
}
2013-08-18 03:30:31 +00:00
impl PostgresConnection {
2013-08-23 02:47:06 +00:00
pub fn connect(url: &str) -> PostgresConnection {
match PostgresConnection::try_connect(url) {
Ok(conn) => conn,
Err(err) => fail2!("Failed to connect: {}", err.to_str())
}
}
pub fn try_connect(url: &str) -> Result<PostgresConnection,
PostgresConnectError> {
let Url {
host,
port,
user,
path,
query: args,
_
}: Url = match FromStr::from_str(url) {
Some(url) => url,
None => return Err(InvalidUrl)
};
let user = match user {
Some(user) => user,
None => return Err(MissingUser)
};
let mut args = args;
2013-09-08 21:26:34 +00:00
let port = match port {
Some(port) => FromStr::from_str(port).unwrap(),
None => 5432
};
let stream = match PostgresConnection::open_socket(host, port) {
Ok(stream) => stream,
Err(err) => return Err(err)
};
2013-08-22 05:52:15 +00:00
let conn = PostgresConnection {
2013-09-14 20:03:38 +00:00
stream: Cell::new(BufferedStream::new(stream)),
next_stmt_id: Cell::new(0),
2013-09-12 05:02:32 +00:00
notice_handler: Cell::new(~DefaultNoticeHandler
as ~PostgresNoticeHandler)
2013-08-18 03:42:40 +00:00
};
args.push((~"client_encoding", ~"UTF8"));
2013-09-09 04:33:41 +00:00
// Postgres uses the value of TimeZone as the time zone for TIMESTAMP
// WITH TIME ZONE values. Timespec converts to GMT internally.
args.push((~"TimeZone", ~"GMT"));
// We have to clone here since we need the user again for auth
args.push((~"user", user.user.clone()));
if !path.is_empty() {
args.push((~"database", path));
2013-08-04 02:17:32 +00:00
}
2013-09-14 20:03:38 +00:00
conn.write_messages([&StartupMessage {
2013-09-09 05:40:08 +00:00
version: message::PROTOCOL_VERSION,
parameters: args.as_slice()
2013-09-14 20:03:38 +00:00
}]);
match conn.handle_auth(user) {
Some(err) => return Err(err),
None => ()
}
2013-08-04 02:17:32 +00:00
2013-08-22 05:52:15 +00:00
loop {
match conn.read_message() {
BackendKeyData {_} => (),
ReadyForQuery {_} => break,
_ => fail!()
}
2013-08-05 00:48:48 +00:00
}
2013-08-23 05:24:14 +00:00
Ok(conn)
2013-08-05 00:48:48 +00:00
}
2013-09-08 21:26:34 +00:00
fn open_socket(host: &str, port: ip::Port)
-> Result<TcpStream, PostgresConnectError> {
let addrs = do io_error::cond.trap(|_| {}).inside {
net::get_host_addresses(host)
};
let addrs = match addrs {
Some(addrs) => addrs,
None => return Err(DnsError)
};
for addr in addrs.iter() {
let socket = do io_error::cond.trap(|_| {}).inside {
TcpStream::connect(SocketAddr { ip: *addr, port: port })
};
match socket {
Some(socket) => return Ok(socket),
None => ()
}
}
Err(SocketError)
}
fn write_messages(&self, messages: &[&FrontendMessage]) {
2013-08-22 05:52:15 +00:00
do self.stream.with_mut_ref |s| {
2013-09-14 20:03:38 +00:00
for &message in messages.iter() {
s.write_message(message);
}
s.flush();
2013-07-25 07:10:18 +00:00
}
2013-08-22 06:41:26 +00:00
}
2013-08-04 02:17:32 +00:00
2013-08-22 06:41:26 +00:00
fn read_message(&self) -> BackendMessage {
loop {
let msg = do self.stream.with_mut_ref |s| {
s.read_message()
};
match msg {
NoticeResponse { fields } => {
let mut handler = self.notice_handler.take();
handler.handle(PostgresDbError::new(fields));
self.notice_handler.put_back(handler);
}
ParameterStatus { parameter, value } =>
debug!("Parameter %s = %s", parameter, value),
msg => return msg
}
2013-08-04 05:21:16 +00:00
}
2013-08-22 06:41:26 +00:00
}
fn handle_auth(&self, user: UserInfo) -> Option<PostgresConnectError> {
match self.read_message() {
AuthenticationOk => return None,
AuthenticationCleartextPassword => {
let pass = match user.pass {
Some(pass) => pass,
None => return Some(MissingPassword)
};
2013-09-14 20:03:38 +00:00
self.write_messages([&PasswordMessage { password: pass }]);
}
AuthenticationMD5Password { salt } => {
let UserInfo { user, pass } = user;
let pass = match pass {
Some(pass) => pass,
None => return Some(MissingPassword)
};
let input = pass + user;
let mut md5 = Md5::new();
md5.input_str(input);
let output = md5.result_str();
md5.reset();
md5.input_str(output);
md5.input(salt);
let output = "md5" + md5.result_str();
2013-09-14 20:03:38 +00:00
self.write_messages([&PasswordMessage {
password: output.as_slice()
2013-09-14 20:03:38 +00:00
}]);
}
2013-09-12 05:02:32 +00:00
AuthenticationKerberosV5
| AuthenticationSCMCredential
| AuthenticationGSS
| AuthenticationSSPI => return Some(UnsupportedAuthentication),
_ => fail!()
}
match self.read_message() {
AuthenticationOk => None,
ErrorResponse { fields } =>
Some(DbError(PostgresDbError::new(fields))),
_ => fail!()
}
}
pub fn set_notice_handler(&self, handler: ~PostgresNoticeHandler)
-> ~PostgresNoticeHandler {
let old_handler = self.notice_handler.take();
self.notice_handler.put_back(handler);
old_handler
}
pub fn prepare<'a>(&'a self, query: &str) -> NormalPostgresStatement<'a> {
match self.try_prepare(query) {
Ok(stmt) => stmt,
Err(err) => fail2!("Error preparing \"{}\": {}", query,
err.to_str())
}
}
pub fn try_prepare<'a>(&'a self, query: &str)
-> Result<NormalPostgresStatement<'a>, PostgresDbError> {
2013-08-22 06:41:26 +00:00
let id = self.next_stmt_id.take();
2013-08-27 02:38:02 +00:00
let stmt_name = format!("statement_{}", id);
2013-08-22 06:41:26 +00:00
self.next_stmt_id.put_back(id + 1);
2013-08-05 00:48:48 +00:00
2013-08-22 06:41:26 +00:00
let types = [];
self.write_messages([
2013-09-02 17:27:09 +00:00
&Parse {
name: stmt_name,
query: query,
param_types: types
},
&Describe {
variant: 'S' as u8,
name: stmt_name
},
&Sync]);
2013-08-22 06:41:26 +00:00
match self.read_message() {
2013-08-22 05:52:15 +00:00
ParseComplete => (),
ErrorResponse { fields } => {
self.wait_for_ready();
return Err(PostgresDbError::new(fields));
}
_ => fail!()
}
2013-08-22 06:41:26 +00:00
let param_types = match self.read_message() {
ParameterDescription { types } =>
types.iter().map(|ty| { PostgresType::from_oid(*ty) })
.collect(),
_ => fail!()
};
2013-08-22 06:41:26 +00:00
let result_desc = match self.read_message() {
RowDescription { descriptions } => {
let mut res: ~[ResultDescription] = descriptions
.move_rev_iter().map(|desc| {
ResultDescription::from_row_description_entry(desc)
}).collect();
res.reverse();
res
},
NoData => ~[],
_ => fail!()
};
2013-08-17 22:09:26 +00:00
2013-08-22 06:41:26 +00:00
self.wait_for_ready();
Ok(NormalPostgresStatement {
2013-08-22 06:41:26 +00:00
conn: self,
name: stmt_name,
param_types: param_types,
result_desc: result_desc,
next_portal_id: Cell::new(0)
})
2013-08-22 06:41:26 +00:00
}
2013-08-29 05:44:34 +00:00
pub fn in_transaction<T>(&self, blk: &fn(&PostgresTransaction) -> T) -> T {
2013-08-23 07:13:42 +00:00
self.quick_query("BEGIN");
2013-08-23 05:24:14 +00:00
2013-08-27 05:40:23 +00:00
let trans = PostgresTransaction {
conn: self,
commit: Cell::new(true)
};
2013-08-23 05:24:14 +00:00
// If this fails, Postgres will rollback when the connection closes
2013-08-27 05:40:23 +00:00
let ret = blk(&trans);
2013-08-23 05:24:14 +00:00
2013-08-27 05:40:23 +00:00
if trans.commit.take() {
2013-08-23 07:13:42 +00:00
self.quick_query("COMMIT");
2013-08-23 05:24:14 +00:00
} else {
2013-08-23 07:13:42 +00:00
self.quick_query("ROLLBACK");
2013-08-23 05:24:14 +00:00
}
ret
}
2013-09-01 18:06:33 +00:00
pub fn update(&self, query: &str, params: &[&ToSql]) -> uint {
match self.try_update(query, params) {
Ok(res) => res,
Err(err) => fail2!("Error running update: {}", err.to_str())
2013-09-01 18:06:33 +00:00
}
}
pub fn try_update(&self, query: &str, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
2013-09-14 20:03:38 +00:00
do self.try_prepare(query).and_then |stmt| {
2013-09-01 18:06:33 +00:00
stmt.try_update(params)
}
}
2013-08-23 07:13:42 +00:00
fn quick_query(&self, query: &str) {
2013-09-14 20:03:38 +00:00
self.write_messages([&Query { query: query }]);
2013-08-23 07:13:42 +00:00
loop {
match self.read_message() {
ReadyForQuery {_} => break,
ErrorResponse { fields } =>
fail2!("Error: {}", PostgresDbError::new(fields).to_str()),
2013-08-23 07:13:42 +00:00
_ => ()
}
2013-08-23 07:13:42 +00:00
}
}
2013-08-22 06:41:26 +00:00
fn wait_for_ready(&self) {
match self.read_message() {
ReadyForQuery {_} => (),
_ => fail!()
}
2013-08-17 22:09:26 +00:00
}
}
2013-08-27 05:40:23 +00:00
pub struct PostgresTransaction<'self> {
priv conn: &'self PostgresConnection,
priv commit: Cell<bool>
}
impl<'self> PostgresTransaction<'self> {
pub fn prepare<'a>(&'a self, query: &str)
-> TransactionalPostgresStatement<'a> {
TransactionalPostgresStatement { stmt: self.conn.prepare(query) }
2013-08-27 05:40:23 +00:00
}
pub fn try_prepare<'a>(&'a self, query: &str)
-> Result<TransactionalPostgresStatement<'a>, PostgresDbError> {
do self.conn.try_prepare(query).map_move |stmt| {
TransactionalPostgresStatement { stmt: stmt }
}
2013-08-27 05:40:23 +00:00
}
2013-09-01 18:06:33 +00:00
pub fn update(&self, query: &str, params: &[&ToSql]) -> uint {
self.conn.update(query, params)
}
pub fn try_update(&self, query: &str, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
self.conn.try_update(query, params)
}
2013-09-05 06:28:44 +00:00
pub fn in_transaction<T>(&self, blk: &fn(&PostgresTransaction) -> T) -> T {
self.conn.quick_query("SAVEPOINT sp");
2013-09-05 06:28:44 +00:00
let nested_trans = PostgresTransaction {
conn: self.conn,
commit: Cell::new(true)
};
let ret = blk(&nested_trans);
if nested_trans.commit.take() {
self.conn.quick_query("RELEASE sp");
2013-09-05 06:28:44 +00:00
} else {
self.conn.quick_query("ROLLBACK TO sp");
2013-09-05 06:28:44 +00:00
}
ret
}
2013-08-27 05:40:23 +00:00
pub fn will_commit(&self) -> bool {
let commit = self.commit.take();
self.commit.put_back(commit);
commit
}
pub fn set_commit(&self) {
self.commit.take();
self.commit.put_back(true);
}
pub fn set_rollback(&self) {
self.commit.take();
self.commit.put_back(false);
}
}
pub trait PostgresStatement {
fn param_types<'a>(&'a self) -> &'a [PostgresType];
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription];
fn update(&self, params: &[&ToSql]) -> uint;
fn try_update(&self, params: &[&ToSql]) -> Result<uint, PostgresDbError>;
fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a>;
fn try_query<'a>(&'a self, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError>;
fn find_col_named(&self, col: &str) -> Option<uint>;
}
pub struct NormalPostgresStatement<'self> {
2013-08-22 05:52:15 +00:00
priv conn: &'self PostgresConnection,
2013-08-22 06:41:26 +00:00
priv name: ~str,
priv param_types: ~[PostgresType],
priv result_desc: ~[ResultDescription],
priv next_portal_id: Cell<uint>
2013-08-22 07:12:35 +00:00
}
#[deriving(Eq)]
pub struct ResultDescription {
name: ~str,
ty: PostgresType
}
impl ResultDescription {
fn from_row_description_entry(row: RowDescriptionEntry)
-> ResultDescription {
let RowDescriptionEntry { name, type_oid, _ } = row;
ResultDescription {
name: name,
ty: PostgresType::from_oid(type_oid)
}
}
}
2013-08-22 07:12:35 +00:00
#[unsafe_destructor]
impl<'self> Drop for NormalPostgresStatement<'self> {
2013-08-22 07:12:35 +00:00
fn drop(&self) {
do io_error::cond.trap(|_| {}).inside {
self.conn.write_messages([
2013-09-01 05:48:49 +00:00
&Close {
variant: 'S' as u8,
name: self.name.as_slice()
},
&Sync]);
loop {
match self.conn.read_message() {
ReadyForQuery {_} => break,
_ => ()
}
2013-08-23 05:24:14 +00:00
}
}
2013-08-22 07:12:35 +00:00
}
}
impl<'self> NormalPostgresStatement<'self> {
fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql])
2013-09-01 05:48:49 +00:00
-> Option<PostgresDbError> {
let mut formats = ~[];
let mut values = ~[];
2013-09-12 05:33:19 +00:00
assert!(self.param_types.len() == params.len(),
"Expected %u parameters but found %u",
self.param_types.len(), params.len());
for (&param, &ty) in params.iter().zip(self.param_types.iter()) {
let (format, value) = param.to_sql(ty);
formats.push(format as i16);
values.push(value);
};
let result_formats: ~[i16] = self.result_desc.iter().map(|desc| {
desc.ty.result_format() as i16
}).collect();
2013-08-22 07:12:35 +00:00
self.conn.write_messages([
2013-09-01 05:48:49 +00:00
&Bind {
portal: portal_name,
statement: self.name.as_slice(),
formats: formats,
values: values,
result_formats: result_formats
},
&Execute {
portal: portal_name,
max_rows: row_limit as i32
},
2013-09-01 05:48:49 +00:00
&Sync]);
2013-08-22 07:12:35 +00:00
match self.conn.read_message() {
BindComplete => None,
ErrorResponse { fields } => {
self.conn.wait_for_ready();
Some(PostgresDbError::new(fields))
}
_ => fail!()
}
2013-08-23 05:24:14 +00:00
}
fn lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> PostgresResult<'a> {
match self.try_lazy_query(row_limit, params) {
Ok(result) => result,
Err(err) => fail2!("Error executing query: {}", err.to_str())
}
}
fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
let id = self.next_portal_id.take();
let portal_name = format!("{}_portal_{}", self.name, id);
self.next_portal_id.put_back(id + 1);
match self.execute(portal_name, row_limit, params) {
Some(err) => {
return Err(err);
}
None => ()
}
let mut result = PostgresResult {
stmt: self,
name: portal_name,
data: RingBuf::new(),
row_limit: row_limit,
more_rows: true
};
result.read_rows();
Ok(result)
}
}
2013-08-23 05:24:14 +00:00
impl<'self> PostgresStatement for NormalPostgresStatement<'self> {
fn param_types<'a>(&'a self) -> &'a [PostgresType] {
self.param_types.as_slice()
}
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
self.result_desc.as_slice()
}
fn update(&self, params: &[&ToSql]) -> uint {
match self.try_update(params) {
Ok(count) => count,
Err(err) => fail2!("Error running update: {}", err.to_str())
}
}
fn try_update(&self, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
match self.execute("", 0, params) {
Some(err) => {
return Err(err);
}
None => ()
}
2013-08-22 07:12:35 +00:00
2013-08-29 05:44:34 +00:00
let num;
2013-08-23 05:24:14 +00:00
loop {
match self.conn.read_message() {
CommandComplete { tag } => {
let s = tag.split_iter(' ').last().unwrap();
2013-08-29 05:44:34 +00:00
num = match FromStr::from_str(s) {
None => 0,
Some(n) => n
};
2013-08-23 05:24:14 +00:00
break;
}
DataRow {_} => (),
2013-08-29 05:44:34 +00:00
EmptyQueryResponse => {
num = 0;
break;
}
NoticeResponse {_} => (),
ErrorResponse { fields } => {
self.conn.wait_for_ready();
return Err(PostgresDbError::new(fields));
}
_ => fail!()
}
2013-08-23 05:24:14 +00:00
}
2013-08-22 07:12:35 +00:00
self.conn.wait_for_ready();
2013-08-23 05:24:14 +00:00
Ok(num)
2013-08-22 07:12:35 +00:00
}
2013-08-23 07:13:42 +00:00
fn query<'a>(&'a self, params: &[&ToSql])
-> PostgresResult<'a> {
self.lazy_query(0, params)
}
fn try_query<'a>(&'a self, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
self.try_lazy_query(0, params)
2013-08-23 07:13:42 +00:00
}
2013-09-03 00:07:08 +00:00
fn find_col_named(&self, col: &str) -> Option<uint> {
2013-09-03 00:07:08 +00:00
do self.result_desc.iter().position |desc| {
desc.name.as_slice() == col
}
}
2013-08-23 07:13:42 +00:00
}
pub struct TransactionalPostgresStatement<'self> {
priv stmt: NormalPostgresStatement<'self>
}
impl<'self> PostgresStatement for TransactionalPostgresStatement<'self> {
fn param_types<'a>(&'a self) -> &'a [PostgresType] {
self.stmt.param_types()
}
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
self.stmt.result_descriptions()
}
fn update(&self, params: &[&ToSql]) -> uint {
self.stmt.update(params)
}
fn try_update(&self, params: &[&ToSql]) -> Result<uint, PostgresDbError> {
self.stmt.try_update(params)
}
fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a> {
self.stmt.query(params)
}
fn try_query<'a>(&'a self, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
self.stmt.try_query(params)
}
fn find_col_named(&self, col: &str) -> Option<uint> {
self.stmt.find_col_named(col)
}
}
impl<'self> TransactionalPostgresStatement<'self> {
pub fn lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> PostgresResult<'a> {
self.stmt.lazy_query(row_limit, params)
}
pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
-> Result<PostgresResult<'a>, PostgresDbError> {
2013-09-05 06:48:02 +00:00
self.stmt.try_lazy_query(row_limit, params)
}
}
2013-08-23 07:13:42 +00:00
pub struct PostgresResult<'self> {
priv stmt: &'self NormalPostgresStatement<'self>,
priv name: ~str,
priv data: RingBuf<~[Option<~[u8]>]>,
priv row_limit: uint,
priv more_rows: bool
}
#[unsafe_destructor]
impl<'self> Drop for PostgresResult<'self> {
fn drop(&self) {
do io_error::cond.trap(|_| {}).inside {
self.stmt.conn.write_messages([
&Close {
variant: 'P' as u8,
name: self.name.as_slice()
},
&Sync]);
loop {
match self.stmt.conn.read_message() {
ReadyForQuery {_} => break,
_ => ()
}
}
}
}
}
impl<'self> PostgresResult<'self> {
fn read_rows(&mut self) {
loop {
match self.stmt.conn.read_message() {
EmptyQueryResponse |
CommandComplete {_} => {
self.more_rows = false;
break;
},
PortalSuspended => {
self.more_rows = true;
break;
},
DataRow { row } => self.data.push_back(row),
_ => fail!()
}
}
self.stmt.conn.wait_for_ready();
}
fn execute(&mut self) {
self.stmt.conn.write_messages([
&Execute {
portal: self.name,
max_rows: self.row_limit as i32
},
&Sync]);
self.read_rows();
}
2013-08-23 07:13:42 +00:00
}
impl<'self> Iterator<PostgresRow<'self>> for PostgresResult<'self> {
fn next(&mut self) -> Option<PostgresRow<'self>> {
if self.data.is_empty() && self.more_rows {
self.execute();
}
do self.data.pop_front().map_move |row| {
PostgresRow {
stmt: self.stmt,
data: row
}
2013-08-23 07:13:42 +00:00
}
}
}
pub struct PostgresRow<'self> {
priv stmt: &'self NormalPostgresStatement<'self>,
priv data: ~[Option<~[u8]>]
2013-08-23 07:13:42 +00:00
}
impl<'self> Container for PostgresRow<'self> {
2013-08-23 07:13:42 +00:00
fn len(&self) -> uint {
self.data.len()
2013-08-23 07:13:42 +00:00
}
}
impl<'self, I: RowIndex, T: FromSql> Index<I, T> for PostgresRow<'self> {
fn index(&self, idx: &I) -> T {
let idx = idx.idx(self.stmt);
FromSql::from_sql(self.stmt.result_desc[idx].ty,
&self.data[idx])
2013-08-23 07:13:42 +00:00
}
}
pub trait RowIndex {
fn idx(&self, stmt: &NormalPostgresStatement) -> uint;
}
impl RowIndex for uint {
fn idx(&self, _stmt: &NormalPostgresStatement) -> uint {
*self
2013-08-23 07:13:42 +00:00
}
}
// This is a convenience as the 0 in get[0] resolves to int :(
impl RowIndex for int {
fn idx(&self, _stmt: &NormalPostgresStatement) -> uint {
assert!(*self >= 0);
*self as uint
}
}
2013-09-03 00:07:08 +00:00
impl<'self> RowIndex for &'self str {
fn idx(&self, stmt: &NormalPostgresStatement) -> uint {
match stmt.find_col_named(*self) {
2013-09-03 00:07:08 +00:00
Some(idx) => idx,
None => fail2!("No column with name {}", *self)
}
2013-09-03 00:07:08 +00:00
}
2013-08-23 07:13:42 +00:00
}