From 6e2927bf7e4fe5f1026fc877afab23784db9143a Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Thu, 23 Feb 2017 20:27:29 -0800 Subject: [PATCH] Rustfmt --- tokio-postgres/src/lib.rs | 315 ++++++++++++++---------------- tokio-postgres/src/stream.rs | 20 +- tokio-postgres/src/test.rs | 48 +++-- tokio-postgres/src/tls/openssl.rs | 3 +- tokio-postgres/src/transaction.rs | 17 +- tokio-postgres/src/types.rs | 4 +- 6 files changed, 208 insertions(+), 199 deletions(-) diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs index a8396a19..612cdf6e 100644 --- a/tokio-postgres/src/lib.rs +++ b/tokio-postgres/src/lib.rs @@ -21,7 +21,9 @@ //! //! fn main() { //! let mut l = Core::new().unwrap(); -//! let done = Connection::connect("postgresql://postgres@localhost", TlsMode::None, &l.handle()) +//! let done = Connection::connect("postgresql://postgres@localhost", +//! TlsMode::None, +//! &l.handle()) //! .then(|c| { //! c.unwrap() //! .batch_execute("CREATE TABLE person ( @@ -144,12 +146,9 @@ pub fn cancel_query(params: T, { let params = match params.into_connect_params() { Ok(params) => { - Either::A(stream::connect(params.host().clone(), - params.port(), - tls_mode, - handle)) + Either::A(stream::connect(params.host().clone(), params.port(), tls_mode, handle)) } - Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future()) + Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future()), }; params.and_then(move |c| { @@ -266,31 +265,29 @@ impl Connection { { let fut = match params.into_connect_params() { Ok(params) => { - Either::A(stream::connect(params.host().clone(), - params.port(), - tls_mode, - handle) + Either::A(stream::connect(params.host().clone(), params.port(), tls_mode, handle) .map(|s| (s, params))) } - Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future()) + Err(e) => Either::B(Err(ConnectError::ConnectParams(e)).into_future()), }; fut.map(|(s, params)| { let (sender, receiver) = mpsc::channel(); (Connection(InnerConnection { - stream: s, - close_sender: sender, - close_receiver: receiver, - parameters: HashMap::new(), - types: HashMap::new(), - cancel_data: CancelData { - process_id: 0, - secret_key: 0, - }, - has_typeinfo_query: false, - has_typeinfo_enum_query: false, - has_typeinfo_composite_query: false, - }), params) + stream: s, + close_sender: sender, + close_receiver: receiver, + parameters: HashMap::new(), + types: HashMap::new(), + cancel_data: CancelData { + process_id: 0, + secret_key: 0, + }, + has_typeinfo_query: false, + has_typeinfo_enum_query: false, + has_typeinfo_composite_query: false, + }), + params) }) .and_then(|(s, params)| s.startup(params)) .and_then(|(s, params)| s.handle_auth(params)) @@ -298,7 +295,9 @@ impl Connection { .boxed() } - fn startup(self, params: ConnectParams) -> BoxFuture<(Connection, ConnectParams), ConnectError> { + fn startup(self, + params: ConnectParams) + -> BoxFuture<(Connection, ConnectParams), ConnectError> { let mut buf = vec![]; let result = { let options = [("client_encoding", "UTF8"), ("timezone", "GMT")]; @@ -319,7 +318,8 @@ impl Connection { } fn handle_auth(self, params: ConnectParams) -> BoxFuture { - self.0.read() + self.0 + .read() .map_err(ConnectError::Io) .and_then(move |(m, s)| { let response = match m { @@ -333,8 +333,9 @@ impl Connection { .map_err(Into::into) } None => { - Err(ConnectError::ConnectParams( - "password was required but not provided".into())) + Err(ConnectError::ConnectParams("password was required but not \ + provided" + .into())) } } } @@ -350,8 +351,9 @@ impl Connection { .map_err(Into::into) } None => { - Err(ConnectError::ConnectParams( - "password was required but not provided".into())) + Err(ConnectError::ConnectParams("password was required but not \ + provided" + .into())) } } } @@ -361,45 +363,41 @@ impl Connection { response.map(|m| (m, Connection(s))) }) - .and_then(|(m, s)| { - match m { - Some(m) => Either::A(s.handle_auth_response(m)), - None => Either::B(Ok(s).into_future()) - } + .and_then(|(m, s)| match m { + Some(m) => Either::A(s.handle_auth_response(m)), + None => Either::B(Ok(s).into_future()), }) .boxed() } fn handle_auth_response(self, message: Vec) -> BoxFuture { - self.0.send(message) + self.0 + .send(message) .and_then(|s| s.read()) .map_err(ConnectError::Io) - .and_then(|(m, s)| { - match m { - backend::Message::AuthenticationOk => Ok(Connection(s)), - backend::Message::ErrorResponse(body) => Err(connect_err(&mut body.fields())), - _ => Err(bad_message()), - } + .and_then(|(m, s)| match m { + backend::Message::AuthenticationOk => Ok(Connection(s)), + backend::Message::ErrorResponse(body) => Err(connect_err(&mut body.fields())), + _ => Err(bad_message()), }) .boxed() } fn finish_startup(self) -> BoxFuture { - self.0.read() + self.0 + .read() .map_err(ConnectError::Io) - .and_then(|(m, mut s)| { - match m { - backend::Message::BackendKeyData(body) => { - s.cancel_data.process_id = body.process_id(); - s.cancel_data.secret_key = body.secret_key(); - Either::A(Connection(s).finish_startup()) - } - backend::Message::ReadyForQuery(_) => Either::B(Ok(Connection(s)).into_future()), - backend::Message::ErrorResponse(body) => { - Either::B(Err(connect_err(&mut body.fields())).into_future()) - } - _ => Either::B(Err(bad_message()).into_future()), + .and_then(|(m, mut s)| match m { + backend::Message::BackendKeyData(body) => { + s.cancel_data.process_id = body.process_id(); + s.cancel_data.secret_key = body.secret_key(); + Either::A(Connection(s).finish_startup()) } + backend::Message::ReadyForQuery(_) => Either::B(Ok(Connection(s)).into_future()), + backend::Message::ErrorResponse(body) => { + Either::B(Err(connect_err(&mut body.fields())).into_future()) + } + _ => Either::B(Err(bad_message()).into_future()), }) .boxed() } @@ -415,29 +413,30 @@ impl Connection { } // This has its own read_rows since it will need to handle multiple query completions - fn simple_read_rows(self, mut rows: Vec) -> BoxFuture<(Vec, Connection), Error> { - self.0.read() + fn simple_read_rows(self, + mut rows: Vec) + -> BoxFuture<(Vec, Connection), Error> { + self.0 + .read() .map_err(Error::Io) - .and_then(|(m, s)| { - match m { - backend::Message::ReadyForQuery(_) => { - Ok((rows, Connection(s))).into_future().boxed() - } - backend::Message::DataRow(body) => { - match body.values().collect() { - Ok(row) => { - rows.push(row); - Connection(s).simple_read_rows(rows) - } - Err(e) => Err(Error::Io(e)).into_future().boxed(), - } - } - backend::Message::EmptyQueryResponse | - backend::Message::CommandComplete(_) | - backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows), - backend::Message::ErrorResponse(body) => Connection(s).ready_err(body), - _ => Err(bad_message()).into_future().boxed(), + .and_then(|(m, s)| match m { + backend::Message::ReadyForQuery(_) => { + Ok((rows, Connection(s))).into_future().boxed() } + backend::Message::DataRow(body) => { + match body.values().collect() { + Ok(row) => { + rows.push(row); + Connection(s).simple_read_rows(rows) + } + Err(e) => Err(Error::Io(e)).into_future().boxed(), + } + } + backend::Message::EmptyQueryResponse | + backend::Message::CommandComplete(_) | + backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows), + backend::Message::ErrorResponse(body) => Connection(s).ready_err(body), + _ => Err(bad_message()).into_future().boxed(), }) .boxed() } @@ -445,13 +444,12 @@ impl Connection { fn ready(self, t: T) -> BoxFuture<(T, Connection), Error> where T: 'static + Send { - self.0.read() + self.0 + .read() .map_err(Error::Io) - .and_then(|(m, s)| { - match m { - backend::Message::ReadyForQuery(_) => Ok((t, Connection(s))), - _ => Err(bad_message()) - } + .and_then(|(m, s)| match m { + backend::Message::ReadyForQuery(_) => Ok((t, Connection(s))), + _ => Err(bad_message()), }) .and_then(|(t, s)| s.close_gc().map(|s| (t, s))) .boxed() @@ -471,26 +469,22 @@ impl Connection { let mut buf = vec![]; frontend::sync(&mut buf); messages.push(buf); - self.0.send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>))) + self.0 + .send_all(futures::stream::iter(messages.into_iter().map(Ok::<_, io::Error>))) .map_err(Error::Io) .and_then(|s| Connection(s.0).finish_close_gc()) .boxed() } fn finish_close_gc(self) -> BoxFuture { - self.0.read() + self.0 + .read() .map_err(Error::Io) - .and_then(|(m, s)| { - match m { - backend::Message::ReadyForQuery(_) => { - Either::A(Ok(Connection(s)).into_future()) - } - backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()), - backend::Message::ErrorResponse(body) => { - Either::B(Connection(s).ready_err(body)) - } - _ => Either::A(Err(bad_message()).into_future()), - } + .and_then(|(m, s)| match m { + backend::Message::ReadyForQuery(_) => Either::A(Ok(Connection(s)).into_future()), + backend::Message::CloseComplete => Either::B(Connection(s).finish_close_gc()), + backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)), + _ => Either::A(Err(bad_message()).into_future()), }) .boxed() } @@ -685,7 +679,9 @@ impl Connection { } else if elem_oid != 0 { Either::B(c.get_type(elem_oid).map(|(t, c)| (Kind::Array(t), c)).boxed()) } else if relid != 0 { - Either::B(c.get_composite_fields(relid).map(|(f, c)| (Kind::Composite(f), c)).boxed()) + Either::B(c.get_composite_fields(relid) + .map(|(f, c)| (Kind::Composite(f), c)) + .boxed()) } else if let Some(rngsubtype) = rngsubtype { Either::B(c.get_type(rngsubtype).map(|(t, c)| (Kind::Range(t), c)).boxed()) } else { @@ -766,21 +762,17 @@ impl Connection { FROM pg_catalog.pg_enum \ WHERE enumtypid = $1 \ ORDER BY enumsortorder") - .or_else(|e| { - match e { - Error::Db(e, c) => { - if e.code != SqlState::UndefinedColumn { - return Either::B(Err(Error::Db(e, c)).into_future()); - } - - Either::A(c.raw_prepare(TYPEINFO_ENUM_QUERY, - "SELECT enumlabel \ - FROM pg_catalog.pg_enum \ - WHERE enumtypid = $1 \ - ORDER BY oid")) + .or_else(|e| match e { + Error::Db(e, c) => { + if e.code != SqlState::UndefinedColumn { + return Either::B(Err(Error::Db(e, c)).into_future()); } - e => Either::B(Err(e).into_future()), + + Either::A(c.raw_prepare(TYPEINFO_ENUM_QUERY, + "SELECT enumlabel FROM pg_catalog.pg_enum WHERE \ + enumtypid = $1 ORDER BY oid")) } + e => Either::B(Err(e).into_future()), }) .map(|(_, _, mut c)| { c.0.has_typeinfo_enum_query = true; @@ -791,7 +783,9 @@ impl Connection { fn get_composite_fields(self, oid: Oid) -> BoxFuture<(Vec, Connection), Error> { self.setup_typeinfo_composite_query() - .and_then(move |c| c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[Type::Oid], &[&oid])) + .and_then(move |c| { + c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[Type::Oid], &[&oid]) + }) .and_then(|c| c.read_rows().collect()) .and_then(|(r, c)| { futures::stream::iter(r.into_iter().map(Ok)) @@ -852,12 +846,10 @@ impl Connection { stmt, Some(1), params.iter().zip(param_types), - |(param, ty), buf| { - match param.to_sql_checked(ty, buf) { - Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes), - Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No), - Err(e) => Err(e), - } + |(param, ty), buf| match param.to_sql_checked(ty, buf) { + Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes), + Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No), + Err(e) => Err(e), }, Some(1), &mut bind); @@ -874,52 +866,46 @@ impl Connection { }) .into_future() .and_then(|s| { - let it = Some(bind).into_iter() + let it = Some(bind) + .into_iter() .chain(Some(execute)) .chain(Some(sync)) .map(Ok::<_, io::Error>); s.0.send_all(futures::stream::iter(it)).map_err(Error::Io) }) .and_then(|s| s.0.read().map_err(Error::Io)) - .and_then(|(m, s)| { - match m { - backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()), - backend::Message::ErrorResponse(body) => { - Either::B(Connection(s).ready_err(body)) - } - _ => Either::A(Err(bad_message()).into_future()), - } + .and_then(|(m, s)| match m { + backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()), + backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)), + _ => Either::A(Err(bad_message()).into_future()), }) .boxed() } fn finish_execute(self) -> BoxFuture<(u64, Connection), Error> { - self.0.read() + self.0 + .read() .map_err(Error::Io) - .and_then(|(m, s)| { - match m { - backend::Message::DataRow(_) => Either::B(Connection(s).finish_execute()), - backend::Message::CommandComplete(body) => { - Either::A(body.tag() - .map(|tag| { - let num = tag.split_whitespace() - .last() - .unwrap() - .parse() - .unwrap_or(0); - (num, Connection(s)) - }) - .map_err(Error::Io) - .into_future()) - } - backend::Message::EmptyQueryResponse => { - Either::A(Ok((0, Connection(s))).into_future()) - } - backend::Message::ErrorResponse(body) => { - Either::B(Connection(s).ready_err(body)) - } - _ => Either::A(Err(bad_message()).into_future()), + .and_then(|(m, s)| match m { + backend::Message::DataRow(_) => Either::B(Connection(s).finish_execute()), + backend::Message::CommandComplete(body) => { + Either::A(body.tag() + .map(|tag| { + let num = tag.split_whitespace() + .last() + .unwrap() + .parse() + .unwrap_or(0); + (num, Connection(s)) + }) + .map_err(Error::Io) + .into_future()) } + backend::Message::EmptyQueryResponse => { + Either::A(Ok((0, Connection(s))).into_future()) + } + backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)), + _ => Either::A(Err(bad_message()).into_future()), }) .and_then(|(n, s)| s.ready(n)) .boxed() @@ -927,21 +913,21 @@ impl Connection { fn read_rows(self) -> BoxStateStream { futures_state_stream::unfold(self, |c| { - c.read_row() - .and_then(|(r, c)| { - match r { + c.read_row() + .and_then(|(r, c)| match r { Some(data) => { let event = StreamEvent::Next((data, c)); Either::A(Ok(event).into_future()) - }, + } None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))), - } - }) - }).boxed() + }) + }) + .boxed() } fn read_row(self) -> BoxFuture<(Option, Connection), Error> { - self.0.read() + self.0 + .read() .map_err(Error::Io) .and_then(|(m, s)| { let c = Connection(s); @@ -955,9 +941,7 @@ impl Connection { } backend::Message::EmptyQueryResponse | backend::Message::CommandComplete(_) => Either::A(Ok((None, c)).into_future()), - backend::Message::ErrorResponse(body) => { - Either::B(c.ready_err(body)) - } + backend::Message::ErrorResponse(body) => Either::B(c.ready_err(body)), _ => Either::A(Err(bad_message()).into_future()), } }) @@ -970,10 +954,8 @@ impl Connection { let name = format!("s{}", id); self.raw_prepare(&name, query) .map(|(params, columns, conn)| { - let stmt = Statement::new(conn.0.close_sender.clone(), - name, - params, - Arc::new(columns)); + let stmt = + Statement::new(conn.0.close_sender.clone(), name, params, Arc::new(columns)); (stmt, conn) }) .boxed() @@ -985,7 +967,10 @@ impl Connection { /// /// Panics if the number of parameters provided does not match the number /// expected. - pub fn execute(self, statement: &Statement, params: &[&ToSql]) -> BoxFuture<(u64, Connection), Error> { + pub fn execute(self, + statement: &Statement, + params: &[&ToSql]) + -> BoxFuture<(u64, Connection), Error> { self.raw_execute(statement.name(), "", statement.parameters(), params) .and_then(|conn| conn.finish_execute()) .boxed() diff --git a/tokio-postgres/src/stream.rs b/tokio-postgres/src/stream.rs index 6adbdb09..d7975c65 100644 --- a/tokio-postgres/src/stream.rs +++ b/tokio-postgres/src/stream.rs @@ -39,7 +39,10 @@ pub fn connect(host: Host, } #[cfg(not(unix))] Host::Unix(_) => { - Either::B(Err(ConnectError::ConnectParams("unix sockets are not supported on this platform".into())).into_future()) + Either::B(Err(ConnectError::ConnectParams("unix sockets are not supported on this \ + platform" + .into())) + .into_future()) } }; @@ -52,7 +55,7 @@ pub fn connect(host: Host, s.framed(PostgresCodec) }) .boxed() - }, + } }; inner.map(|s| s.framed(SslCodec)) @@ -67,13 +70,18 @@ pub fn connect(host: Host, let s = s.into_inner(); match (m, required) { (Some(b'N'), true) => { - Either::A(Err(ConnectError::Tls("the server does not support TLS".into())).into_future()) + Either::A(Err(ConnectError::Tls("the server does not support TLS".into())) + .into_future()) } (Some(b'N'), false) => { let s: Box = Box::new(s); Either::A(Ok(s).into_future()) - }, - (None, _) => Either::A(Err(ConnectError::Io(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF"))).into_future()), + } + (None, _) => { + Either::A(Err(ConnectError::Io(io::Error::new(io::ErrorKind::UnexpectedEof, + "unexpected EOF"))) + .into_future()) + } _ => { let host = match host { Host::Tcp(ref host) => host, @@ -155,7 +163,7 @@ impl Codec for PostgresCodec { buf.drain_to(consumed); Ok(Some(message)) } - ParseResult::Incomplete { .. } => Ok(None) + ParseResult::Incomplete { .. } => Ok(None), } } diff --git a/tokio-postgres/src/test.rs b/tokio-postgres/src/test.rs index 7b97163a..5614c4b9 100644 --- a/tokio-postgres/src/test.rs +++ b/tokio-postgres/src/test.rs @@ -98,18 +98,19 @@ fn batch_execute_ok() { fn batch_execute_err() { let mut l = Core::new().unwrap(); let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &l.handle()) - .then(|r| r.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL); \ - INSERT INTO foo DEFAULT VALUES;")) - .and_then(|c| c.batch_execute("SELECT * FROM bogo")) .then(|r| { - match r { - Err(Error::Db(e, s)) => { - assert!(e.code == SqlState::UndefinedTable); - s.batch_execute("SELECT * FROM foo") - } - Err(e) => panic!("unexpected error: {}", e), - Ok(_) => panic!("unexpected success"), - } + r.unwrap() + .batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL); INSERT INTO foo DEFAULT \ + VALUES;") + }) + .and_then(|c| c.batch_execute("SELECT * FROM bogo")) + .then(|r| match r { + Err(Error::Db(e, s)) => { + assert!(e.code == SqlState::UndefinedTable); + s.batch_execute("SELECT * FROM foo") + } + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("unexpected success"), }); l.run(done).unwrap(); } @@ -157,7 +158,9 @@ fn query() { fn transaction() { let mut l = Core::new().unwrap(); let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &l.handle()) - .then(|c| c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);")) + .then(|c| { + c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);") + }) .then(|c| c.unwrap().transaction()) .then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('joe');")) .then(|t| t.unwrap().rollback()) @@ -196,7 +199,9 @@ fn ssl_user_ssl_required() { let mut l = Core::new().unwrap(); let handle = l.handle(); - let done = Connection::connect("postgres://ssl_user@localhost/postgres", TlsMode::None, &handle); + let done = Connection::connect("postgres://ssl_user@localhost/postgres", + TlsMode::None, + &handle); match l.run(done) { Err(ConnectError::Db(e)) => assert!(e.code == SqlState::InvalidAuthorizationSpecification), @@ -231,7 +236,10 @@ fn domain() { struct SessionId(Vec); impl ToSql for SessionId { - fn to_sql(&self, ty: &Type, out: &mut Vec) -> Result> { + fn to_sql(&self, + ty: &Type, + out: &mut Vec) + -> Result> { let inner = match *ty.kind() { Kind::Domain(ref inner) => inner, _ => unreachable!(), @@ -242,7 +250,7 @@ fn domain() { fn accepts(ty: &Type) -> bool { match *ty.kind() { Kind::Domain(Type::Bytea) => ty.name() == "session_id", - _ => false + _ => false, } } @@ -264,9 +272,10 @@ fn domain() { let handle = l.handle(); let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &handle) .then(|c| { - c.unwrap().batch_execute( - "CREATE DOMAIN pg_temp.session_id AS bytea CHECK(octet_length(VALUE) = 16); - CREATE TABLE pg_temp.foo (id pg_temp.session_id);") + c.unwrap().batch_execute("CREATE DOMAIN pg_temp.session_id AS bytea \ + CHECK(octet_length(VALUE) = 16); + CREATE \ + TABLE pg_temp.foo (id pg_temp.session_id);") }) .and_then(|c| c.prepare("INSERT INTO pg_temp.foo (id) VALUES ($1)")) .and_then(|(s, c)| { @@ -330,7 +339,8 @@ fn enum_() { assert_eq!(type_.name(), "mood"); match *type_.kind() { Kind::Enum(ref variants) => { - assert_eq!(variants, &["sad".to_owned(), "ok".to_owned(), "happy".to_owned()]); + assert_eq!(variants, + &["sad".to_owned(), "ok".to_owned(), "happy".to_owned()]); } _ => panic!("bad type"), } diff --git a/tokio-postgres/src/tls/openssl.rs b/tokio-postgres/src/tls/openssl.rs index 6349b9d1..ae60af87 100644 --- a/tokio-postgres/src/tls/openssl.rs +++ b/tokio-postgres/src/tls/openssl.rs @@ -40,7 +40,8 @@ impl Handshake for OpenSsl { host: &str, stream: Stream) -> BoxFuture, Box> { - self.0.connect_async(host, stream) + self.0 + .connect_async(host, stream) .map(|s| { let s: Box = Box::new(s); s diff --git a/tokio-postgres/src/transaction.rs b/tokio-postgres/src/transaction.rs index 31957481..9eea23bc 100644 --- a/tokio-postgres/src/transaction.rs +++ b/tokio-postgres/src/transaction.rs @@ -22,7 +22,8 @@ impl TransactionNew for Transaction { impl Transaction { /// Like `Connection::batch_execute`. pub fn batch_execute(self, query: &str) -> BoxFuture> { - self.0.batch_execute(query) + self.0 + .batch_execute(query) .map(Transaction) .map_err(transaction_err) .boxed() @@ -30,7 +31,8 @@ impl Transaction { /// Like `Connection::prepare`. pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Transaction), Error> { - self.0.prepare(query) + self.0 + .prepare(query) .map(|(s, c)| (s, Transaction(c))) .map_err(transaction_err) .boxed() @@ -41,7 +43,8 @@ impl Transaction { statement: &Statement, params: &[&ToSql]) -> BoxFuture<(u64, Transaction), Error> { - self.0.execute(statement, params) + self.0 + .execute(statement, params) .map(|(n, c)| (n, Transaction(c))) .map_err(transaction_err) .boxed() @@ -52,7 +55,8 @@ impl Transaction { statement: &Statement, params: &[&ToSql]) -> BoxStateStream> { - self.0.query(statement, params) + self.0 + .query(statement, params) .map_state(Transaction) .map_err(transaction_err) .boxed() @@ -69,7 +73,8 @@ impl Transaction { } fn finish(self, query: &str) -> BoxFuture { - self.0.simple_query(query) + self.0 + .simple_query(query) .map(|(_, c)| c) .boxed() } @@ -79,6 +84,6 @@ fn transaction_err(e: Error) -> Error { match e { Error::Io(e) => Error::Io(e), Error::Db(e, c) => Error::Db(e, Transaction(c)), - Error::Conversion(e, c) => Error::Conversion(e, Transaction(c)) + Error::Conversion(e, c) => Error::Conversion(e, Transaction(c)), } } diff --git a/tokio-postgres/src/types.rs b/tokio-postgres/src/types.rs index 05f9268f..12d0f4b9 100644 --- a/tokio-postgres/src/types.rs +++ b/tokio-postgres/src/types.rs @@ -1,8 +1,8 @@ //! Postgres types #[doc(inline)] -pub use postgres_shared::types::{Oid, Type, Date, Timestamp, Kind, Field, Other, - WasNull, WrongType, FromSql, IsNull, ToSql}; +pub use postgres_shared::types::{Oid, Type, Date, Timestamp, Kind, Field, Other, WasNull, WrongType, + FromSql, IsNull, ToSql}; #[doc(hidden)] pub use postgres_shared::types::__to_sql_checked;