diff --git a/src/error.rs b/src/error.rs index 406fedbf..8256ba86 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,7 +9,7 @@ use phf::PhfMap; macro_rules! make_errors( ($($code:expr => $error:ident),+) => ( /// SQLSTATE error codes - #[deriving(ToStr, Eq, Clone)] + #[deriving(ToStr, Eq, Clone, Show)] #[allow(missing_doc)] pub enum PostgresSqlState { $($error,)+ @@ -378,7 +378,7 @@ pub enum PostgresConnectError { } /// Represents the position of an error in a query -#[deriving(ToStr)] +#[deriving(ToStr, Show)] pub enum PostgresErrorPosition { /// A position in the original query Position(uint), @@ -392,7 +392,7 @@ pub enum PostgresErrorPosition { } /// Encapsulates a Postgres error or notice. -#[deriving(ToStr)] +#[deriving(ToStr, Show)] pub struct PostgresDbError { /// The field contents are ERROR, FATAL, or PANIC (in an error message), /// or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message), or a @@ -496,12 +496,15 @@ impl PostgresDbError { } /// An error encountered when communicating with the Postgres server -#[deriving(ToStr)] +#[deriving(ToStr, Show)] pub enum PostgresError { /// An error reported by the Postgres server PgDbError(PostgresDbError), /// An error communicating with the Postgres server PgStreamError(IoError), + /// The communication channel with the Postgres server has desynchronized + /// due to an earlier communications error. + PgStreamDesynchronized, } impl PostgresError { @@ -510,6 +513,9 @@ impl PostgresError { match *self { PgDbError(ref err) => err.pretty_error(query), PgStreamError(ref err) => format!("{}", *err), + PgStreamDesynchronized => + ~"The communication stream with the Postgres server has \ + become desynchronized due to an earlier communications error" } } } diff --git a/src/lib.rs b/src/lib.rs index b9516858..9a415942 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,7 +103,8 @@ use error::{PostgresDbError, MissingPassword, PostgresError, PgStreamError, - PgDbError}; + PgDbError, + PgStreamDesynchronized}; use message::{BackendMessage, AuthenticationOk, AuthenticationKerberosV5, @@ -168,6 +169,26 @@ macro_rules! if_ok_pg( ) ) +macro_rules! if_ok_desync( + ($e:expr) => ( + match $e { + Ok(ok) => ok, + Err(err) => { + self.desynchronized = true; + return Err(err); + } + } + ) +) + +macro_rules! check_desync( + ($e:expr) => ( + if $e.is_desynchronized() { + return Err(PgStreamDesynchronized); + } + ) +) + static DEFAULT_PORT: Port = 5432; /// Trait for types that can handle Postgres notice messages @@ -341,6 +362,7 @@ struct InnerPostgresConnection { notifications: RingBuf, cancel_data: PostgresCancelData, unknown_types: HashMap, + desynchronized: bool, } impl Drop for InnerPostgresConnection { @@ -386,6 +408,7 @@ impl InnerPostgresConnection { notifications: RingBuf::new(), cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 }, unknown_types: HashMap::new(), + desynchronized: false, }; args.push((~"client_encoding", ~"UTF8")); @@ -426,26 +449,28 @@ impl InnerPostgresConnection { } fn write_messages(&mut self, messages: &[FrontendMessage]) -> IoResult<()> { + assert!(!self.desynchronized); for message in messages.iter() { - if_ok!(self.stream.write_message(message)); + if_ok_desync!(self.stream.write_message(message)); } - self.stream.flush() + Ok(if_ok_desync!(self.stream.flush())) } fn read_message(&mut self) -> IoResult { + assert!(!self.desynchronized); loop { - match self.stream.read_message() { - Ok(NoticeResponse { fields }) => + match if_ok_desync!(self.stream.read_message()) { + NoticeResponse { fields } => self.notice_handler.handle(PostgresDbError::new(fields)), - Ok(NotificationResponse { pid, channel, payload }) => + NotificationResponse { pid, channel, payload } => self.notifications.push_back(PostgresNotification { pid: pid, channel: channel, payload: payload }), - Ok(ParameterStatus { parameter, value }) => + ParameterStatus { parameter, value } => info!("Parameter {} = {}", parameter, value), - val => return val + val => return Ok(val) } } } @@ -697,19 +722,33 @@ impl PostgresConnection { } } - /// Begins a new transaction. + /// Attempts to begin a new transaction. /// /// Returns a `PostgresTransaction` object which should be used instead of /// the connection for the duration of the transaction. The transaction /// is active until the `PostgresTransaction` object falls out of scope. /// A transaction will commit by default unless the task fails or the /// transaction is set to roll back. - pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> { - self.quick_query("BEGIN"); - PostgresTransaction { + pub fn try_transaction<'a>(&'a self) + -> Result, PostgresError> { + check_desync!(self); + if_ok!(self.quick_query("BEGIN")); + Ok(PostgresTransaction { conn: self, - commit: RefCell::new(true), + commit: Cell::new(true), nested: false + }) + } + + /// A convenience wrapper around `try_transaction`. + /// + /// # Failure + /// + /// Fails if there was an error beginning the transaction. + pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> { + match self.try_transaction() { + Ok(trans) => trans, + Err(err) => fail!("Error preparing transaction: {}", err) } } @@ -745,6 +784,15 @@ impl PostgresConnection { self.conn.with(|conn| conn.cancel_data) } + /// Returns whether or not the stream has been desynchronized due to an + /// error in the communication channel with the server. + /// + /// If this has occurred, all further queries will immediately return an + /// error. + pub fn is_desynchronized(&self) -> bool { + self.conn.with(|conn| conn.desynchronized) + } + fn quick_query(&self, query: &str) -> Result<~[~[Option<~str>]], PostgresError> { self.conn.with_mut(|conn| conn.quick_query(query)) } @@ -775,14 +823,14 @@ pub enum SslMode { /// Represents a transaction on a database connection pub struct PostgresTransaction<'conn> { priv conn: &'conn PostgresConnection, - priv commit: RefCell, + priv commit: Cell, priv nested: bool } #[unsafe_destructor] impl<'conn> Drop for PostgresTransaction<'conn> { fn drop(&mut self) { - if task::failing() || !self.commit.with(|x| *x) { + if task::failing() || !self.commit.get() { if self.nested { self.conn.quick_query("ROLLBACK TO sp"); } else { @@ -828,13 +876,23 @@ impl<'conn> PostgresTransaction<'conn> { self.conn.execute(query, params) } - /// Like `PostgresConnection::transaction`. - pub fn transaction<'a>(&self) -> PostgresTransaction<'conn> { - self.conn.quick_query("SAVEPOINT sp"); - PostgresTransaction { + /// Like `PostgresConnection::try_transaction`. + pub fn try_transaction<'a>(&'a self) + -> Result, PostgresError> { + check_desync!(self.conn); + if_ok!(self.conn.quick_query("SAVEPOINT sp")); + Ok(PostgresTransaction { conn: self.conn, - commit: RefCell::new(true), + commit: Cell::new(true), nested: true + }) + } + + /// Like `PostgresTransaction::transaction`. + pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> { + match self.try_transaction() { + Ok(trans) => trans, + Err(err) => fail!("Error preparing transaction: {}", err) } } @@ -843,19 +901,24 @@ impl<'conn> PostgresTransaction<'conn> { self.conn.notifications() } + /// Like `PostgresConnection::is_desynchronized`. + pub fn is_desynchronized(&self) -> bool { + self.conn.is_desynchronized() + } + /// Determines if the transaction is currently set to commit or roll back. pub fn will_commit(&self) -> bool { - self.commit.with(|x| *x) + self.commit.get() } /// Sets the transaction to commit at its completion. pub fn set_commit(&self) { - self.commit.with_mut(|x| *x = true); + self.commit.set(true); } /// Sets the transaction to roll back at its completion. pub fn set_rollback(&self) { - self.commit.with_mut(|x| *x = false); + self.commit.set(false); } } @@ -1015,6 +1078,7 @@ impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> { fn try_execute(&self, params: &[&ToSql]) -> Result { + check_desync!(self.conn); if_ok!(self.execute("", 0, params)); let num; @@ -1047,6 +1111,7 @@ impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> { fn try_query<'a>(&'a self, params: &[&ToSql]) -> Result, PostgresError> { + check_desync!(self.conn); self.try_lazy_query(0, params) } } @@ -1112,6 +1177,7 @@ impl<'conn> TransactionalPostgresStatement<'conn> { /// the parameters of the statement. pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql]) -> Result, PostgresError> { + check_desync!(self.stmt.conn); self.stmt.try_lazy_query(row_limit, params) } @@ -1124,7 +1190,7 @@ impl<'conn> TransactionalPostgresStatement<'conn> { -> PostgresResult<'a> { match self.try_lazy_query(row_limit, params) { Ok(result) => result, - Err(err) => fail!("Error executing query:\n{}", err.to_str()) + Err(err) => fail!("Error executing query:\n{}", err) } } } diff --git a/src/pool.rs b/src/pool.rs index d4f98d84..558740b1 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -139,6 +139,12 @@ impl PooledPostgresConnection { self.conn.get_ref().execute(query, params) } + /// Like `PostgresConnection::try_transaction`. + pub fn try_transaction<'a>(&'a self) + -> Result, PostgresError> { + self.conn.get_ref().try_transaction() + } + /// Like `PostgresConnection::transaction`. pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> { self.conn.get_ref().transaction() @@ -153,4 +159,9 @@ impl PooledPostgresConnection { pub fn cancel_data(&self) -> PostgresCancelData { self.conn.get_ref().cancel_data() } + + /// Like `PostgresConnection::is_desynchronized`. + pub fn is_desynchronized(&self) -> bool { + self.conn.get_ref().is_desynchronized() + } }