Properly handle object cleanup errors

Destructors now fail and there's a finish method returning an error.
This commit is contained in:
Steven Fackler 2014-02-09 23:55:16 -08:00
parent dec566e683
commit 6e37db330b
2 changed files with 281 additions and 41 deletions

View File

@ -189,6 +189,14 @@ macro_rules! check_desync(
) )
) )
macro_rules! fail_unless_failing(
($($t:tt)*) => (
if !task::failing() {
fail!($($t)*)
}
)
)
static DEFAULT_PORT: Port = 5432; static DEFAULT_PORT: Port = 5432;
/// Trait for types that can handle Postgres notice messages /// Trait for types that can handle Postgres notice messages
@ -363,11 +371,18 @@ struct InnerPostgresConnection {
cancel_data: PostgresCancelData, cancel_data: PostgresCancelData,
unknown_types: HashMap<Oid, ~str>, unknown_types: HashMap<Oid, ~str>,
desynchronized: bool, desynchronized: bool,
finished: bool,
} }
impl Drop for InnerPostgresConnection { impl Drop for InnerPostgresConnection {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.write_messages([Terminate]); if !self.finished {
match self.finish_inner() {
Ok(()) => {}
Err(err) =>
fail_unless_failing!("Error dropping connection: {}", err)
}
}
} }
} }
@ -409,6 +424,7 @@ impl InnerPostgresConnection {
cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 }, cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 },
unknown_types: HashMap::new(), unknown_types: HashMap::new(),
desynchronized: false, desynchronized: false,
finished: false,
}; };
args.push((~"client_encoding", ~"UTF8")); args.push((~"client_encoding", ~"UTF8"));
@ -598,10 +614,15 @@ impl InnerPostgresConnection {
name: stmt_name, name: stmt_name,
param_types: param_types, param_types: param_types,
result_desc: result_desc, result_desc: result_desc,
next_portal_id: Cell::new(0) next_portal_id: Cell::new(0),
finished: Cell::new(false),
}) })
} }
fn is_desynchronized(&self) -> bool {
self.desynchronized
}
fn get_type_name(&mut self, oid: Oid) -> Result<~str, PostgresError> { fn get_type_name(&mut self, oid: Oid) -> Result<~str, PostgresError> {
match self.unknown_types.find(&oid) { match self.unknown_types.find(&oid) {
Some(name) => return Ok(name.clone()), Some(name) => return Ok(name.clone()),
@ -623,6 +644,7 @@ impl InnerPostgresConnection {
fn quick_query(&mut self, query: &str) fn quick_query(&mut self, query: &str)
-> Result<~[~[Option<~str>]], PostgresError> { -> Result<~[~[Option<~str>]], PostgresError> {
check_desync!(self);
if_ok_pg!(self.write_messages([Query { query: query }])); if_ok_pg!(self.write_messages([Query { query: query }]));
let mut result = ~[]; let mut result = ~[];
@ -640,6 +662,11 @@ impl InnerPostgresConnection {
} }
Ok(result) Ok(result)
} }
fn finish_inner(&mut self) -> Result<(), PostgresError> {
check_desync!(self);
Ok(if_ok_pg!(self.write_messages([Terminate])))
}
} }
/// A connection to a Postgres database. /// A connection to a Postgres database.
@ -736,7 +763,8 @@ impl PostgresConnection {
Ok(PostgresTransaction { Ok(PostgresTransaction {
conn: self, conn: self,
commit: Cell::new(true), commit: Cell::new(true),
nested: false nested: false,
finished: false,
}) })
} }
@ -790,10 +818,23 @@ impl PostgresConnection {
/// If this has occurred, all further queries will immediately return an /// If this has occurred, all further queries will immediately return an
/// error. /// error.
pub fn is_desynchronized(&self) -> bool { pub fn is_desynchronized(&self) -> bool {
self.conn.with(|conn| conn.desynchronized) self.conn.with(|conn| conn.is_desynchronized())
} }
fn quick_query(&self, query: &str) -> Result<~[~[Option<~str>]], PostgresError> { /// Consumes the connection, closing it.
///
/// Functionally equivalent to the `Drop` implementation for
/// `PostgresConnection` except that it returns any error encountered to
/// the caller.
pub fn finish(self) -> Result<(), PostgresError> {
self.conn.with_mut(|conn| {
conn.finished = true;
conn.finish_inner()
})
}
fn quick_query(&self, query: &str)
-> Result<~[~[Option<~str>]], PostgresError> {
self.conn.with_mut(|conn| conn.quick_query(query)) self.conn.with_mut(|conn| conn.quick_query(query))
} }
@ -824,25 +865,39 @@ pub enum SslMode {
pub struct PostgresTransaction<'conn> { pub struct PostgresTransaction<'conn> {
priv conn: &'conn PostgresConnection, priv conn: &'conn PostgresConnection,
priv commit: Cell<bool>, priv commit: Cell<bool>,
priv nested: bool priv nested: bool,
priv finished: bool,
} }
#[unsafe_destructor] #[unsafe_destructor]
impl<'conn> Drop for PostgresTransaction<'conn> { impl<'conn> Drop for PostgresTransaction<'conn> {
fn drop(&mut self) { fn drop(&mut self) {
if !self.finished {
match self.finish_inner() {
Ok(()) => {}
Err(err) =>
fail_unless_failing!("Error dropping transaction: {}", err)
}
}
}
}
impl<'conn> PostgresTransaction<'conn> {
fn finish_inner(&mut self) -> Result<(), PostgresError> {
if task::failing() || !self.commit.get() { if task::failing() || !self.commit.get() {
if self.nested { if self.nested {
self.conn.quick_query("ROLLBACK TO sp"); if_ok!(self.conn.quick_query("ROLLBACK TO sp"));
} else { } else {
self.conn.quick_query("ROLLBACK"); if_ok!(self.conn.quick_query("ROLLBACK"));
} }
} else { } else {
if self.nested { if self.nested {
self.conn.quick_query("RELEASE sp"); if_ok!(self.conn.quick_query("RELEASE sp"));
} else { } else {
self.conn.quick_query("COMMIT"); if_ok!(self.conn.quick_query("COMMIT"));
} }
} }
Ok(())
} }
} }
@ -884,7 +939,8 @@ impl<'conn> PostgresTransaction<'conn> {
Ok(PostgresTransaction { Ok(PostgresTransaction {
conn: self.conn, conn: self.conn,
commit: Cell::new(true), commit: Cell::new(true),
nested: true nested: true,
finished: false,
}) })
} }
@ -920,6 +976,15 @@ impl<'conn> PostgresTransaction<'conn> {
pub fn set_rollback(&self) { pub fn set_rollback(&self) {
self.commit.set(false); self.commit.set(false);
} }
/// Consumes the transaction, commiting or rolling it back as appropriate.
///
/// Functionally equivalent to the `Drop` implementation of
/// `PostgresTransaction` except that it returns any error to the caller.
pub fn finish(mut self) -> Result<(), PostgresError> {
self.finished = true;
self.finish_inner()
}
} }
/// A trait containing methods that can be called on a prepared statement. /// A trait containing methods that can be called on a prepared statement.
@ -974,6 +1039,12 @@ pub trait PostgresStatement {
Err(err) => fail!("Error executing query:\n{}", err.to_str()) Err(err) => fail!("Error executing query:\n{}", err.to_str())
} }
} }
/// Consumes the statement, clearing it from the Postgres session.
///
/// Functionally identical to the `Drop` implementation of the
/// `PostgresStatement` except that it returns any error to the caller.
fn finish(self) -> Result<(), PostgresError>;
} }
/// A statement prepared outside of a transaction. /// A statement prepared outside of a transaction.
@ -982,29 +1053,42 @@ pub struct NormalPostgresStatement<'conn> {
priv name: ~str, priv name: ~str,
priv param_types: ~[PostgresType], priv param_types: ~[PostgresType],
priv result_desc: ~[ResultDescription], priv result_desc: ~[ResultDescription],
priv next_portal_id: Cell<uint> priv next_portal_id: Cell<uint>,
priv finished: Cell<bool>,
} }
#[unsafe_destructor] #[unsafe_destructor]
impl<'conn> Drop for NormalPostgresStatement<'conn> { impl<'conn> Drop for NormalPostgresStatement<'conn> {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.conn.write_messages([ if !self.finished.get() {
Close { match self.finish_inner() {
variant: 'S' as u8, Ok(()) => {}
name: self.name.as_slice() Err(err) =>
}, fail_unless_failing!("Error dropping statement: {}", err)
Sync]);
loop {
match self.conn.read_message() {
Ok(ReadyForQuery { .. }) => break,
Err(_) => break,
_ => {}
} }
} }
} }
} }
impl<'conn> NormalPostgresStatement<'conn> { impl<'conn> NormalPostgresStatement<'conn> {
fn finish_inner(&mut self) -> Result<(), PostgresError> {
check_desync!(self.conn);
if_ok_pg!(self.conn.write_messages([
Close {
variant: 'S' as u8,
name: self.name.as_slice()
},
Sync]));
loop {
// TODO forward db errors
match if_ok_pg!(self.conn.read_message()) {
ReadyForQuery { .. } => break,
_ => {}
}
}
Ok(())
}
fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql]) fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql])
-> Result<(), PostgresError> { -> Result<(), PostgresError> {
let mut formats = ~[]; let mut formats = ~[];
@ -1059,9 +1143,10 @@ impl<'conn> NormalPostgresStatement<'conn> {
name: portal_name, name: portal_name,
data: RingBuf::new(), data: RingBuf::new(),
row_limit: row_limit, row_limit: row_limit,
more_rows: true more_rows: true,
finished: false,
}; };
result.read_rows(); if_ok!(result.read_rows())
Ok(result) Ok(result)
} }
@ -1114,6 +1199,11 @@ impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> {
check_desync!(self.conn); check_desync!(self.conn);
self.try_lazy_query(0, params) self.try_lazy_query(0, params)
} }
fn finish(mut self) -> Result<(), PostgresError> {
self.finished.set(true);
self.finish_inner()
}
} }
/// Information about a column of the result of a query. /// Information about a column of the result of a query.
@ -1161,6 +1251,10 @@ impl<'conn> PostgresStatement for TransactionalPostgresStatement<'conn> {
-> Result<PostgresResult<'a>, PostgresError> { -> Result<PostgresResult<'a>, PostgresError> {
self.stmt.try_query(params) self.stmt.try_query(params)
} }
fn finish(self) -> Result<(), PostgresError> {
self.stmt.finish()
}
} }
impl<'conn> TransactionalPostgresStatement<'conn> { impl<'conn> TransactionalPostgresStatement<'conn> {
@ -1201,28 +1295,42 @@ pub struct PostgresResult<'stmt> {
priv name: ~str, priv name: ~str,
priv data: RingBuf<~[Option<~[u8]>]>, priv data: RingBuf<~[Option<~[u8]>]>,
priv row_limit: uint, priv row_limit: uint,
priv more_rows: bool priv more_rows: bool,
priv finished: bool,
} }
#[unsafe_destructor] #[unsafe_destructor]
impl<'stmt> Drop for PostgresResult<'stmt> { impl<'stmt> Drop for PostgresResult<'stmt> {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.stmt.conn.write_messages([ if !self.finished {
Close { match self.finish_inner() {
variant: 'P' as u8, Ok(()) => {}
name: self.name.as_slice() Err(err) =>
}, fail_unless_failing!("Error dropping result: {}", err)
Sync]);
loop {
match self.stmt.conn.read_message() {
Ok(ReadyForQuery { .. }) => break,
_ => {}
} }
} }
} }
} }
impl<'stmt> PostgresResult<'stmt> { impl<'stmt> PostgresResult<'stmt> {
fn finish_inner(&mut self) -> Result<(), PostgresError> {
check_desync!(self.stmt.conn);
if_ok_pg!(self.stmt.conn.write_messages([
Close {
variant: 'P' as u8,
name: self.name.as_slice()
},
Sync]));
loop {
// TODO forward PG errors
match if_ok_pg!(self.stmt.conn.read_message()) {
ReadyForQuery { .. } => break,
_ => {}
}
}
Ok(())
}
fn read_rows(&mut self) -> Result<(), PostgresError> { fn read_rows(&mut self) -> Result<(), PostgresError> {
loop { loop {
match if_ok_pg!(self.stmt.conn.read_message()) { match if_ok_pg!(self.stmt.conn.read_message()) {
@ -1253,18 +1361,40 @@ impl<'stmt> PostgresResult<'stmt> {
} }
} }
impl<'stmt> Iterator<PostgresRow<'stmt>> for PostgresResult<'stmt> { impl<'stmt> PostgresResult<'stmt> {
fn next(&mut self) -> Option<PostgresRow<'stmt>> { /// Consumes the `PostgresResult`, cleaning up associated state.
if self.data.is_empty() && self.more_rows { ///
self.execute(); /// Functionally identical to the `Drop` implementation on
/// `PostgresResult` except that it returns any error to the caller.
pub fn finish(mut self) -> Result<(), PostgresError> {
self.finished = true;
self.finish_inner()
} }
self.data.pop_front().map(|row| { /// Like `PostgresResult::next` except that it returns any errors to the
/// caller instead of failing.
pub fn try_next(&mut self)
-> Result<Option<PostgresRow<'stmt>>, PostgresError> {
if self.data.is_empty() && self.more_rows {
if_ok!(self.execute());
}
let row = self.data.pop_front().map(|row| {
PostgresRow { PostgresRow {
stmt: self.stmt, stmt: self.stmt,
data: row data: row
} }
}) });
Ok(row)
}
}
impl<'stmt> Iterator<PostgresRow<'stmt>> for PostgresResult<'stmt> {
fn next(&mut self) -> Option<PostgresRow<'stmt>> {
match self.try_next() {
Ok(ok) => ok,
Err(err) => fail!("Error fetching rows: {}", err)
}
} }
fn size_hint(&self) -> (uint, Option<uint>) { fn size_hint(&self) -> (uint, Option<uint>) {
@ -1344,3 +1474,4 @@ impl<'a> RowIndex for &'a str {
fail!("there is no column with name {}", *self); fail!("there is no column with name {}", *self);
} }
} }

View File

@ -88,6 +88,12 @@ fn test_unknown_database() {
} }
} }
#[test]
fn test_connection_finish() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
assert!(conn.finish().is_ok());
}
#[test] #[test]
fn test_transaction_commit() { fn test_transaction_commit() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl); let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
@ -103,6 +109,21 @@ fn test_transaction_commit() {
assert_eq!(~[1i32], result.map(|row| row[1]).collect()); assert_eq!(~[1i32], result.map(|row| row[1]).collect());
} }
#[test]
fn test_transaction_commit_finish() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
conn.execute("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)", []);
let trans = conn.transaction();
trans.execute("INSERT INTO foo (id) VALUES ($1)", [&1i32 as &ToSql]);
assert!(trans.finish().is_ok());
let stmt = conn.prepare("SELECT * FROM foo");
let result = stmt.query([]);
assert_eq!(~[1i32], result.map(|row| row[1]).collect());
}
#[test] #[test]
fn test_transaction_rollback() { fn test_transaction_rollback() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl); let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
@ -121,6 +142,24 @@ fn test_transaction_rollback() {
assert_eq!(~[1i32], result.map(|row| row[1]).collect()); assert_eq!(~[1i32], result.map(|row| row[1]).collect());
} }
#[test]
fn test_transaction_rollback_finish() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
conn.execute("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)", []);
conn.execute("INSERT INTO foo (id) VALUES ($1)", [&1i32 as &ToSql]);
let trans = conn.transaction();
trans.execute("INSERT INTO foo (id) VALUES ($1)", [&2i32 as &ToSql]);
trans.set_rollback();
assert!(trans.finish().is_ok());
let stmt = conn.prepare("SELECT * FROM foo");
let result = stmt.query([]);
assert_eq!(~[1i32], result.map(|row| row[1]).collect());
}
#[test] #[test]
fn test_nested_transactions() { fn test_nested_transactions() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl); let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
@ -168,6 +207,67 @@ fn test_nested_transactions() {
assert_eq!(~[1i32], result.map(|row| row[1]).collect()); assert_eq!(~[1i32], result.map(|row| row[1]).collect());
} }
#[test]
fn test_nested_transactions_finish() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
conn.execute("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)", []);
conn.execute("INSERT INTO foo (id) VALUES (1)", []);
{
let trans1 = conn.transaction();
trans1.execute("INSERT INTO foo (id) VALUES (2)", []);
{
let trans2 = trans1.transaction();
trans2.execute("INSERT INTO foo (id) VALUES (3)", []);
trans2.set_rollback();
assert!(trans2.finish().is_ok());
}
{
let trans2 = trans1.transaction();
trans2.execute("INSERT INTO foo (id) VALUES (4)", []);
{
let trans3 = trans2.transaction();
trans3.execute("INSERT INTO foo (id) VALUES (5)", []);
trans3.set_rollback();
assert!(trans3.finish().is_ok());
}
{
let trans3 = trans2.transaction();
trans3.execute("INSERT INTO foo (id) VALUES (6)", []);
assert!(trans3.finish().is_ok());
}
assert!(trans2.finish().is_ok());
}
let stmt = conn.prepare("SELECT * FROM foo ORDER BY id");
let result = stmt.query([]);
assert_eq!(~[1i32, 2, 4, 6], result.map(|row| row[1]).collect());
trans1.set_rollback();
assert!(trans1.finish().is_ok());
}
let stmt = conn.prepare("SELECT * FROM foo ORDER BY id");
let result = stmt.query([]);
assert_eq!(~[1i32], result.map(|row| row[1]).collect());
}
#[test]
fn test_stmt_finish() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
conn.execute("CREATE TEMPORARY TABLE foo (id BIGINT PRIMARY KEY)", []);
let stmt = conn.prepare("SELECT * FROM foo");
assert!(stmt.finish().is_ok());
}
#[test] #[test]
fn test_query() { fn test_query() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl); let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
@ -180,6 +280,15 @@ fn test_query() {
assert_eq!(~[1i64, 2], result.map(|row| row[1]).collect()); assert_eq!(~[1i64, 2], result.map(|row| row[1]).collect());
} }
#[test]
fn test_result_finish() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);
conn.execute("CREATE TEMPORARY TABLE foo (id BIGINT PRIMARY KEY)", []);
let stmt = conn.prepare("SELECT * FROM foo");
let result = stmt.query([]);
assert!(result.finish().is_ok());
}
#[test] #[test]
fn test_lazy_query() { fn test_lazy_query() {
let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl); let conn = PostgresConnection::connect("postgres://postgres@localhost", &NoSsl);