diff --git a/src/lib.rs b/src/lib.rs index ad380c3e..ebdf2c40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,7 +115,6 @@ use message::{AuthenticationCleartextPassword, BackendKeyData, BackendMessage, BindComplete, - CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, @@ -128,11 +127,8 @@ use message::{AuthenticationCleartextPassword, PortalSuspended, ReadyForQuery, RowDescription}; -use message::{Bind, - CancelRequest, - Close, +use message::{CancelRequest, Describe, - Execute, FrontendMessage, Parse, PasswordMessage, @@ -141,15 +137,16 @@ use message::{Bind, StartupMessage, Sync, Terminate}; -use message::{RowDescriptionEntry, WriteMessage, ReadMessage}; -use types::{Oid, PostgresType, ToSql, FromSql, PgUnknownType}; +use message::{WriteMessage, ReadMessage}; +use types::{Oid, PostgresType, ToSql, PgUnknownType}; -pub mod error; -pub mod pool; -mod message; -pub mod types; -#[cfg(test)] -mod test; +pub use stmt::{NormalPostgresStatement, + PostgresResult, + PostgresRow, + PostgresStatement, + ResultDescription, + RowIndex, + TransactionalPostgresStatement}; macro_rules! if_ok_pg_conn( ($e:expr) => ( @@ -197,6 +194,14 @@ macro_rules! fail_unless_failing( ) ) +pub mod error; +pub mod pool; +mod message; +mod stmt; +pub mod types; +#[cfg(test)] +mod test; + static DEFAULT_PORT: Port = 5432; /// Trait for types that can handle Postgres notice messages @@ -609,14 +614,10 @@ impl InnerPostgresConnection { } } - Ok(NormalPostgresStatement { - conn: conn, - name: stmt_name, - param_types: param_types, - result_desc: result_desc, - next_portal_id: Cell::new(0), - finished: Cell::new(false), - }) + Ok(stmt::make_NormalPostgresStatement(conn, + stmt_name, + param_types, + result_desc)) } fn is_desynchronized(&self) -> bool { @@ -908,18 +909,14 @@ impl<'conn> PostgresTransaction<'conn> { pub fn try_prepare<'a>(&'a self, query: &str) -> Result, PostgresError> { self.conn.try_prepare(query).map(|stmt| { - TransactionalPostgresStatement { - stmt: stmt - } + stmt::make_TransactionalPostgresStatement(stmt) }) } /// Like `PostgresConnection::prepare`. pub fn prepare<'a>(&'a self, query: &str) -> TransactionalPostgresStatement<'a> { - TransactionalPostgresStatement { - stmt: self.conn.prepare(query) - } + stmt::make_TransactionalPostgresStatement(self.conn.prepare(query)) } /// Like `PostgresConnection::try_execute`. @@ -989,497 +986,3 @@ impl<'conn> PostgresTransaction<'conn> { } } -/// A trait containing methods that can be called on a prepared statement. -pub trait PostgresStatement { - /// Returns a slice containing the expected parameter types. - fn param_types<'a>(&'a self) -> &'a [PostgresType]; - - /// Returns a slice describing the columns of the result of the query. - fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription]; - - /// Attempts to execute the prepared statement, returning the number of - /// rows modified. - /// - /// If the statement does not modify any rows (e.g. SELECT), 0 is returned. - /// - /// # Failure - /// - /// Fails if the number or types of the provided parameters do not match - /// the parameters of the statement. - fn try_execute(&self, params: &[&ToSql]) -> Result; - - /// A convenience function wrapping `try_execute`. - /// - /// # Failure - /// - /// Fails if there was an error executing the statement. - fn execute(&self, params: &[&ToSql]) -> uint { - match self.try_execute(params) { - Ok(count) => count, - Err(err) => fail!("Error running query\n{}", err.to_str()) - } - } - - /// Attempts to execute the prepared statement, returning an iterator over - /// the resulting rows. - /// - /// # Failure - /// - /// Fails if the number or types of the provided parameters do not match - /// the parameters of the statement. - fn try_query<'a>(&'a self, params: &[&ToSql]) - -> Result, PostgresError>; - - /// A convenience function wrapping `try_query`. - /// - /// # Failure - /// - /// Fails if there was an error executing the statement. - fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a> { - match self.try_query(params) { - Ok(result) => result, - Err(err) => fail!("Error executing query:\n{}", err.to_str()) - } - } - - /// Consumes the statement, clearing it from the Postgres session. - /// - /// Functionally identical to the `Drop` implementation of the - /// `PostgresStatement` except that it returns any error to the caller. - fn finish(self) -> Result<(), PostgresError>; -} - -/// A statement prepared outside of a transaction. -pub struct NormalPostgresStatement<'conn> { - priv conn: &'conn PostgresConnection, - priv name: ~str, - priv param_types: ~[PostgresType], - priv result_desc: ~[ResultDescription], - priv next_portal_id: Cell, - priv finished: Cell, -} - -#[unsafe_destructor] -impl<'conn> Drop for NormalPostgresStatement<'conn> { - fn drop(&mut self) { - if !self.finished.get() { - match self.finish_inner() { - Ok(()) | Err(PgStreamDesynchronized) => {} - Err(err) => - fail_unless_failing!("Error dropping statement: {}", err) - } - } - } -} - -impl<'conn> NormalPostgresStatement<'conn> { - fn finish_inner(&mut self) -> Result<(), PostgresError> { - check_desync!(self.conn); - if_ok_pg!(self.conn.write_messages([ - Close { - variant: 'S' as u8, - name: self.name.as_slice() - }, - Sync])); - loop { - match if_ok_pg!(self.conn.read_message()) { - ReadyForQuery { .. } => break, - ErrorResponse { fields } => { - if_ok!(self.conn.wait_for_ready()); - return Err(PgDbError(PostgresDbError::new(fields))); - } - _ => {} - } - } - Ok(()) - } - - fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql]) - -> Result<(), PostgresError> { - let mut formats = ~[]; - let mut values = ~[]; - assert!(self.param_types.len() == params.len(), - "Expected {} parameters but found {}", - self.param_types.len(), params.len()); - for (¶m, ty) in params.iter().zip(self.param_types.iter()) { - let (format, value) = param.to_sql(ty); - formats.push(format as i16); - values.push(value); - }; - - let result_formats: ~[i16] = self.result_desc.iter().map(|desc| { - desc.ty.result_format() as i16 - }).collect(); - - if_ok_pg!(self.conn.write_messages([ - Bind { - portal: portal_name, - statement: self.name.as_slice(), - formats: formats, - values: values, - result_formats: result_formats - }, - Execute { - portal: portal_name, - max_rows: row_limit as i32 - }, - Sync])); - - match if_ok_pg!(self.conn.read_message()) { - BindComplete => Ok(()), - ErrorResponse { fields } => { - if_ok!(self.conn.wait_for_ready()); - Err(PgDbError(PostgresDbError::new(fields))) - } - _ => unreachable!() - } - } - - fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) - -> Result, PostgresError> { - let id = self.next_portal_id.get(); - self.next_portal_id.set(id + 1); - let portal_name = format!("{}_portal_{}", self.name, id); - - if_ok!(self.execute(portal_name, row_limit, params)); - - let mut result = PostgresResult { - stmt: self, - name: portal_name, - data: RingBuf::new(), - row_limit: row_limit, - more_rows: true, - finished: false, - }; - if_ok!(result.read_rows()) - - Ok(result) - } -} - -impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> { - fn param_types<'a>(&'a self) -> &'a [PostgresType] { - self.param_types.as_slice() - } - - fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] { - self.result_desc.as_slice() - } - - fn try_execute(&self, params: &[&ToSql]) - -> Result { - check_desync!(self.conn); - if_ok!(self.execute("", 0, params)); - - let num; - loop { - match if_ok_pg!(self.conn.read_message()) { - DataRow { .. } => {} - ErrorResponse { fields } => { - if_ok!(self.conn.wait_for_ready()); - return Err(PgDbError(PostgresDbError::new(fields))); - } - CommandComplete { tag } => { - let s = tag.split(' ').last().unwrap(); - num = match FromStr::from_str(s) { - None => 0, - Some(n) => n - }; - break; - } - EmptyQueryResponse => { - num = 0; - break; - } - _ => unreachable!() - } - } - if_ok!(self.conn.wait_for_ready()); - - Ok(num) - } - - fn try_query<'a>(&'a self, params: &[&ToSql]) - -> Result, PostgresError> { - check_desync!(self.conn); - self.try_lazy_query(0, params) - } - - fn finish(mut self) -> Result<(), PostgresError> { - self.finished.set(true); - self.finish_inner() - } -} - -/// Information about a column of the result of a query. -#[deriving(Eq)] -pub struct ResultDescription { - /// The name of the column - name: ~str, - /// The type of the data in the column - ty: PostgresType -} - -impl ResultDescription { - fn from_row_description_entry(row: RowDescriptionEntry) - -> ResultDescription { - let RowDescriptionEntry { name, type_oid, .. } = row; - - ResultDescription { - name: name, - ty: PostgresType::from_oid(type_oid) - } - } -} - -/// A statement prepared inside of a transaction. -/// -/// Provides additional functionality over a `NormalPostgresStatement`. -pub struct TransactionalPostgresStatement<'conn> { - priv stmt: NormalPostgresStatement<'conn> -} - -impl<'conn> PostgresStatement for TransactionalPostgresStatement<'conn> { - fn param_types<'a>(&'a self) -> &'a [PostgresType] { - self.stmt.param_types() - } - - fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] { - self.stmt.result_descriptions() - } - - fn try_execute(&self, params: &[&ToSql]) -> Result { - self.stmt.try_execute(params) - } - - fn try_query<'a>(&'a self, params: &[&ToSql]) - -> Result, PostgresError> { - self.stmt.try_query(params) - } - - fn finish(self) -> Result<(), PostgresError> { - self.stmt.finish() - } -} - -impl<'conn> TransactionalPostgresStatement<'conn> { - /// Attempts to execute the prepared statement, returning a lazily loaded - /// iterator over the resulting rows. - /// - /// No more than `row_limit` rows will be stored in memory at a time. Rows - /// will be pulled from the database in batches of `row_limit` as needed. - /// If `row_limit` is 0, `try_lazy_query` is equivalent to `try_query`. - /// - /// # Failure - /// - /// Fails if the number or types of the provided parameters do not match - /// the parameters of the statement. - pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) - -> Result, PostgresError> { - check_desync!(self.stmt.conn); - self.stmt.try_lazy_query(row_limit, params) - } - - /// A convenience wrapper around `try_lazy_query`. - /// - /// # Failure - /// - /// Fails if there was an error executing the statement. - pub fn lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) - -> PostgresResult<'a> { - match self.try_lazy_query(row_limit, params) { - Ok(result) => result, - Err(err) => fail!("Error executing query:\n{}", err) - } - } -} - -/// An iterator over the resulting rows of a query. -pub struct PostgresResult<'stmt> { - priv stmt: &'stmt NormalPostgresStatement<'stmt>, - priv name: ~str, - priv data: RingBuf<~[Option<~[u8]>]>, - priv row_limit: uint, - priv more_rows: bool, - priv finished: bool, -} - -#[unsafe_destructor] -impl<'stmt> Drop for PostgresResult<'stmt> { - fn drop(&mut self) { - if !self.finished { - match self.finish_inner() { - Ok(()) | Err(PgStreamDesynchronized) => {} - Err(err) => - fail_unless_failing!("Error dropping result: {}", err) - } - } - } -} - -impl<'stmt> PostgresResult<'stmt> { - fn finish_inner(&mut self) -> Result<(), PostgresError> { - check_desync!(self.stmt.conn); - if_ok_pg!(self.stmt.conn.write_messages([ - Close { - variant: 'P' as u8, - name: self.name.as_slice() - }, - Sync])); - loop { - match if_ok_pg!(self.stmt.conn.read_message()) { - ReadyForQuery { .. } => break, - ErrorResponse { fields } => { - if_ok!(self.stmt.conn.wait_for_ready()); - return Err(PgDbError(PostgresDbError::new(fields))); - } - _ => {} - } - } - Ok(()) - } - - fn read_rows(&mut self) -> Result<(), PostgresError> { - loop { - match if_ok_pg!(self.stmt.conn.read_message()) { - EmptyQueryResponse | - CommandComplete { .. } => { - self.more_rows = false; - break; - }, - PortalSuspended => { - self.more_rows = true; - break; - }, - DataRow { row } => self.data.push_back(row), - _ => unreachable!() - } - } - self.stmt.conn.wait_for_ready() - } - - fn execute(&mut self) -> Result<(), PostgresError> { - if_ok_pg!(self.stmt.conn.write_messages([ - Execute { - portal: self.name, - max_rows: self.row_limit as i32 - }, - Sync])); - self.read_rows() - } -} - -impl<'stmt> PostgresResult<'stmt> { - /// Consumes the `PostgresResult`, cleaning up associated state. - /// - /// Functionally identical to the `Drop` implementation on - /// `PostgresResult` except that it returns any error to the caller. - pub fn finish(mut self) -> Result<(), PostgresError> { - self.finished = true; - self.finish_inner() - } - - /// Like `PostgresResult::next` except that it returns any errors to the - /// caller instead of failing. - pub fn try_next(&mut self) - -> Result>, PostgresError> { - if self.data.is_empty() && self.more_rows { - if_ok!(self.execute()); - } - - let row = self.data.pop_front().map(|row| { - PostgresRow { - stmt: self.stmt, - data: row - } - }); - Ok(row) - } -} - -impl<'stmt> Iterator> for PostgresResult<'stmt> { - fn next(&mut self) -> Option> { - match self.try_next() { - Ok(ok) => ok, - Err(err) => fail!("Error fetching rows: {}", err) - } - } - - fn size_hint(&self) -> (uint, Option) { - let lower = self.data.len(); - let upper = if self.more_rows { - None - } else { - Some(lower) - }; - (lower, upper) - } -} - -/// A single result row of a query. -/// -/// A value can be accessed by the name or index of its column, though access -/// by index is more efficient. Rows are 1-indexed. -/// -/// ```rust -/// let foo: i32 = row[1]; -/// let bar: ~str = row["bar"]; -/// ``` -pub struct PostgresRow<'stmt> { - priv stmt: &'stmt NormalPostgresStatement<'stmt>, - priv data: ~[Option<~[u8]>] -} - -impl<'stmt> Container for PostgresRow<'stmt> { - #[inline] - fn len(&self) -> uint { - self.data.len() - } -} - -impl<'stmt, I: RowIndex, T: FromSql> Index for PostgresRow<'stmt> { - #[inline] - fn index(&self, idx: &I) -> T { - let idx = idx.idx(self.stmt); - FromSql::from_sql(&self.stmt.result_desc[idx].ty, &self.data[idx]) - } -} - -/// A trait implemented by types that can index into columns of a row. -pub trait RowIndex { - /// Returns the index of the appropriate column. - /// - /// # Failure - /// - /// Fails if there is no corresponding column. - fn idx(&self, stmt: &NormalPostgresStatement) -> uint; -} - -impl RowIndex for uint { - #[inline] - fn idx(&self, _stmt: &NormalPostgresStatement) -> uint { - assert!(*self != 0, "out of bounds row access"); - *self - 1 - } -} - -// This is a convenience as the 1 in get[1] resolves to int :( -impl RowIndex for int { - #[inline] - fn idx(&self, _stmt: &NormalPostgresStatement) -> uint { - assert!(*self >= 1, "out of bounds row access"); - (*self - 1) as uint - } -} - -impl<'a> RowIndex for &'a str { - fn idx(&self, stmt: &NormalPostgresStatement) -> uint { - for (i, desc) in stmt.result_descriptions().iter().enumerate() { - if desc.name.as_slice() == *self { - return i; - } - } - fail!("there is no column with name {}", *self); - } -} - diff --git a/src/stmt.rs b/src/stmt.rs new file mode 100644 index 00000000..d432a5d0 --- /dev/null +++ b/src/stmt.rs @@ -0,0 +1,537 @@ +use collections::{Deque, RingBuf}; +use std::cell::Cell; +use std::task; + +use PostgresConnection; +use error::{PgDbError, + PgStreamDesynchronized, + PgStreamError, + PostgresDbError, + PostgresError}; +use message::{Bind, + Close, + CommandComplete, + DataRow, + ErrorResponse, + Execute, + ReadyForQuery, + RowDescriptionEntry, + Sync}; +use types::{FromSql, ToSql, PostgresType}; + +/// A trait containing methods that can be called on a prepared statement. +pub trait PostgresStatement { + /// Returns a slice containing the expected parameter types. + fn param_types<'a>(&'a self) -> &'a [PostgresType]; + + /// Returns a slice describing the columns of the result of the query. + fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription]; + + /// Attempts to execute the prepared statement, returning the number of + /// rows modified. + /// + /// If the statement does not modify any rows (e.g. SELECT), 0 is returned. + /// + /// # Failure + /// + /// Fails if the number or types of the provided parameters do not match + /// the parameters of the statement. + fn try_execute(&self, params: &[&ToSql]) -> Result; + + /// A convenience function wrapping `try_execute`. + /// + /// # Failure + /// + /// Fails if there was an error executing the statement. + fn execute(&self, params: &[&ToSql]) -> uint { + match self.try_execute(params) { + Ok(count) => count, + Err(err) => fail!("Error running query\n{}", err.to_str()) + } + } + + /// Attempts to execute the prepared statement, returning an iterator over + /// the resulting rows. + /// + /// # Failure + /// + /// Fails if the number or types of the provided parameters do not match + /// the parameters of the statement. + fn try_query<'a>(&'a self, params: &[&ToSql]) + -> Result, PostgresError>; + + /// A convenience function wrapping `try_query`. + /// + /// # Failure + /// + /// Fails if there was an error executing the statement. + fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a> { + match self.try_query(params) { + Ok(result) => result, + Err(err) => fail!("Error executing query:\n{}", err.to_str()) + } + } + + /// Consumes the statement, clearing it from the Postgres session. + /// + /// Functionally identical to the `Drop` implementation of the + /// `PostgresStatement` except that it returns any error to the caller. + fn finish(self) -> Result<(), PostgresError>; +} + +/// A statement prepared outside of a transaction. +pub struct NormalPostgresStatement<'conn> { + priv conn: &'conn PostgresConnection, + priv name: ~str, + priv param_types: ~[PostgresType], + priv result_desc: ~[ResultDescription], + priv next_portal_id: Cell, + priv finished: Cell, +} + +pub fn make_NormalPostgresStatement<'a>(conn: &'a PostgresConnection, + name: ~str, + param_types: ~[PostgresType], + result_desc: ~[ResultDescription]) + -> NormalPostgresStatement<'a> { + NormalPostgresStatement { + conn: conn, + name: name, + param_types: param_types, + result_desc: result_desc, + next_portal_id: Cell::new(0), + finished: Cell::new(false), + } +} + +#[unsafe_destructor] +impl<'conn> Drop for NormalPostgresStatement<'conn> { + fn drop(&mut self) { + if !self.finished.get() { + match self.finish_inner() { + Ok(()) | Err(PgStreamDesynchronized) => {} + Err(err) => + fail_unless_failing!("Error dropping statement: {}", err) + } + } + } +} + +impl<'conn> NormalPostgresStatement<'conn> { + fn finish_inner(&mut self) -> Result<(), PostgresError> { + check_desync!(self.conn); + if_ok_pg!(self.conn.write_messages([ + Close { + variant: 'S' as u8, + name: self.name.as_slice() + }, + Sync])); + loop { + match if_ok_pg!(self.conn.read_message()) { + ReadyForQuery { .. } => break, + ErrorResponse { fields } => { + if_ok!(self.conn.wait_for_ready()); + return Err(PgDbError(PostgresDbError::new(fields))); + } + _ => {} + } + } + Ok(()) + } + + fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql]) + -> Result<(), PostgresError> { + let mut formats = ~[]; + let mut values = ~[]; + assert!(self.param_types.len() == params.len(), + "Expected {} parameters but found {}", + self.param_types.len(), params.len()); + for (¶m, ty) in params.iter().zip(self.param_types.iter()) { + let (format, value) = param.to_sql(ty); + formats.push(format as i16); + values.push(value); + }; + + let result_formats: ~[i16] = self.result_desc.iter().map(|desc| { + desc.ty.result_format() as i16 + }).collect(); + + if_ok_pg!(self.conn.write_messages([ + Bind { + portal: portal_name, + statement: self.name.as_slice(), + formats: formats, + values: values, + result_formats: result_formats + }, + Execute { + portal: portal_name, + max_rows: row_limit as i32 + }, + Sync])); + + match if_ok_pg!(self.conn.read_message()) { + BindComplete => Ok(()), + ErrorResponse { fields } => { + if_ok!(self.conn.wait_for_ready()); + Err(PgDbError(PostgresDbError::new(fields))) + } + _ => unreachable!() + } + } + + fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) + -> Result, PostgresError> { + let id = self.next_portal_id.get(); + self.next_portal_id.set(id + 1); + let portal_name = format!("{}_portal_{}", self.name, id); + + if_ok!(self.execute(portal_name, row_limit, params)); + + let mut result = PostgresResult { + stmt: self, + name: portal_name, + data: RingBuf::new(), + row_limit: row_limit, + more_rows: true, + finished: false, + }; + if_ok!(result.read_rows()) + + Ok(result) + } +} + +impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> { + fn param_types<'a>(&'a self) -> &'a [PostgresType] { + self.param_types.as_slice() + } + + fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] { + self.result_desc.as_slice() + } + + fn try_execute(&self, params: &[&ToSql]) + -> Result { + check_desync!(self.conn); + if_ok!(self.execute("", 0, params)); + + let num; + loop { + match if_ok_pg!(self.conn.read_message()) { + DataRow { .. } => {} + ErrorResponse { fields } => { + if_ok!(self.conn.wait_for_ready()); + return Err(PgDbError(PostgresDbError::new(fields))); + } + CommandComplete { tag } => { + let s = tag.split(' ').last().unwrap(); + num = match FromStr::from_str(s) { + None => 0, + Some(n) => n + }; + break; + } + EmptyQueryResponse => { + num = 0; + break; + } + _ => unreachable!() + } + } + if_ok!(self.conn.wait_for_ready()); + + Ok(num) + } + + fn try_query<'a>(&'a self, params: &[&ToSql]) + -> Result, PostgresError> { + check_desync!(self.conn); + self.try_lazy_query(0, params) + } + + fn finish(mut self) -> Result<(), PostgresError> { + self.finished.set(true); + self.finish_inner() + } +} + +/// Information about a column of the result of a query. +#[deriving(Eq)] +pub struct ResultDescription { + /// The name of the column + name: ~str, + /// The type of the data in the column + ty: PostgresType +} + +impl ResultDescription { + pub fn from_row_description_entry(row: RowDescriptionEntry) + -> ResultDescription { + let RowDescriptionEntry { name, type_oid, .. } = row; + + ResultDescription { + name: name, + ty: PostgresType::from_oid(type_oid) + } + } +} + +/// A statement prepared inside of a transaction. +/// +/// Provides additional functionality over a `NormalPostgresStatement`. +pub struct TransactionalPostgresStatement<'conn> { + priv stmt: NormalPostgresStatement<'conn> +} + +pub fn make_TransactionalPostgresStatement<'a>(stmt: NormalPostgresStatement<'a>) + -> TransactionalPostgresStatement<'a> { + TransactionalPostgresStatement { + stmt: stmt, + } +} + +impl<'conn> PostgresStatement for TransactionalPostgresStatement<'conn> { + fn param_types<'a>(&'a self) -> &'a [PostgresType] { + self.stmt.param_types() + } + + fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] { + self.stmt.result_descriptions() + } + + fn try_execute(&self, params: &[&ToSql]) -> Result { + self.stmt.try_execute(params) + } + + fn try_query<'a>(&'a self, params: &[&ToSql]) + -> Result, PostgresError> { + self.stmt.try_query(params) + } + + fn finish(self) -> Result<(), PostgresError> { + self.stmt.finish() + } +} + +impl<'conn> TransactionalPostgresStatement<'conn> { + /// Attempts to execute the prepared statement, returning a lazily loaded + /// iterator over the resulting rows. + /// + /// No more than `row_limit` rows will be stored in memory at a time. Rows + /// will be pulled from the database in batches of `row_limit` as needed. + /// If `row_limit` is 0, `try_lazy_query` is equivalent to `try_query`. + /// + /// # Failure + /// + /// Fails if the number or types of the provided parameters do not match + /// the parameters of the statement. + pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) + -> Result, PostgresError> { + check_desync!(self.stmt.conn); + self.stmt.try_lazy_query(row_limit, params) + } + + /// A convenience wrapper around `try_lazy_query`. + /// + /// # Failure + /// + /// Fails if there was an error executing the statement. + pub fn lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) + -> PostgresResult<'a> { + match self.try_lazy_query(row_limit, params) { + Ok(result) => result, + Err(err) => fail!("Error executing query:\n{}", err) + } + } +} + +/// An iterator over the resulting rows of a query. +pub struct PostgresResult<'stmt> { + priv stmt: &'stmt NormalPostgresStatement<'stmt>, + priv name: ~str, + priv data: RingBuf<~[Option<~[u8]>]>, + priv row_limit: uint, + priv more_rows: bool, + priv finished: bool, +} + +#[unsafe_destructor] +impl<'stmt> Drop for PostgresResult<'stmt> { + fn drop(&mut self) { + if !self.finished { + match self.finish_inner() { + Ok(()) | Err(PgStreamDesynchronized) => {} + Err(err) => + fail_unless_failing!("Error dropping result: {}", err) + } + } + } +} + +impl<'stmt> PostgresResult<'stmt> { + fn finish_inner(&mut self) -> Result<(), PostgresError> { + check_desync!(self.stmt.conn); + if_ok_pg!(self.stmt.conn.write_messages([ + Close { + variant: 'P' as u8, + name: self.name.as_slice() + }, + Sync])); + loop { + match if_ok_pg!(self.stmt.conn.read_message()) { + ReadyForQuery { .. } => break, + ErrorResponse { fields } => { + if_ok!(self.stmt.conn.wait_for_ready()); + return Err(PgDbError(PostgresDbError::new(fields))); + } + _ => {} + } + } + Ok(()) + } + + fn read_rows(&mut self) -> Result<(), PostgresError> { + loop { + match if_ok_pg!(self.stmt.conn.read_message()) { + EmptyQueryResponse | + CommandComplete { .. } => { + self.more_rows = false; + break; + }, + PortalSuspended => { + self.more_rows = true; + break; + }, + DataRow { row } => self.data.push_back(row), + _ => unreachable!() + } + } + self.stmt.conn.wait_for_ready() + } + + fn execute(&mut self) -> Result<(), PostgresError> { + if_ok_pg!(self.stmt.conn.write_messages([ + Execute { + portal: self.name, + max_rows: self.row_limit as i32 + }, + Sync])); + self.read_rows() + } +} + +impl<'stmt> PostgresResult<'stmt> { + /// Consumes the `PostgresResult`, cleaning up associated state. + /// + /// Functionally identical to the `Drop` implementation on + /// `PostgresResult` except that it returns any error to the caller. + pub fn finish(mut self) -> Result<(), PostgresError> { + self.finished = true; + self.finish_inner() + } + + /// Like `PostgresResult::next` except that it returns any errors to the + /// caller instead of failing. + pub fn try_next(&mut self) + -> Result>, PostgresError> { + if self.data.is_empty() && self.more_rows { + if_ok!(self.execute()); + } + + let row = self.data.pop_front().map(|row| { + PostgresRow { + stmt: self.stmt, + data: row + } + }); + Ok(row) + } +} + +impl<'stmt> Iterator> for PostgresResult<'stmt> { + fn next(&mut self) -> Option> { + match self.try_next() { + Ok(ok) => ok, + Err(err) => fail!("Error fetching rows: {}", err) + } + } + + fn size_hint(&self) -> (uint, Option) { + let lower = self.data.len(); + let upper = if self.more_rows { + None + } else { + Some(lower) + }; + (lower, upper) + } +} + +/// A single result row of a query. +/// +/// A value can be accessed by the name or index of its column, though access +/// by index is more efficient. Rows are 1-indexed. +/// +/// ```rust +/// let foo: i32 = row[1]; +/// let bar: ~str = row["bar"]; +/// ``` +pub struct PostgresRow<'stmt> { + priv stmt: &'stmt NormalPostgresStatement<'stmt>, + priv data: ~[Option<~[u8]>] +} + +impl<'stmt> Container for PostgresRow<'stmt> { + #[inline] + fn len(&self) -> uint { + self.data.len() + } +} + +impl<'stmt, I: RowIndex, T: FromSql> Index for PostgresRow<'stmt> { + #[inline] + fn index(&self, idx: &I) -> T { + let idx = idx.idx(self.stmt); + FromSql::from_sql(&self.stmt.result_desc[idx].ty, &self.data[idx]) + } +} + +/// A trait implemented by types that can index into columns of a row. +pub trait RowIndex { + /// Returns the index of the appropriate column. + /// + /// # Failure + /// + /// Fails if there is no corresponding column. + fn idx(&self, stmt: &NormalPostgresStatement) -> uint; +} + +impl RowIndex for uint { + #[inline] + fn idx(&self, _stmt: &NormalPostgresStatement) -> uint { + assert!(*self != 0, "out of bounds row access"); + *self - 1 + } +} + +// This is a convenience as the 1 in get[1] resolves to int :( +impl RowIndex for int { + #[inline] + fn idx(&self, _stmt: &NormalPostgresStatement) -> uint { + assert!(*self >= 1, "out of bounds row access"); + (*self - 1) as uint + } +} + +impl<'a> RowIndex for &'a str { + fn idx(&self, stmt: &NormalPostgresStatement) -> uint { + for (i, desc) in stmt.result_descriptions().iter().enumerate() { + if desc.name.as_slice() == *self { + return i; + } + } + fail!("there is no column with name {}", *self); + } +} +