Remove lazy query support

It turns out this only works inside a transaction, so the API will need
to be rethought. Probably PostgresTransaction.prepare will return a
special TransactionalPreparedStatement that has support for it or
something.
This commit is contained in:
Steven Fackler 2013-09-02 23:09:30 -07:00
parent 03e0ab29e2
commit a677f62fe2
3 changed files with 36 additions and 130 deletions

View File

@ -4,11 +4,9 @@
extern mod extra;
use extra::container::Deque;
use extra::digest::Digest;
use extra::md5::Md5;
use extra::url::{UserInfo, Url};
use extra::ringbuf::RingBuf;
use std::cell::Cell;
use std::hashmap::HashMap;
use std::rt::io::{io_error, Decorator};
@ -305,7 +303,6 @@ impl PostgresConnection {
name: stmt_name,
param_types: param_types,
result_desc: result_desc,
next_portal_id: Cell::new(0)
})
}
@ -409,7 +406,6 @@ pub struct PostgresStatement<'self> {
priv name: ~str,
priv param_types: ~[Oid],
priv result_desc: ~[RowDescriptionEntry],
priv next_portal_id: Cell<uint>
}
#[unsafe_destructor]
@ -437,7 +433,7 @@ impl<'self> PostgresStatement<'self> {
self.param_types.len()
}
fn bind(&self, portal_name: &str, params: &[&ToSql])
fn execute(&self, portal_name: &str, params: &[&ToSql])
-> Option<PostgresDbError> {
let mut formats = ~[];
let mut values = ~[];
@ -459,17 +455,20 @@ impl<'self> PostgresStatement<'self> {
values: values,
result_formats: result_formats
},
&Execute {
portal: portal_name,
max_rows: 0
},
&Sync]);
let ret = match_read_message!(self.conn, {
match_read_message!(self.conn, {
BindComplete => None,
ErrorResponse { fields } => Some(PostgresDbError::new(fields)),
ErrorResponse { fields } => {
self.conn.wait_for_ready();
Some(PostgresDbError::new(fields))
},
resp => fail!("Bad response: %?", resp.to_str())
});
self.conn.wait_for_ready();
ret
})
}
pub fn update(&self, params: &[&ToSql]) -> uint {
@ -481,21 +480,13 @@ impl<'self> PostgresStatement<'self> {
pub fn try_update(&self, params: &[&ToSql])
-> Result<uint, PostgresDbError> {
// The unnamed portal is automatically cleaned up at sync time
match self.bind("", params) {
match self.execute("", params) {
Some(err) => {
return Err(err);
}
None => ()
}
self.conn.write_messages([
&Execute {
portal: &"",
max_rows: 0
},
&Sync]);
let num;
loop {
match_read_message!(self.conn, {
@ -525,49 +516,42 @@ impl<'self> PostgresStatement<'self> {
Ok(num)
}
pub fn query(&'self self, params: &[&ToSql]) -> PostgresResult<'self> {
self.lazy_query(0, params)
}
pub fn try_query(&'self self, params: &[&ToSql])
-> Result<PostgresResult<'self>, PostgresDbError> {
self.try_lazy_query(0, params)
}
pub fn lazy_query(&'self self, row_limit: uint, params: &[&ToSql])
pub fn query(&'self self, params: &[&ToSql])
-> PostgresResult<'self> {
match self.try_lazy_query(row_limit, params) {
match self.try_query(params) {
Ok(result) => result,
Err(err) => fail!("Error running query: %s", err.to_str())
}
}
pub fn try_lazy_query(&'self self, row_limit: uint, params: &[&ToSql])
pub fn try_query(&'self self, params: &[&ToSql])
-> Result<PostgresResult<'self>, PostgresDbError> {
let id = self.next_portal_id.take();
let portal_name = format!("{}_portal_{}", self.name.as_slice(), id);
self.next_portal_id.put_back(id + 1);
match self.bind(portal_name, params) {
match self.execute("", params) {
Some(err) => {
self.conn.wait_for_ready();
return Err(err);
}
None => ()
}
let mut result = PostgresResult {
stmt: self,
name: portal_name,
data: RingBuf::new(),
more_rows: true,
max_rows: row_limit
};
// We have to make sure to execute the result at least once since it
// may have side effects (e.g. INSERT ... RETURNING ...)
result.execute();
let mut data = ~[];
loop {
match_read_message!(self.conn, {
EmptyQueryResponse |
CommandComplete {_} => {
break;
},
DataRow { row } => data.push(row),
resp => fail!("Bad response: %?", resp.to_str())
})
}
self.conn.wait_for_ready();
Ok(result)
// we're going to be popping off
data.reverse();
Ok(PostgresResult {
stmt: self,
data: data,
})
}
pub fn find_col_named(&self, col: &str) -> Option<uint> {
@ -579,39 +563,12 @@ impl<'self> PostgresStatement<'self> {
pub struct PostgresResult<'self> {
priv stmt: &'self PostgresStatement<'self>,
priv name: ~str,
priv data: RingBuf<~[Option<~[u8]>]>,
priv max_rows: uint,
priv more_rows: bool
}
#[unsafe_destructor]
impl<'self> Drop for PostgresResult<'self> {
fn drop(&self) {
do io_error::cond.trap(|_| {}).inside {
self.stmt.conn.write_messages([
&Close {
variant: 'P' as u8,
name: self.name.as_slice()
},
&Sync]);
loop {
match_read_message!(self.stmt.conn, {
ReadyForQuery {_} => break,
_ => ()
})
}
}
}
priv data: ~[~[Option<~[u8]>]]
}
impl<'self> Iterator<PostgresRow<'self>> for PostgresResult<'self> {
fn next(&mut self) -> Option<PostgresRow<'self>> {
if self.data.is_empty() && self.more_rows {
self.execute();
}
do self.data.pop_front().map_move |row| {
do self.data.pop_opt().map_move |row| {
PostgresRow {
stmt: self.stmt,
data: row
@ -620,34 +577,6 @@ impl<'self> Iterator<PostgresRow<'self>> for PostgresResult<'self> {
}
}
impl<'self> PostgresResult<'self> {
fn execute(&mut self) {
self.stmt.conn.write_messages([
&Execute {
portal: self.name,
max_rows: self.max_rows as i32
},
&Sync]);
loop {
match_read_message!(self.stmt.conn, {
EmptyQueryResponse |
CommandComplete {_} => {
self.more_rows = false;
break;
},
DataRow { row } => self.data.push_back(row),
PortalSuspended => {
self.more_rows = true;
break;
},
resp => fail!("Bad response: %?", resp.to_str())
})
}
self.stmt.conn.wait_for_ready();
}
}
pub struct PostgresRow<'self> {
priv stmt: &'self PostgresStatement<'self>,
priv data: ~[Option<~[u8]>]

View File

@ -45,7 +45,6 @@ pub enum BackendMessage {
value: ~str
},
ParseComplete,
PortalSuspended,
ReadyForQuery {
state: u8
},
@ -262,7 +261,6 @@ impl<R: Reader> ReadMessage for R {
'n' => NoData,
'N' => NoticeResponse { fields: read_fields(&mut buf) },
'R' => read_auth_message(&mut buf),
's' => PortalSuspended,
'S' => ParameterStatus {
parameter: buf.read_string(),
value: buf.read_string()

View File

@ -74,27 +74,6 @@ fn test_nulls() {
}
}
#[test]
fn test_lazy_query() {
do test_in_transaction |trans| {
trans.update("CREATE TABLE foo (
id SERIAL PRIMARY KEY,
val BIGINT
)", []);
let stmt = trans.prepare("INSERT INTO foo (val) VALUES ($1)");
let data = ~[1i64, 2, 3, 4, 5, 6];
for datum in data.iter() {
stmt.update([datum as &ToSql]);
}
let stmt = trans.prepare("SELECT val FROM foo ORDER BY id");
let result = stmt.lazy_query(2, []);
assert_eq!(data,
result.map(|row| { row[0] }).collect());
}
}
fn test_type<T: Eq+ToSql+FromSql>(sql_type: &str, values: &[T]) {
do test_in_transaction |trans| {
trans.update("CREATE TABLE foo (