Restructure lazy query API
It doesn't make any sense to limit lazy queries to statments *prepared* inside a transaction. We really only care that we're in a transaction when the statement is executed. This does introduce a new runtime error if a statement prepared on connection A is executed on a transaction prepared on connection B, but I don't think anyone will *ever* run into that.
This commit is contained in:
parent
98c92c1dca
commit
587cbff836
@ -17,7 +17,7 @@ extern crate time;
|
||||
|
||||
use time::Timespec;
|
||||
|
||||
use postgres::{PostgresConnection, PostgresStatement, NoSsl};
|
||||
use postgres::{PostgresConnection, NoSsl};
|
||||
use postgres::types::ToSql;
|
||||
|
||||
struct Person {
|
||||
|
@ -506,6 +506,8 @@ pub enum PostgresError {
|
||||
/// The communication channel with the Postgres server has desynchronized
|
||||
/// due to an earlier communications error.
|
||||
PgStreamDesynchronized,
|
||||
/// A prepared statement was executed on a connection it does not belong to
|
||||
PgWrongConnection,
|
||||
}
|
||||
|
||||
impl PostgresError {
|
||||
@ -516,7 +518,11 @@ impl PostgresError {
|
||||
PgStreamError(ref err) => format!("{}", *err),
|
||||
PgStreamDesynchronized =>
|
||||
~"The communication stream with the Postgres server has \
|
||||
become desynchronized due to an earlier communications error"
|
||||
become desynchronized due to an earlier communications \
|
||||
error",
|
||||
PgWrongConnection =>
|
||||
~"A statement was executed on a connection it was not \
|
||||
prepared on "
|
||||
}
|
||||
}
|
||||
}
|
||||
|
577
src/lib.rs
577
src/lib.rs
@ -9,7 +9,7 @@ extern crate time;
|
||||
|
||||
use time::Timespec;
|
||||
|
||||
use postgres::{PostgresConnection, PostgresStatement, NoSsl};
|
||||
use postgres::{PostgresConnection, NoSsl};
|
||||
use postgres::types::ToSql;
|
||||
|
||||
struct Person {
|
||||
@ -106,7 +106,8 @@ use error::{DnsError,
|
||||
PostgresError,
|
||||
SocketError,
|
||||
SslError,
|
||||
UnsupportedAuthentication};
|
||||
UnsupportedAuthentication,
|
||||
PgWrongConnection};
|
||||
use message::{AuthenticationCleartextPassword,
|
||||
AuthenticationGSS,
|
||||
AuthenticationKerberosV5,
|
||||
@ -117,6 +118,7 @@ use message::{AuthenticationCleartextPassword,
|
||||
BackendKeyData,
|
||||
BackendMessage,
|
||||
BindComplete,
|
||||
CommandComplete,
|
||||
DataRow,
|
||||
EmptyQueryResponse,
|
||||
ErrorResponse,
|
||||
@ -128,9 +130,13 @@ use message::{AuthenticationCleartextPassword,
|
||||
ParseComplete,
|
||||
PortalSuspended,
|
||||
ReadyForQuery,
|
||||
RowDescription};
|
||||
use message::{CancelRequest,
|
||||
RowDescription,
|
||||
RowDescriptionEntry};
|
||||
use message::{Bind,
|
||||
CancelRequest,
|
||||
Close,
|
||||
Describe,
|
||||
Execute,
|
||||
FrontendMessage,
|
||||
Parse,
|
||||
PasswordMessage,
|
||||
@ -140,15 +146,7 @@ use message::{CancelRequest,
|
||||
Sync,
|
||||
Terminate};
|
||||
use message::{WriteMessage, ReadMessage};
|
||||
use types::{Oid, PostgresType, ToSql, PgUnknownType};
|
||||
|
||||
pub use stmt::{NormalPostgresStatement,
|
||||
PostgresResult,
|
||||
PostgresRow,
|
||||
PostgresStatement,
|
||||
ResultDescription,
|
||||
RowIndex,
|
||||
TransactionalPostgresStatement};
|
||||
use types::{Oid, PostgresType, ToSql, FromSql, PgUnknownType};
|
||||
|
||||
macro_rules! try_pg_conn(
|
||||
($e:expr) => (
|
||||
@ -199,7 +197,6 @@ macro_rules! fail_unless_failing(
|
||||
pub mod error;
|
||||
pub mod pool;
|
||||
mod message;
|
||||
mod stmt;
|
||||
pub mod types;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
@ -558,7 +555,7 @@ impl InnerPostgresConnection {
|
||||
}
|
||||
|
||||
fn try_prepare<'a>(&mut self, query: &str, conn: &'a PostgresConnection)
|
||||
-> Result<NormalPostgresStatement<'a>, PostgresError> {
|
||||
-> Result<PostgresStatement<'a>, PostgresError> {
|
||||
let stmt_name = format!("statement_{}", self.next_stmt_id);
|
||||
self.next_stmt_id += 1;
|
||||
|
||||
@ -593,7 +590,11 @@ impl InnerPostgresConnection {
|
||||
let mut result_desc: Vec<ResultDescription> = match try_pg!(self.read_message()) {
|
||||
RowDescription { descriptions } =>
|
||||
descriptions.move_iter().map(|desc| {
|
||||
stmt::make_ResultDescription(desc)
|
||||
let RowDescriptionEntry { name, type_oid, .. } = desc;
|
||||
ResultDescription {
|
||||
name: name,
|
||||
ty: PostgresType::from_oid(type_oid)
|
||||
}
|
||||
}).collect(),
|
||||
NoData => Vec::new(),
|
||||
_ => unreachable!()
|
||||
@ -624,10 +625,14 @@ impl InnerPostgresConnection {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(stmt::make_NormalPostgresStatement(conn,
|
||||
stmt_name,
|
||||
param_types,
|
||||
result_desc))
|
||||
Ok(PostgresStatement {
|
||||
conn: conn,
|
||||
name: stmt_name,
|
||||
param_types: param_types,
|
||||
result_desc: result_desc,
|
||||
next_portal_id: Cell::new(0),
|
||||
finished: Cell::new(false),
|
||||
})
|
||||
}
|
||||
|
||||
fn is_desynchronized(&self) -> bool {
|
||||
@ -767,7 +772,7 @@ impl PostgresConnection {
|
||||
/// Err(err) => fail!("Error preparing statement: {}", err)
|
||||
/// };
|
||||
pub fn try_prepare<'a>(&'a self, query: &str)
|
||||
-> Result<NormalPostgresStatement<'a>, PostgresError> {
|
||||
-> Result<PostgresStatement<'a>, PostgresError> {
|
||||
self.conn.borrow_mut().try_prepare(query, self)
|
||||
}
|
||||
|
||||
@ -776,7 +781,7 @@ impl PostgresConnection {
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error preparing the statement.
|
||||
pub fn prepare<'a>(&'a self, query: &str) -> NormalPostgresStatement<'a> {
|
||||
pub fn prepare<'a>(&'a self, query: &str) -> PostgresStatement<'a> {
|
||||
match self.try_prepare(query) {
|
||||
Ok(stmt) => stmt,
|
||||
Err(err) => fail!("Error preparing statement:\n{}",
|
||||
@ -957,16 +962,13 @@ impl<'conn> PostgresTransaction<'conn> {
|
||||
impl<'conn> PostgresTransaction<'conn> {
|
||||
/// Like `PostgresConnection::try_prepare`.
|
||||
pub fn try_prepare<'a>(&'a self, query: &str)
|
||||
-> Result<TransactionalPostgresStatement<'a>, PostgresError> {
|
||||
self.conn.try_prepare(query).map(|stmt| {
|
||||
stmt::make_TransactionalPostgresStatement(stmt)
|
||||
})
|
||||
-> Result<PostgresStatement<'a>, PostgresError> {
|
||||
self.conn.try_prepare(query)
|
||||
}
|
||||
|
||||
/// Like `PostgresConnection::prepare`.
|
||||
pub fn prepare<'a>(&'a self, query: &str)
|
||||
-> TransactionalPostgresStatement<'a> {
|
||||
stmt::make_TransactionalPostgresStatement(self.conn.prepare(query))
|
||||
pub fn prepare<'a>(&'a self, query: &str) -> PostgresStatement<'a> {
|
||||
self.conn.prepare(query)
|
||||
}
|
||||
|
||||
/// Like `PostgresConnection::try_execute`.
|
||||
@ -1034,5 +1036,522 @@ impl<'conn> PostgresTransaction<'conn> {
|
||||
self.finished = true;
|
||||
self.finish_inner()
|
||||
}
|
||||
|
||||
/// Attempts to execute a prepared statement, returning a lazily loaded
|
||||
/// iterator over the resulting rows.
|
||||
///
|
||||
/// No more than `row_limit` rows will be stored in memory at a time. Rows
|
||||
/// will be pulled from the database in batches of `row_limit` as needed.
|
||||
/// If `row_limit` is 0, `try_lazy_query` is equivalent to `try_query`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if the number or types of the provided parameters do not match
|
||||
/// the parameters of the statement.
|
||||
pub fn try_lazy_query<'trans, 'stmt>(&'trans self,
|
||||
stmt: &'stmt PostgresStatement,
|
||||
params: &[&ToSql],
|
||||
row_limit: uint)
|
||||
-> Result<PostgresLazyResult<'trans,
|
||||
'stmt>,
|
||||
PostgresError> {
|
||||
if self.conn as *PostgresConnection != stmt.conn as *PostgresConnection {
|
||||
return Err(PgWrongConnection);
|
||||
}
|
||||
check_desync!(self.conn);
|
||||
stmt.try_lazy_query(row_limit, params).map(|result| {
|
||||
PostgresLazyResult {
|
||||
trans: self,
|
||||
result: result
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// A convenience wrapper around `try_lazy_query`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error executing the statement.
|
||||
pub fn lazy_query<'trans, 'stmt>(&'trans self,
|
||||
stmt: &'stmt PostgresStatement,
|
||||
params: &[&ToSql],
|
||||
row_limit: uint)
|
||||
-> PostgresLazyResult<'trans, 'stmt> {
|
||||
match self.try_lazy_query(stmt, params, row_limit) {
|
||||
Ok(result) => result,
|
||||
Err(err) => fail!("Error executing query:\n{}", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A prepared statement
|
||||
pub struct PostgresStatement<'conn> {
|
||||
priv conn: &'conn PostgresConnection,
|
||||
priv name: ~str,
|
||||
priv param_types: Vec<PostgresType>,
|
||||
priv result_desc: Vec<ResultDescription>,
|
||||
priv next_portal_id: Cell<uint>,
|
||||
priv finished: Cell<bool>,
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<'conn> Drop for PostgresStatement<'conn> {
|
||||
fn drop(&mut self) {
|
||||
if !self.finished.get() {
|
||||
match self.finish_inner() {
|
||||
Ok(()) | Err(PgStreamDesynchronized) => {}
|
||||
Err(err) =>
|
||||
fail_unless_failing!("Error dropping statement: {}", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'conn> PostgresStatement<'conn> {
|
||||
fn finish_inner(&mut self) -> Result<(), PostgresError> {
|
||||
check_desync!(self.conn);
|
||||
try_pg!(self.conn.write_messages([
|
||||
Close {
|
||||
variant: 'S' as u8,
|
||||
name: self.name.as_slice()
|
||||
},
|
||||
Sync]));
|
||||
loop {
|
||||
match try_pg!(self.conn.read_message()) {
|
||||
ReadyForQuery { .. } => break,
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.conn.wait_for_ready());
|
||||
return Err(PgDbError(PostgresDbError::new(fields)));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn inner_execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql])
|
||||
-> Result<(), PostgresError> {
|
||||
let mut formats = Vec::new();
|
||||
let mut values = Vec::new();
|
||||
assert!(self.param_types.len() == params.len(),
|
||||
"Expected {} parameters but found {}",
|
||||
self.param_types.len(), params.len());
|
||||
for (¶m, ty) in params.iter().zip(self.param_types.iter()) {
|
||||
let (format, value) = param.to_sql(ty);
|
||||
formats.push(format as i16);
|
||||
values.push(value);
|
||||
};
|
||||
|
||||
let result_formats: Vec<i16> = self.result_desc.iter().map(|desc| {
|
||||
desc.ty.result_format() as i16
|
||||
}).collect();
|
||||
|
||||
try_pg!(self.conn.write_messages([
|
||||
Bind {
|
||||
portal: portal_name,
|
||||
statement: self.name.as_slice(),
|
||||
formats: formats.as_slice(),
|
||||
values: values.as_slice(),
|
||||
result_formats: result_formats.as_slice()
|
||||
},
|
||||
Execute {
|
||||
portal: portal_name,
|
||||
max_rows: row_limit as i32
|
||||
},
|
||||
Sync]));
|
||||
|
||||
match try_pg!(self.conn.read_message()) {
|
||||
BindComplete => Ok(()),
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.conn.wait_for_ready());
|
||||
Err(PgDbError(PostgresDbError::new(fields)))
|
||||
}
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError> {
|
||||
let id = self.next_portal_id.get();
|
||||
self.next_portal_id.set(id + 1);
|
||||
let portal_name = format!("{}_portal_{}", self.name, id);
|
||||
|
||||
try!(self.inner_execute(portal_name, row_limit, params));
|
||||
|
||||
let mut result = PostgresResult {
|
||||
stmt: self,
|
||||
name: portal_name,
|
||||
data: RingBuf::new(),
|
||||
row_limit: row_limit,
|
||||
more_rows: true,
|
||||
finished: false,
|
||||
};
|
||||
try!(result.read_rows())
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Returns a slice containing the expected parameter types.
|
||||
pub fn param_types<'a>(&'a self) -> &'a [PostgresType] {
|
||||
self.param_types.as_slice()
|
||||
}
|
||||
|
||||
/// Returns a slice describing the columns of the result of the query.
|
||||
pub fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
|
||||
self.result_desc.as_slice()
|
||||
}
|
||||
|
||||
/// Attempts to execute the prepared statement, returning the number of
|
||||
/// rows modified.
|
||||
///
|
||||
/// If the statement does not modify any rows (e.g. SELECT), 0 is returned.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if the number or types of the provided parameters do not match
|
||||
/// the parameters of the statement.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use postgres::{PostgresConnection, NoSsl};
|
||||
/// # use postgres::types::ToSql;
|
||||
/// # let conn = PostgresConnection::connect("", &NoSsl);
|
||||
/// # let bar = 1i32;
|
||||
/// # let baz = true;
|
||||
/// let stmt = conn.prepare("UPDATE foo SET bar = $1 WHERE baz = $2");
|
||||
/// match stmt.try_execute([&bar as &ToSql, &baz as &ToSql]) {
|
||||
/// Ok(count) => println!("{} row(s) updated", count),
|
||||
/// Err(err) => println!("Error executing query: {}", err)
|
||||
/// }
|
||||
pub fn try_execute(&self, params: &[&ToSql])
|
||||
-> Result<uint, PostgresError> {
|
||||
check_desync!(self.conn);
|
||||
try!(self.inner_execute("", 0, params));
|
||||
|
||||
let num;
|
||||
loop {
|
||||
match try_pg!(self.conn.read_message()) {
|
||||
DataRow { .. } => {}
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.conn.wait_for_ready());
|
||||
return Err(PgDbError(PostgresDbError::new(fields)));
|
||||
}
|
||||
CommandComplete { tag } => {
|
||||
let s = tag.split(' ').last().unwrap();
|
||||
num = match FromStr::from_str(s) {
|
||||
None => 0,
|
||||
Some(n) => n
|
||||
};
|
||||
break;
|
||||
}
|
||||
EmptyQueryResponse => {
|
||||
num = 0;
|
||||
break;
|
||||
}
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
try!(self.conn.wait_for_ready());
|
||||
|
||||
Ok(num)
|
||||
}
|
||||
|
||||
/// A convenience function wrapping `try_execute`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error executing the statement.
|
||||
pub fn execute(&self, params: &[&ToSql]) -> uint {
|
||||
match self.try_execute(params) {
|
||||
Ok(count) => count,
|
||||
Err(err) => fail!("Error running query\n{}", err)
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to execute the prepared statement, returning an iterator over
|
||||
/// the resulting rows.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if the number or types of the provided parameters do not match
|
||||
/// the parameters of the statement.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use postgres::{PostgresConnection, NoSsl};
|
||||
/// # use postgres::types::ToSql;
|
||||
/// # let conn = PostgresConnection::connect("", &NoSsl);
|
||||
/// let stmt = conn.prepare("SELECT foo FROM bar WHERE baz = $1");
|
||||
/// # let baz = true;
|
||||
/// let mut rows = match stmt.try_query([&baz as &ToSql]) {
|
||||
/// Ok(rows) => rows,
|
||||
/// Err(err) => fail!("Error running query: {}", err)
|
||||
/// };
|
||||
/// for row in rows {
|
||||
/// let foo: i32 = row["foo"];
|
||||
/// println!("foo: {}", foo);
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_query<'a>(&'a self, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError> {
|
||||
check_desync!(self.conn);
|
||||
self.try_lazy_query(0, params)
|
||||
}
|
||||
|
||||
/// A convenience function wrapping `try_query`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error executing the statement.
|
||||
pub fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a> {
|
||||
match self.try_query(params) {
|
||||
Ok(result) => result,
|
||||
Err(err) => fail!("Error executing query:\n{}", err)
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the statement, clearing it from the Postgres session.
|
||||
///
|
||||
/// Functionally identical to the `Drop` implementation of the
|
||||
/// `PostgresStatement` except that it returns any error to the caller.
|
||||
pub fn finish(mut self) -> Result<(), PostgresError> {
|
||||
self.finished.set(true);
|
||||
self.finish_inner()
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about a column of the result of a query.
|
||||
#[deriving(Eq)]
|
||||
pub struct ResultDescription {
|
||||
/// The name of the column
|
||||
name: ~str,
|
||||
/// The type of the data in the column
|
||||
ty: PostgresType
|
||||
}
|
||||
|
||||
/// An iterator over the resulting rows of a query.
|
||||
pub struct PostgresResult<'stmt> {
|
||||
priv stmt: &'stmt PostgresStatement<'stmt>,
|
||||
priv name: ~str,
|
||||
priv data: RingBuf<Vec<Option<~[u8]>>>,
|
||||
priv row_limit: uint,
|
||||
priv more_rows: bool,
|
||||
priv finished: bool,
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<'stmt> Drop for PostgresResult<'stmt> {
|
||||
fn drop(&mut self) {
|
||||
if !self.finished {
|
||||
match self.finish_inner() {
|
||||
Ok(()) | Err(PgStreamDesynchronized) => {}
|
||||
Err(err) =>
|
||||
fail_unless_failing!("Error dropping result: {}", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt> PostgresResult<'stmt> {
|
||||
fn finish_inner(&mut self) -> Result<(), PostgresError> {
|
||||
check_desync!(self.stmt.conn);
|
||||
try_pg!(self.stmt.conn.write_messages([
|
||||
Close {
|
||||
variant: 'P' as u8,
|
||||
name: self.name.as_slice()
|
||||
},
|
||||
Sync]));
|
||||
loop {
|
||||
match try_pg!(self.stmt.conn.read_message()) {
|
||||
ReadyForQuery { .. } => break,
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.stmt.conn.wait_for_ready());
|
||||
return Err(PgDbError(PostgresDbError::new(fields)));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_rows(&mut self) -> Result<(), PostgresError> {
|
||||
loop {
|
||||
match try_pg!(self.stmt.conn.read_message()) {
|
||||
EmptyQueryResponse |
|
||||
CommandComplete { .. } => {
|
||||
self.more_rows = false;
|
||||
break;
|
||||
},
|
||||
PortalSuspended => {
|
||||
self.more_rows = true;
|
||||
break;
|
||||
},
|
||||
DataRow { row } => self.data.push_back(row),
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
self.stmt.conn.wait_for_ready()
|
||||
}
|
||||
|
||||
fn execute(&mut self) -> Result<(), PostgresError> {
|
||||
try_pg!(self.stmt.conn.write_messages([
|
||||
Execute {
|
||||
portal: self.name,
|
||||
max_rows: self.row_limit as i32
|
||||
},
|
||||
Sync]));
|
||||
self.read_rows()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt> PostgresResult<'stmt> {
|
||||
/// Consumes the `PostgresResult`, cleaning up associated state.
|
||||
///
|
||||
/// Functionally identical to the `Drop` implementation on
|
||||
/// `PostgresResult` except that it returns any error to the caller.
|
||||
pub fn finish(mut self) -> Result<(), PostgresError> {
|
||||
self.finished = true;
|
||||
self.finish_inner()
|
||||
}
|
||||
|
||||
/// Like `PostgresResult::next` except that it returns any errors to the
|
||||
/// caller instead of failing.
|
||||
pub fn try_next(&mut self) -> Result<Option<PostgresRow<'stmt>>,
|
||||
PostgresError> {
|
||||
if self.data.is_empty() && self.more_rows {
|
||||
try!(self.execute());
|
||||
}
|
||||
|
||||
let row = self.data.pop_front().map(|row| {
|
||||
PostgresRow {
|
||||
stmt: self.stmt,
|
||||
data: row
|
||||
}
|
||||
});
|
||||
Ok(row)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt> Iterator<PostgresRow<'stmt>> for PostgresResult<'stmt> {
|
||||
fn next(&mut self) -> Option<PostgresRow<'stmt>> {
|
||||
match self.try_next() {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => fail!("Error fetching rows: {}", err)
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (uint, Option<uint>) {
|
||||
let lower = self.data.len();
|
||||
let upper = if self.more_rows {
|
||||
None
|
||||
} else {
|
||||
Some(lower)
|
||||
};
|
||||
(lower, upper)
|
||||
}
|
||||
}
|
||||
|
||||
/// A single result row of a query.
|
||||
///
|
||||
/// A value can be accessed by the name or index of its column, though access
|
||||
/// by index is more efficient. Rows are 1-indexed.
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use postgres::{PostgresConnection, NoSsl};
|
||||
/// # let conn = PostgresConnection::connect("", &NoSsl);
|
||||
/// # let stmt = conn.prepare("");
|
||||
/// # let mut result = stmt.query([]);
|
||||
/// # let row = result.next().unwrap();
|
||||
/// let foo: i32 = row[1];
|
||||
/// let bar: ~str = row["bar"];
|
||||
/// ```
|
||||
pub struct PostgresRow<'stmt> {
|
||||
priv stmt: &'stmt PostgresStatement<'stmt>,
|
||||
priv data: Vec<Option<~[u8]>>
|
||||
}
|
||||
|
||||
impl<'stmt> Container for PostgresRow<'stmt> {
|
||||
#[inline]
|
||||
fn len(&self) -> uint {
|
||||
self.data.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt, I: RowIndex, T: FromSql> Index<I, T> for PostgresRow<'stmt> {
|
||||
fn index(&self, idx: &I) -> T {
|
||||
let idx = idx.idx(self.stmt);
|
||||
FromSql::from_sql(&self.stmt.result_desc.get(idx).ty, self.data.get(idx))
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait implemented by types that can index into columns of a row.
|
||||
pub trait RowIndex {
|
||||
/// Returns the index of the appropriate column.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there is no corresponding column.
|
||||
fn idx(&self, stmt: &PostgresStatement) -> uint;
|
||||
}
|
||||
|
||||
impl RowIndex for uint {
|
||||
#[inline]
|
||||
fn idx(&self, _stmt: &PostgresStatement) -> uint {
|
||||
assert!(*self != 0, "out of bounds row access");
|
||||
*self - 1
|
||||
}
|
||||
}
|
||||
|
||||
// This is a convenience as the 1 in get[1] resolves to int :(
|
||||
impl RowIndex for int {
|
||||
#[inline]
|
||||
fn idx(&self, _stmt: &PostgresStatement) -> uint {
|
||||
assert!(*self >= 1, "out of bounds row access");
|
||||
(*self - 1) as uint
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> RowIndex for &'a str {
|
||||
fn idx(&self, stmt: &PostgresStatement) -> uint {
|
||||
for (i, desc) in stmt.result_descriptions().iter().enumerate() {
|
||||
if desc.name.as_slice() == *self {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
fail!("there is no column with name {}", *self);
|
||||
}
|
||||
}
|
||||
|
||||
/// A lazily-loaded iterator over the resulting rows of a query
|
||||
///
|
||||
/// This is a smart pointer around a `PostgresResult`.
|
||||
pub struct PostgresLazyResult<'trans, 'stmt> {
|
||||
priv result: PostgresResult<'stmt>,
|
||||
priv trans: &'trans PostgresTransaction<'trans>,
|
||||
}
|
||||
|
||||
impl<'trans, 'stmt> PostgresLazyResult<'trans, 'stmt> {
|
||||
/// Like `PostgresResult::finish`.
|
||||
pub fn finish(self) -> Result<(), PostgresError> {
|
||||
self.result.finish()
|
||||
}
|
||||
|
||||
/// Like `PostgresResult::try_next`.
|
||||
pub fn try_next(&mut self) -> Result<Option<PostgresRow<'stmt>>,
|
||||
PostgresError> {
|
||||
self.result.try_next()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'trans, 'stmt> Iterator<PostgresRow<'stmt>>
|
||||
for PostgresLazyResult<'trans, 'stmt> {
|
||||
fn next(&mut self) -> Option<PostgresRow<'stmt>> {
|
||||
self.result.next()
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (uint, Option<uint>) {
|
||||
self.result.size_hint()
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ use sync::{Arc, Mutex};
|
||||
use {PostgresNotifications,
|
||||
PostgresCancelData,
|
||||
PostgresConnection,
|
||||
NormalPostgresStatement,
|
||||
PostgresStatement,
|
||||
PostgresTransaction,
|
||||
SslMode};
|
||||
use error::{PostgresConnectError, PostgresError};
|
||||
@ -143,12 +143,12 @@ impl Drop for PooledPostgresConnection {
|
||||
impl PooledPostgresConnection {
|
||||
/// Like `PostgresConnection::try_prepare`.
|
||||
pub fn try_prepare<'a>(&'a self, query: &str)
|
||||
-> Result<NormalPostgresStatement<'a>, PostgresError> {
|
||||
-> Result<PostgresStatement<'a>, PostgresError> {
|
||||
self.conn.get_ref().try_prepare(query)
|
||||
}
|
||||
|
||||
/// Like `PostgresConnection::prepare`.
|
||||
pub fn prepare<'a>(&'a self, query: &str) -> NormalPostgresStatement<'a> {
|
||||
pub fn prepare<'a>(&'a self, query: &str) -> PostgresStatement<'a> {
|
||||
self.conn.get_ref().prepare(query)
|
||||
}
|
||||
|
||||
|
571
src/stmt.rs
571
src/stmt.rs
@ -1,571 +0,0 @@
|
||||
use collections::{Deque, RingBuf};
|
||||
use std::cell::Cell;
|
||||
use std::from_str::FromStr;
|
||||
use std::task;
|
||||
|
||||
use PostgresConnection;
|
||||
use error::{PgDbError,
|
||||
PgStreamDesynchronized,
|
||||
PgStreamError,
|
||||
PostgresDbError,
|
||||
PostgresError};
|
||||
use message::{Bind,
|
||||
Close,
|
||||
CommandComplete,
|
||||
DataRow,
|
||||
ErrorResponse,
|
||||
Execute,
|
||||
ReadyForQuery,
|
||||
RowDescriptionEntry,
|
||||
Sync};
|
||||
use types::{FromSql, ToSql, PostgresType};
|
||||
|
||||
/// A trait containing methods that can be called on a prepared statement.
|
||||
pub trait PostgresStatement {
|
||||
/// Returns a slice containing the expected parameter types.
|
||||
fn param_types<'a>(&'a self) -> &'a [PostgresType];
|
||||
|
||||
/// Returns a slice describing the columns of the result of the query.
|
||||
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription];
|
||||
|
||||
/// Attempts to execute the prepared statement, returning the number of
|
||||
/// rows modified.
|
||||
///
|
||||
/// If the statement does not modify any rows (e.g. SELECT), 0 is returned.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if the number or types of the provided parameters do not match
|
||||
/// the parameters of the statement.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use postgres::{PostgresConnection, NoSsl, PostgresStatement};
|
||||
/// # use postgres::types::ToSql;
|
||||
/// # let conn = PostgresConnection::connect("", &NoSsl);
|
||||
/// # let bar = 1i32;
|
||||
/// # let baz = true;
|
||||
/// let stmt = conn.prepare("UPDATE foo SET bar = $1 WHERE baz = $2");
|
||||
/// match stmt.try_execute([&bar as &ToSql, &baz as &ToSql]) {
|
||||
/// Ok(count) => println!("{} row(s) updated", count),
|
||||
/// Err(err) => println!("Error executing query: {}", err)
|
||||
/// }
|
||||
fn try_execute(&self, params: &[&ToSql]) -> Result<uint, PostgresError>;
|
||||
|
||||
/// A convenience function wrapping `try_execute`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error executing the statement.
|
||||
fn execute(&self, params: &[&ToSql]) -> uint {
|
||||
match self.try_execute(params) {
|
||||
Ok(count) => count,
|
||||
Err(err) => fail!("Error running query\n{}", err)
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to execute the prepared statement, returning an iterator over
|
||||
/// the resulting rows.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if the number or types of the provided parameters do not match
|
||||
/// the parameters of the statement.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use postgres::{PostgresConnection, NoSsl, PostgresStatement};
|
||||
/// # use postgres::types::ToSql;
|
||||
/// # let conn = PostgresConnection::connect("", &NoSsl);
|
||||
/// let stmt = conn.prepare("SELECT foo FROM bar WHERE baz = $1");
|
||||
/// # let baz = true;
|
||||
/// let mut rows = match stmt.try_query([&baz as &ToSql]) {
|
||||
/// Ok(rows) => rows,
|
||||
/// Err(err) => fail!("Error running query: {}", err)
|
||||
/// };
|
||||
/// for row in rows {
|
||||
/// let foo: i32 = row["foo"];
|
||||
/// println!("foo: {}", foo);
|
||||
/// }
|
||||
/// ```
|
||||
fn try_query<'a>(&'a self, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError>;
|
||||
|
||||
/// A convenience function wrapping `try_query`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error executing the statement.
|
||||
fn query<'a>(&'a self, params: &[&ToSql]) -> PostgresResult<'a> {
|
||||
match self.try_query(params) {
|
||||
Ok(result) => result,
|
||||
Err(err) => fail!("Error executing query:\n{}", err)
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the statement, clearing it from the Postgres session.
|
||||
///
|
||||
/// Functionally identical to the `Drop` implementation of the
|
||||
/// `PostgresStatement` except that it returns any error to the caller.
|
||||
fn finish(self) -> Result<(), PostgresError>;
|
||||
}
|
||||
|
||||
/// A statement prepared outside of a transaction.
|
||||
pub struct NormalPostgresStatement<'conn> {
|
||||
priv conn: &'conn PostgresConnection,
|
||||
priv name: ~str,
|
||||
priv param_types: Vec<PostgresType>,
|
||||
priv result_desc: Vec<ResultDescription>,
|
||||
priv next_portal_id: Cell<uint>,
|
||||
priv finished: Cell<bool>,
|
||||
}
|
||||
|
||||
pub fn make_NormalPostgresStatement<'a>(conn: &'a PostgresConnection,
|
||||
name: ~str,
|
||||
param_types: Vec<PostgresType>,
|
||||
result_desc: Vec<ResultDescription>)
|
||||
-> NormalPostgresStatement<'a> {
|
||||
NormalPostgresStatement {
|
||||
conn: conn,
|
||||
name: name,
|
||||
param_types: param_types,
|
||||
result_desc: result_desc,
|
||||
next_portal_id: Cell::new(0),
|
||||
finished: Cell::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<'conn> Drop for NormalPostgresStatement<'conn> {
|
||||
fn drop(&mut self) {
|
||||
if !self.finished.get() {
|
||||
match self.finish_inner() {
|
||||
Ok(()) | Err(PgStreamDesynchronized) => {}
|
||||
Err(err) =>
|
||||
fail_unless_failing!("Error dropping statement: {}", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'conn> NormalPostgresStatement<'conn> {
|
||||
fn finish_inner(&mut self) -> Result<(), PostgresError> {
|
||||
check_desync!(self.conn);
|
||||
try_pg!(self.conn.write_messages([
|
||||
Close {
|
||||
variant: 'S' as u8,
|
||||
name: self.name.as_slice()
|
||||
},
|
||||
Sync]));
|
||||
loop {
|
||||
match try_pg!(self.conn.read_message()) {
|
||||
ReadyForQuery { .. } => break,
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.conn.wait_for_ready());
|
||||
return Err(PgDbError(PostgresDbError::new(fields)));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql])
|
||||
-> Result<(), PostgresError> {
|
||||
let mut formats = Vec::new();
|
||||
let mut values = Vec::new();
|
||||
assert!(self.param_types.len() == params.len(),
|
||||
"Expected {} parameters but found {}",
|
||||
self.param_types.len(), params.len());
|
||||
for (¶m, ty) in params.iter().zip(self.param_types.iter()) {
|
||||
let (format, value) = param.to_sql(ty);
|
||||
formats.push(format as i16);
|
||||
values.push(value);
|
||||
};
|
||||
|
||||
let result_formats: Vec<i16> = self.result_desc.iter().map(|desc| {
|
||||
desc.ty.result_format() as i16
|
||||
}).collect();
|
||||
|
||||
try_pg!(self.conn.write_messages([
|
||||
Bind {
|
||||
portal: portal_name,
|
||||
statement: self.name.as_slice(),
|
||||
formats: formats.as_slice(),
|
||||
values: values.as_slice(),
|
||||
result_formats: result_formats.as_slice()
|
||||
},
|
||||
Execute {
|
||||
portal: portal_name,
|
||||
max_rows: row_limit as i32
|
||||
},
|
||||
Sync]));
|
||||
|
||||
match try_pg!(self.conn.read_message()) {
|
||||
BindComplete => Ok(()),
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.conn.wait_for_ready());
|
||||
Err(PgDbError(PostgresDbError::new(fields)))
|
||||
}
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError> {
|
||||
let id = self.next_portal_id.get();
|
||||
self.next_portal_id.set(id + 1);
|
||||
let portal_name = format!("{}_portal_{}", self.name, id);
|
||||
|
||||
try!(self.execute(portal_name, row_limit, params));
|
||||
|
||||
let mut result = PostgresResult {
|
||||
stmt: self,
|
||||
name: portal_name,
|
||||
data: RingBuf::new(),
|
||||
row_limit: row_limit,
|
||||
more_rows: true,
|
||||
finished: false,
|
||||
};
|
||||
try!(result.read_rows())
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> {
|
||||
fn param_types<'a>(&'a self) -> &'a [PostgresType] {
|
||||
self.param_types.as_slice()
|
||||
}
|
||||
|
||||
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
|
||||
self.result_desc.as_slice()
|
||||
}
|
||||
|
||||
fn try_execute(&self, params: &[&ToSql])
|
||||
-> Result<uint, PostgresError> {
|
||||
check_desync!(self.conn);
|
||||
try!(self.execute("", 0, params));
|
||||
|
||||
let num;
|
||||
loop {
|
||||
match try_pg!(self.conn.read_message()) {
|
||||
DataRow { .. } => {}
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.conn.wait_for_ready());
|
||||
return Err(PgDbError(PostgresDbError::new(fields)));
|
||||
}
|
||||
CommandComplete { tag } => {
|
||||
let s = tag.split(' ').last().unwrap();
|
||||
num = match FromStr::from_str(s) {
|
||||
None => 0,
|
||||
Some(n) => n
|
||||
};
|
||||
break;
|
||||
}
|
||||
EmptyQueryResponse => {
|
||||
num = 0;
|
||||
break;
|
||||
}
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
try!(self.conn.wait_for_ready());
|
||||
|
||||
Ok(num)
|
||||
}
|
||||
|
||||
fn try_query<'a>(&'a self, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError> {
|
||||
check_desync!(self.conn);
|
||||
self.try_lazy_query(0, params)
|
||||
}
|
||||
|
||||
fn finish(mut self) -> Result<(), PostgresError> {
|
||||
self.finished.set(true);
|
||||
self.finish_inner()
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about a column of the result of a query.
|
||||
#[deriving(Eq)]
|
||||
pub struct ResultDescription {
|
||||
/// The name of the column
|
||||
name: ~str,
|
||||
/// The type of the data in the column
|
||||
ty: PostgresType
|
||||
}
|
||||
|
||||
pub fn make_ResultDescription(row: RowDescriptionEntry) -> ResultDescription {
|
||||
let RowDescriptionEntry { name, type_oid, .. } = row;
|
||||
|
||||
ResultDescription {
|
||||
name: name,
|
||||
ty: PostgresType::from_oid(type_oid)
|
||||
}
|
||||
}
|
||||
|
||||
/// A statement prepared inside of a transaction.
|
||||
///
|
||||
/// Provides additional functionality over a `NormalPostgresStatement`.
|
||||
pub struct TransactionalPostgresStatement<'conn> {
|
||||
priv stmt: NormalPostgresStatement<'conn>
|
||||
}
|
||||
|
||||
pub fn make_TransactionalPostgresStatement<'a>(stmt: NormalPostgresStatement<'a>)
|
||||
-> TransactionalPostgresStatement<'a> {
|
||||
TransactionalPostgresStatement {
|
||||
stmt: stmt,
|
||||
}
|
||||
}
|
||||
|
||||
impl<'conn> PostgresStatement for TransactionalPostgresStatement<'conn> {
|
||||
fn param_types<'a>(&'a self) -> &'a [PostgresType] {
|
||||
self.stmt.param_types()
|
||||
}
|
||||
|
||||
fn result_descriptions<'a>(&'a self) -> &'a [ResultDescription] {
|
||||
self.stmt.result_descriptions()
|
||||
}
|
||||
|
||||
fn try_execute(&self, params: &[&ToSql]) -> Result<uint, PostgresError> {
|
||||
self.stmt.try_execute(params)
|
||||
}
|
||||
|
||||
fn try_query<'a>(&'a self, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError> {
|
||||
self.stmt.try_query(params)
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<(), PostgresError> {
|
||||
self.stmt.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'conn> TransactionalPostgresStatement<'conn> {
|
||||
/// Attempts to execute the prepared statement, returning a lazily loaded
|
||||
/// iterator over the resulting rows.
|
||||
///
|
||||
/// No more than `row_limit` rows will be stored in memory at a time. Rows
|
||||
/// will be pulled from the database in batches of `row_limit` as needed.
|
||||
/// If `row_limit` is 0, `try_lazy_query` is equivalent to `try_query`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if the number or types of the provided parameters do not match
|
||||
/// the parameters of the statement.
|
||||
pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
|
||||
-> Result<PostgresResult<'a>, PostgresError> {
|
||||
check_desync!(self.stmt.conn);
|
||||
self.stmt.try_lazy_query(row_limit, params)
|
||||
}
|
||||
|
||||
/// A convenience wrapper around `try_lazy_query`.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there was an error executing the statement.
|
||||
pub fn lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
|
||||
-> PostgresResult<'a> {
|
||||
match self.try_lazy_query(row_limit, params) {
|
||||
Ok(result) => result,
|
||||
Err(err) => fail!("Error executing query:\n{}", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over the resulting rows of a query.
|
||||
pub struct PostgresResult<'stmt> {
|
||||
priv stmt: &'stmt NormalPostgresStatement<'stmt>,
|
||||
priv name: ~str,
|
||||
priv data: RingBuf<Vec<Option<~[u8]>>>,
|
||||
priv row_limit: uint,
|
||||
priv more_rows: bool,
|
||||
priv finished: bool,
|
||||
}
|
||||
|
||||
#[unsafe_destructor]
|
||||
impl<'stmt> Drop for PostgresResult<'stmt> {
|
||||
fn drop(&mut self) {
|
||||
if !self.finished {
|
||||
match self.finish_inner() {
|
||||
Ok(()) | Err(PgStreamDesynchronized) => {}
|
||||
Err(err) =>
|
||||
fail_unless_failing!("Error dropping result: {}", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt> PostgresResult<'stmt> {
|
||||
fn finish_inner(&mut self) -> Result<(), PostgresError> {
|
||||
check_desync!(self.stmt.conn);
|
||||
try_pg!(self.stmt.conn.write_messages([
|
||||
Close {
|
||||
variant: 'P' as u8,
|
||||
name: self.name.as_slice()
|
||||
},
|
||||
Sync]));
|
||||
loop {
|
||||
match try_pg!(self.stmt.conn.read_message()) {
|
||||
ReadyForQuery { .. } => break,
|
||||
ErrorResponse { fields } => {
|
||||
try!(self.stmt.conn.wait_for_ready());
|
||||
return Err(PgDbError(PostgresDbError::new(fields)));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_rows(&mut self) -> Result<(), PostgresError> {
|
||||
loop {
|
||||
match try_pg!(self.stmt.conn.read_message()) {
|
||||
EmptyQueryResponse |
|
||||
CommandComplete { .. } => {
|
||||
self.more_rows = false;
|
||||
break;
|
||||
},
|
||||
PortalSuspended => {
|
||||
self.more_rows = true;
|
||||
break;
|
||||
},
|
||||
DataRow { row } => self.data.push_back(row),
|
||||
_ => unreachable!()
|
||||
}
|
||||
}
|
||||
self.stmt.conn.wait_for_ready()
|
||||
}
|
||||
|
||||
fn execute(&mut self) -> Result<(), PostgresError> {
|
||||
try_pg!(self.stmt.conn.write_messages([
|
||||
Execute {
|
||||
portal: self.name,
|
||||
max_rows: self.row_limit as i32
|
||||
},
|
||||
Sync]));
|
||||
self.read_rows()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt> PostgresResult<'stmt> {
|
||||
/// Consumes the `PostgresResult`, cleaning up associated state.
|
||||
///
|
||||
/// Functionally identical to the `Drop` implementation on
|
||||
/// `PostgresResult` except that it returns any error to the caller.
|
||||
pub fn finish(mut self) -> Result<(), PostgresError> {
|
||||
self.finished = true;
|
||||
self.finish_inner()
|
||||
}
|
||||
|
||||
/// Like `PostgresResult::next` except that it returns any errors to the
|
||||
/// caller instead of failing.
|
||||
pub fn try_next(&mut self) -> Result<Option<PostgresRow<'stmt>>,
|
||||
PostgresError> {
|
||||
if self.data.is_empty() && self.more_rows {
|
||||
try!(self.execute());
|
||||
}
|
||||
|
||||
let row = self.data.pop_front().map(|row| {
|
||||
PostgresRow {
|
||||
stmt: self.stmt,
|
||||
data: row
|
||||
}
|
||||
});
|
||||
Ok(row)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt> Iterator<PostgresRow<'stmt>> for PostgresResult<'stmt> {
|
||||
fn next(&mut self) -> Option<PostgresRow<'stmt>> {
|
||||
match self.try_next() {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => fail!("Error fetching rows: {}", err)
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (uint, Option<uint>) {
|
||||
let lower = self.data.len();
|
||||
let upper = if self.more_rows {
|
||||
None
|
||||
} else {
|
||||
Some(lower)
|
||||
};
|
||||
(lower, upper)
|
||||
}
|
||||
}
|
||||
|
||||
/// A single result row of a query.
|
||||
///
|
||||
/// A value can be accessed by the name or index of its column, though access
|
||||
/// by index is more efficient. Rows are 1-indexed.
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use postgres::{PostgresConnection, PostgresStatement, NoSsl};
|
||||
/// # let conn = PostgresConnection::connect("", &NoSsl);
|
||||
/// # let stmt = conn.prepare("");
|
||||
/// # let mut result = stmt.query([]);
|
||||
/// # let row = result.next().unwrap();
|
||||
/// let foo: i32 = row[1];
|
||||
/// let bar: ~str = row["bar"];
|
||||
/// ```
|
||||
pub struct PostgresRow<'stmt> {
|
||||
priv stmt: &'stmt NormalPostgresStatement<'stmt>,
|
||||
priv data: Vec<Option<~[u8]>>
|
||||
}
|
||||
|
||||
impl<'stmt> Container for PostgresRow<'stmt> {
|
||||
#[inline]
|
||||
fn len(&self) -> uint {
|
||||
self.data.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'stmt, I: RowIndex, T: FromSql> Index<I, T> for PostgresRow<'stmt> {
|
||||
fn index(&self, idx: &I) -> T {
|
||||
let idx = idx.idx(self.stmt);
|
||||
FromSql::from_sql(&self.stmt.result_desc.get(idx).ty, self.data.get(idx))
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait implemented by types that can index into columns of a row.
|
||||
pub trait RowIndex {
|
||||
/// Returns the index of the appropriate column.
|
||||
///
|
||||
/// # Failure
|
||||
///
|
||||
/// Fails if there is no corresponding column.
|
||||
fn idx(&self, stmt: &NormalPostgresStatement) -> uint;
|
||||
}
|
||||
|
||||
impl RowIndex for uint {
|
||||
#[inline]
|
||||
fn idx(&self, _stmt: &NormalPostgresStatement) -> uint {
|
||||
assert!(*self != 0, "out of bounds row access");
|
||||
*self - 1
|
||||
}
|
||||
}
|
||||
|
||||
// This is a convenience as the 1 in get[1] resolves to int :(
|
||||
impl RowIndex for int {
|
||||
#[inline]
|
||||
fn idx(&self, _stmt: &NormalPostgresStatement) -> uint {
|
||||
assert!(*self >= 1, "out of bounds row access");
|
||||
(*self - 1) as uint
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> RowIndex for &'a str {
|
||||
fn idx(&self, stmt: &NormalPostgresStatement) -> uint {
|
||||
for (i, desc) in stmt.result_descriptions().iter().enumerate() {
|
||||
if desc.name.as_slice() == *self {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
fail!("there is no column with name {}", *self);
|
||||
}
|
||||
}
|
||||
|
37
src/test.rs
37
src/test.rs
@ -13,13 +13,13 @@ use std::io::timer;
|
||||
use {PostgresNoticeHandler,
|
||||
PostgresNotification,
|
||||
PostgresConnection,
|
||||
PostgresStatement,
|
||||
ResultDescription,
|
||||
RequireSsl,
|
||||
PreferSsl,
|
||||
NoSsl};
|
||||
use error::{PgConnectDbError,
|
||||
PgDbError,
|
||||
PgWrongConnection,
|
||||
DnsError,
|
||||
MissingPassword,
|
||||
Position,
|
||||
@ -293,20 +293,31 @@ fn test_result_finish() {
|
||||
fn test_lazy_query() {
|
||||
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
|
||||
|
||||
{
|
||||
let trans = conn.transaction();
|
||||
trans.execute("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)", []);
|
||||
let stmt = trans.prepare("INSERT INTO foo (id) VALUES ($1)");
|
||||
let values = ~[0i32, 1, 2, 3, 4, 5];
|
||||
for value in values.iter() {
|
||||
stmt.execute([value as &ToSql]);
|
||||
}
|
||||
let trans = conn.transaction();
|
||||
trans.execute("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)", []);
|
||||
let stmt = trans.prepare("INSERT INTO foo (id) VALUES ($1)");
|
||||
let values = ~[0i32, 1, 2, 3, 4, 5];
|
||||
for value in values.iter() {
|
||||
stmt.execute([value as &ToSql]);
|
||||
}
|
||||
let stmt = conn.prepare("SELECT id FROM foo ORDER BY id");
|
||||
let result = trans.lazy_query(&stmt, [], 2);
|
||||
assert_eq!(values, result.map(|row| row[1]).collect());
|
||||
|
||||
let stmt = trans.prepare("SELECT id FROM foo ORDER BY id");
|
||||
let result = stmt.lazy_query(2, []);
|
||||
assert_eq!(values, result.map(|row| row[1]).collect());
|
||||
trans.set_rollback();
|
||||
}
|
||||
|
||||
trans.set_rollback();
|
||||
#[test]
|
||||
fn test_lazy_query_wrong_conn() {
|
||||
let conn1 = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
|
||||
let conn2 = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
|
||||
|
||||
let trans = conn1.transaction();
|
||||
let stmt = conn2.prepare("SELECT 1::INT");
|
||||
match trans.try_lazy_query(&stmt, [], 1) {
|
||||
Err(PgWrongConnection) => {}
|
||||
Err(err) => fail!("Unexpected error {}", err),
|
||||
Ok(_) => fail!("Expected failure")
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user