From ac2b849f3fd38e3feb0d288bc08ddb368979f2d1 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Mon, 26 Dec 2016 15:43:08 -0500 Subject: [PATCH] Split out transaction to a module --- postgres-tokio/src/lib.rs | 67 +++------------------------- postgres-tokio/src/transaction.rs | 73 +++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 61 deletions(-) create mode 100644 postgres-tokio/src/transaction.rs diff --git a/postgres-tokio/src/lib.rs b/postgres-tokio/src/lib.rs index eaae6ade..c8fa1de7 100644 --- a/postgres-tokio/src/lib.rs +++ b/postgres-tokio/src/lib.rs @@ -35,12 +35,14 @@ pub use postgres_shared::{params, Column, RowIndex}; use error::{ConnectError, Error, DbError, SqlState}; use params::{ConnectParams, IntoConnectParams}; use stream::PostgresStream; -use types::{Oid, Type, ToSql, SessionInfo, IsNull, FromSql, WrongType, Other, Kind, Field}; use tls::Handshake; +use transaction::Transaction; +use types::{Oid, Type, ToSql, SessionInfo, IsNull, FromSql, WrongType, Other, Kind, Field}; pub mod error; mod stream; pub mod tls; +pub mod transaction; #[macro_use] pub mod types; @@ -905,7 +907,7 @@ impl Connection { pub fn transaction(self) -> BoxFuture { self.simple_query("BEGIN") - .map(|(_, c)| Transaction(c)) + .map(|(_, c)| Transaction::new(c)) .boxed() } @@ -991,59 +993,6 @@ impl Row { } } -#[derive(Debug)] -pub struct Transaction(Connection); - -impl Transaction { - pub fn batch_execute(self, query: &str) -> BoxFuture> { - self.0.batch_execute(query) - .map(Transaction) - .map_err(transaction_err) - .boxed() - } - - pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Transaction), Error> { - self.0.prepare(query) - .map(|(s, c)| (s, Transaction(c))) - .map_err(transaction_err) - .boxed() - } - - pub fn execute(self, - statement: &Statement, - params: &[&ToSql]) - -> BoxFuture<(u64, Transaction), Error> { - self.0.execute(statement, params) - .map(|(n, c)| (n, Transaction(c))) - .map_err(transaction_err) - .boxed() - } - - pub fn query(self, - statement: &Statement, - params: &[&ToSql]) - -> BoxStateStream> { - self.0.query(statement, params) - .map_state(Transaction) - .map_err(transaction_err) - .boxed() - } - - pub fn commit(self) -> BoxFuture { - self.finish("COMMIT") - } - - pub fn rollback(self) -> BoxFuture { - self.finish("ROLLBACK") - } - - fn finish(self, query: &str) -> BoxFuture { - self.0.simple_query(query) - .map(|(_, c)| c) - .boxed() - } -} - fn connect_err(fields: &mut ErrorFields) -> ConnectError { match DbError::new(fields) { Ok(err) => ConnectError::Db(Box::new(err)), @@ -1057,10 +1006,6 @@ fn bad_message() -> T io::Error::new(io::ErrorKind::InvalidInput, "unexpected message").into() } -fn transaction_err(e: Error) -> Error { - match e { - Error::Io(e) => Error::Io(e), - Error::Db(e, c) => Error::Db(e, Transaction(c)), - Error::Conversion(e, c) => Error::Conversion(e, Transaction(c)) - } +trait TransactionNew { + fn new(c: Connection) -> Transaction; } diff --git a/postgres-tokio/src/transaction.rs b/postgres-tokio/src/transaction.rs new file mode 100644 index 00000000..de4dff1f --- /dev/null +++ b/postgres-tokio/src/transaction.rs @@ -0,0 +1,73 @@ +use futures::{Future, BoxFuture}; +use futures_state_stream::{StateStream, BoxStateStream}; + +use {Connection, Statement, Row, TransactionNew}; +use error::Error; +use types::ToSql; + +#[derive(Debug)] +pub struct Transaction(Connection); + +impl TransactionNew for Transaction { + fn new(c: Connection) -> Transaction { + Transaction(c) + } +} + +impl Transaction { + pub fn batch_execute(self, query: &str) -> BoxFuture> { + self.0.batch_execute(query) + .map(Transaction) + .map_err(transaction_err) + .boxed() + } + + pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Transaction), Error> { + self.0.prepare(query) + .map(|(s, c)| (s, Transaction(c))) + .map_err(transaction_err) + .boxed() + } + + pub fn execute(self, + statement: &Statement, + params: &[&ToSql]) + -> BoxFuture<(u64, Transaction), Error> { + self.0.execute(statement, params) + .map(|(n, c)| (n, Transaction(c))) + .map_err(transaction_err) + .boxed() + } + + pub fn query(self, + statement: &Statement, + params: &[&ToSql]) + -> BoxStateStream> { + self.0.query(statement, params) + .map_state(Transaction) + .map_err(transaction_err) + .boxed() + } + + pub fn commit(self) -> BoxFuture { + self.finish("COMMIT") + } + + pub fn rollback(self) -> BoxFuture { + self.finish("ROLLBACK") + } + + fn finish(self, query: &str) -> BoxFuture { + self.0.simple_query(query) + .map(|(_, c)| c) + .boxed() + } +} + +fn transaction_err(e: Error) -> Error { + match e { + Error::Io(e) => Error::Io(e), + Error::Db(e, c) => Error::Db(e, Transaction(c)), + Error::Conversion(e, c) => Error::Conversion(e, Transaction(c)) + } +}