From bb3ebbf943c252302daa361b598ba2bd12335494 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Tue, 9 Jan 2018 21:15:35 -0800 Subject: [PATCH] Fix some warnings --- postgres-shared/src/types/bit_vec.rs | 4 +- postgres-shared/src/types/chrono.rs | 12 +- postgres-shared/src/types/rustc_serialize.rs | 4 +- tokio-postgres/Cargo.toml | 2 +- tokio-postgres/src/lib.rs | 207 +++++++++---------- tokio-postgres/src/test.rs | 64 +++--- 6 files changed, 137 insertions(+), 156 deletions(-) diff --git a/postgres-shared/src/types/bit_vec.rs b/postgres-shared/src/types/bit_vec.rs index f3f11627..6bbbe836 100644 --- a/postgres-shared/src/types/bit_vec.rs +++ b/postgres-shared/src/types/bit_vec.rs @@ -4,7 +4,7 @@ use postgres_protocol::types; use self::bit_vec::BitVec; use std::error::Error; -use types::{FromSql, ToSql, IsNull, Type, BIT, VARBIT}; +use types::{FromSql, IsNull, ToSql, Type, BIT, VARBIT}; impl FromSql for BitVec { fn from_sql(_: &Type, raw: &[u8]) -> Result> { @@ -21,7 +21,7 @@ impl FromSql for BitVec { } impl ToSql for BitVec { - fn to_sql(&self, _: &Type, mut out: &mut Vec) -> Result> { + fn to_sql(&self, _: &Type, out: &mut Vec) -> Result> { types::varbit_to_sql(self.len(), self.to_bytes().into_iter(), out)?; Ok(IsNull::No) } diff --git a/postgres-shared/src/types/chrono.rs b/postgres-shared/src/types/chrono.rs index dc7368c8..fd55eb89 100644 --- a/postgres-shared/src/types/chrono.rs +++ b/postgres-shared/src/types/chrono.rs @@ -1,11 +1,11 @@ extern crate chrono; use postgres_protocol::types; -use self::chrono::{Duration, NaiveDate, NaiveTime, NaiveDateTime, DateTime, Utc, Local, - FixedOffset}; +use self::chrono::{DateTime, Duration, FixedOffset, Local, NaiveDate, NaiveDateTime, NaiveTime, + Utc}; use std::error::Error; -use types::{FromSql, ToSql, IsNull, Type, TIMESTAMP, TIMESTAMPTZ, DATE, TIME}; +use types::{FromSql, IsNull, ToSql, Type, DATE, TIME, TIMESTAMP, TIMESTAMPTZ}; fn base() -> NaiveDateTime { NaiveDate::from_ymd(2000, 1, 1).and_hms(0, 0, 0) @@ -62,11 +62,7 @@ impl FromSql for DateTime { } impl ToSql for DateTime { - fn to_sql( - &self, - type_: &Type, - mut w: &mut Vec, - ) -> Result> { + fn to_sql(&self, type_: &Type, w: &mut Vec) -> Result> { self.with_timezone(&Utc).to_sql(type_, w) } diff --git a/postgres-shared/src/types/rustc_serialize.rs b/postgres-shared/src/types/rustc_serialize.rs index 3790c7a9..f8dedc91 100644 --- a/postgres-shared/src/types/rustc_serialize.rs +++ b/postgres-shared/src/types/rustc_serialize.rs @@ -4,7 +4,7 @@ use self::rustc_serialize::json; use std::io::{Read, Write}; use std::error::Error; -use types::{FromSql, ToSql, IsNull, Type, JSON, JSONB}; +use types::{FromSql, IsNull, ToSql, Type, JSON, JSONB}; impl FromSql for json::Json { fn from_sql(ty: &Type, mut raw: &[u8]) -> Result> { @@ -23,7 +23,7 @@ impl FromSql for json::Json { } impl ToSql for json::Json { - fn to_sql(&self, ty: &Type, mut out: &mut Vec) -> Result> { + fn to_sql(&self, ty: &Type, out: &mut Vec) -> Result> { if *ty == JSONB { out.push(1); } diff --git a/tokio-postgres/Cargo.toml b/tokio-postgres/Cargo.toml index bb67b9b3..321f809a 100644 --- a/tokio-postgres/Cargo.toml +++ b/tokio-postgres/Cargo.toml @@ -49,7 +49,7 @@ tokio-dns-unofficial = "0.1" tokio-io = "0.1" tokio-openssl = { version = "0.1", optional = true } -openssl = { version = "0.9", optional = true } +openssl = { version = "0.9.23", optional = true } [target.'cfg(unix)'.dependencies] tokio-uds = "0.1" diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs index b8da20b3..de42e1b4 100644 --- a/tokio-postgres/src/lib.rs +++ b/tokio-postgres/src/lib.rs @@ -50,15 +50,14 @@ //! l.run(done).unwrap(); //! } //! ``` -#![doc(html_root_url="https://docs.rs/tokio-postgres/0.3")] +#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.3")] #![warn(missing_docs)] extern crate bytes; extern crate fallible_iterator; extern crate futures_state_stream; -#[cfg_attr(test, macro_use)] -extern crate postgres_shared; extern crate postgres_protocol; +extern crate postgres_shared; extern crate tokio_core; extern crate tokio_dns; extern crate tokio_io; @@ -70,19 +69,19 @@ extern crate futures; extern crate tokio_uds; use fallible_iterator::FallibleIterator; -use futures::{Future, IntoFuture, Stream, Sink, Poll, StartSend, Async}; +use futures::{Async, Future, IntoFuture, Poll, Sink, StartSend, Stream}; use futures::future::Either; -use futures_state_stream::{StreamEvent, StateStream, FutureExt}; +use futures_state_stream::{FutureExt, StateStream, StreamEvent}; use postgres_protocol::authentication; use postgres_protocol::message::{backend, frontend}; -use postgres_protocol::message::backend::{ErrorResponseBody, ErrorFields}; +use postgres_protocol::message::backend::{ErrorFields, ErrorResponseBody}; use postgres_shared::rows::RowData; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::io; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; -use std::sync::mpsc::{self, Sender, Receiver}; +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::sync::mpsc::{self, Receiver, Sender}; use tokio_core::reactor::Handle; #[doc(inline)] @@ -90,14 +89,14 @@ pub use postgres_shared::{error, params, types, CancelData, Notification}; #[doc(inline)] pub use error::Error; -use error::{DbError, UNDEFINED_TABLE, UNDEFINED_COLUMN}; +use error::{DbError, UNDEFINED_COLUMN, UNDEFINED_TABLE}; use params::{ConnectParams, IntoConnectParams}; use sink::SinkExt; -use stmt::{Statement, Column}; +use stmt::{Column, Statement}; use stream::PostgresStream; use tls::Handshake; use transaction::Transaction; -use types::{Oid, Type, ToSql, IsNull, FromSql, Kind, Field, NAME, CHAR, OID}; +use types::{Field, FromSql, IsNull, Kind, Oid, ToSql, Type, CHAR, NAME, OID}; use rows::Row; #[macro_use] @@ -149,14 +148,12 @@ where T: IntoConnectParams, { let params = match params.into_connect_params() { - Ok(params) => { - Either::A(stream::connect( - params.host().clone(), - params.port(), - tls_mode, - handle, - )) - } + Ok(params) => Either::A(stream::connect( + params.host().clone(), + params.port(), + tls_mode, + handle, + )), Err(e) => Either::B(Err(error::connect(e)).into_future()), }; @@ -188,10 +185,7 @@ impl InnerConnection { fn read( self, ) -> Box< - Future< - Item = (backend::Message, InnerConnection), - Error = (io::Error, InnerConnection), - > + Future + Send, > { if self.desynchronized { @@ -307,12 +301,10 @@ impl Connection { T: IntoConnectParams, { let fut = match params.into_connect_params() { - Ok(params) => { - Either::A( - stream::connect(params.host().clone(), params.port(), tls_mode, handle) - .map(|s| (s, params)), - ) - } + Ok(params) => Either::A( + stream::connect(params.host().clone(), params.port(), tls_mode, handle) + .map(|s| (s, params)), + ), Err(e) => Either::B(Err(error::connect(e)).into_future()), }; @@ -384,35 +376,30 @@ impl Connection { .map(|()| Some(buf)) .map_err(Into::into) } - None => { - Err(error::connect( - "password was required but not provided".into(), - )) - } + None => Err(error::connect( + "password was required but not provided".into(), + )), } } - backend::Message::AuthenticationMd5Password(body) => { - match params.user().and_then( - |u| u.password().map(|p| (u.name(), p)), - ) { - Some((user, pass)) => { - let pass = authentication::md5_hash( - user.as_bytes(), - pass.as_bytes(), - body.salt(), - ); - let mut buf = vec![]; - frontend::password_message(&pass, &mut buf) - .map(|()| Some(buf)) - .map_err(Into::into) - } - None => { - Err(error::connect( - "password was required but not provided".into(), - )) - } + backend::Message::AuthenticationMd5Password(body) => match params + .user() + .and_then(|u| u.password().map(|p| (u.name(), p))) + { + Some((user, pass)) => { + let pass = authentication::md5_hash( + user.as_bytes(), + pass.as_bytes(), + body.salt(), + ); + let mut buf = vec![]; + frontend::password_message(&pass, &mut buf) + .map(|()| Some(buf)) + .map_err(Into::into) } - } + None => Err(error::connect( + "password was required but not provided".into(), + )), + }, backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())), _ => Err(bad_message()), }; @@ -489,18 +476,16 @@ impl Connection { backend::Message::ReadyForQuery(_) => { Ok((rows, Connection(s))).into_future().boxed2() } - backend::Message::DataRow(body) => { - match RowData::new(body) { - Ok(row) => { - rows.push(row); - Connection(s).simple_read_rows(rows) - } - Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(), + backend::Message::DataRow(body) => match RowData::new(body) { + Ok(row) => { + rows.push(row); + Connection(s).simple_read_rows(rows) } - } - backend::Message::EmptyQueryResponse | - backend::Message::CommandComplete(_) | - backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows), + Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(), + }, + backend::Message::EmptyQueryResponse + | backend::Message::CommandComplete(_) + | backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows), backend::Message::ErrorResponse(body) => Connection(s).ready_err(body), _ => Err((bad_message(), Connection(s))).into_future().boxed2(), }) @@ -599,18 +584,22 @@ impl Connection { self, name: &str, query: &str, - ) -> Box, Vec, Connection), Error = (Error, Connection)> + Send> { + ) -> Box, Vec, Connection), Error = (Error, Connection)> + Send> + { let mut parse = vec![]; let mut describe = vec![]; let mut sync = vec![]; frontend::sync(&mut sync); - if let Err(e) = frontend::parse(name, query, None, &mut parse).and_then(|()| frontend::describe(b'S', name, &mut describe)) { + if let Err(e) = frontend::parse(name, query, None, &mut parse) + .and_then(|()| frontend::describe(b'S', name, &mut describe)) + { return Err((error::io(e), self)).into_future().boxed2(); } - let it = Some(parse).into_iter().chain(Some(describe)).chain( - Some(sync), - ); + let it = Some(parse) + .into_iter() + .chain(Some(describe)) + .chain(Some(sync)); self.0.send_all2(futures::stream::iter_ok::<_, io::Error>(it)) .map_err(|(e, s, _)| (e, s)) .and_then(|s| s.0.read()) @@ -809,25 +798,25 @@ impl Connection { self.raw_prepare( TYPEINFO_QUERY, "SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, \ - t.typbasetype, n.nspname, t.typrelid \ - FROM pg_catalog.pg_type t \ - LEFT OUTER JOIN pg_catalog.pg_range r ON \ - r.rngtypid = t.oid \ - INNER JOIN pg_catalog.pg_namespace n ON \ - t.typnamespace = n.oid \ - WHERE t.oid = $1", + t.typbasetype, n.nspname, t.typrelid \ + FROM pg_catalog.pg_type t \ + LEFT OUTER JOIN pg_catalog.pg_range r ON \ + r.rngtypid = t.oid \ + INNER JOIN pg_catalog.pg_namespace n ON \ + t.typnamespace = n.oid \ + WHERE t.oid = $1", ).or_else(|(e, c)| { // Range types weren't added until Postgres 9.2, so pg_range may not exist if e.code() == Some(&UNDEFINED_TABLE) { Either::A(c.raw_prepare( TYPEINFO_QUERY, "SELECT t.typname, t.typtype, t.typelem, \ - NULL::OID, t.typbasetype, n.nspname, \ - t.typrelid \ - FROM pg_catalog.pg_type t \ - INNER JOIN pg_catalog.pg_namespace n \ - ON t.typnamespace = n.oid \ - WHERE t.oid = $1", + NULL::OID, t.typbasetype, n.nspname, \ + t.typrelid \ + FROM pg_catalog.pg_type t \ + INNER JOIN pg_catalog.pg_namespace n \ + ON t.typnamespace = n.oid \ + WHERE t.oid = $1", )) } else { Either::B(Err((e, c)).into_future()) @@ -873,17 +862,19 @@ impl Connection { self.raw_prepare( TYPEINFO_ENUM_QUERY, "SELECT enumlabel \ - FROM pg_catalog.pg_enum \ - WHERE enumtypid = $1 \ - ORDER BY enumsortorder", - ).or_else(|(e, c)| if e.code() == Some(&UNDEFINED_COLUMN) { - Either::A(c.raw_prepare( - TYPEINFO_ENUM_QUERY, - "SELECT enumlabel FROM pg_catalog.pg_enum WHERE \ - enumtypid = $1 ORDER BY oid", - )) - } else { - Either::B(Err((e, c)).into_future()) + FROM pg_catalog.pg_enum \ + WHERE enumtypid = $1 \ + ORDER BY enumsortorder", + ).or_else(|(e, c)| { + if e.code() == Some(&UNDEFINED_COLUMN) { + Either::A(c.raw_prepare( + TYPEINFO_ENUM_QUERY, + "SELECT enumlabel FROM pg_catalog.pg_enum WHERE \ + enumtypid = $1 ORDER BY oid", + )) + } else { + Either::B(Err((e, c)).into_future()) + } }) .map(|(_, _, mut c)| { c.0.has_typeinfo_enum_query = true; @@ -930,11 +921,11 @@ impl Connection { self.raw_prepare( TYPEINFO_COMPOSITE_QUERY, "SELECT attname, atttypid \ - FROM pg_catalog.pg_attribute \ - WHERE attrelid = $1 \ - AND NOT attisdropped \ - AND attnum > 0 \ - ORDER BY attnum", + FROM pg_catalog.pg_attribute \ + WHERE attrelid = $1 \ + AND NOT attisdropped \ + AND attnum > 0 \ + ORDER BY attnum", ).map(|(_, _, mut c)| { c.0.has_typeinfo_composite_query = true; c @@ -984,9 +975,10 @@ impl Connection { Err(e) => Err((error::io(e), s)), }).into_future() .and_then(|s| { - let it = Some(bind).into_iter().chain(Some(execute)).chain( - Some(sync), - ); + let it = Some(bind) + .into_iter() + .chain(Some(execute)) + .chain(Some(sync)); s.0 .send_all2(futures::stream::iter_ok::<_, io::Error>(it)) .map_err(|(e, s, _)| (error::io(e), Connection(s))) @@ -1057,8 +1049,9 @@ impl Connection { }; Either::A(r.into_future()) } - backend::Message::EmptyQueryResponse | - backend::Message::CommandComplete(_) => Either::A(Ok((None, c)).into_future()), + backend::Message::EmptyQueryResponse | backend::Message::CommandComplete(_) => { + Either::A(Ok((None, c)).into_future()) + } backend::Message::ErrorResponse(body) => Either::B(c.ready_err(body)), _ => Either::A(Err((bad_message(), c)).into_future()), } @@ -1213,9 +1206,7 @@ trait BoxedStateStream { impl BoxedStateStream for T where - T: StateStream - + Send - + 'static, + T: StateStream + Send + 'static, { fn boxed2(self) -> Box + Send> { Box::new(self) diff --git a/tokio-postgres/src/test.rs b/tokio-postgres/src/test.rs index f916d97f..69c57c1d 100644 --- a/tokio-postgres/src/test.rs +++ b/tokio-postgres/src/test.rs @@ -6,9 +6,9 @@ use std::time::Duration; use tokio_core::reactor::{Core, Interval}; use super::*; -use error::{INVALID_PASSWORD, INVALID_AUTHORIZATION_SPECIFICATION, QUERY_CANCELED}; +use error::{INVALID_AUTHORIZATION_SPECIFICATION, INVALID_PASSWORD, QUERY_CANCELED}; use params::{ConnectParams, Host}; -use types::{ToSql, FromSql, Type, IsNull, Kind, BYTEA, TEXT, INT4, NUMERIC}; +use types::{FromSql, INT4, IsNull, Kind, ToSql, Type, BYTEA, NUMERIC, TEXT}; #[test] fn md5_user() { @@ -106,9 +106,8 @@ fn batch_execute_ok() { TlsMode::None, &l.handle(), ).then(|c| { - c.unwrap().batch_execute( - "CREATE TEMPORARY TABLE foo (id SERIAL);", - ) + c.unwrap() + .batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL);") }); l.run(done).unwrap(); } @@ -123,7 +122,7 @@ fn batch_execute_err() { ).then(|r| { r.unwrap().batch_execute( "CREATE TEMPORARY TABLE foo (id SERIAL); INSERT INTO foo DEFAULT \ - VALUES;", + VALUES;", ) }) .and_then(|c| c.batch_execute("SELECT * FROM bogo")) @@ -145,9 +144,8 @@ fn prepare_execute() { TlsMode::None, &l.handle(), ).then(|c| { - c.unwrap().prepare( - "CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)", - ) + c.unwrap() + .prepare("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)") }) .and_then(|(s, c)| c.execute(&s, &[])) .and_then(|(n, c)| { @@ -206,22 +204,19 @@ fn transaction() { TlsMode::None, &l.handle(), ).then(|c| { - c.unwrap().batch_execute( - "CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);", - ) + c.unwrap() + .batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);") }) .then(|c| c.unwrap().transaction()) .then(|t| { - t.unwrap().batch_execute( - "INSERT INTO foo (name) VALUES ('joe');", - ) + t.unwrap() + .batch_execute("INSERT INTO foo (name) VALUES ('joe');") }) .then(|t| t.unwrap().rollback()) .then(|c| c.unwrap().transaction()) .then(|t| { - t.unwrap().batch_execute( - "INSERT INTO foo (name) VALUES ('bob');", - ) + t.unwrap() + .batch_execute("INSERT INTO foo (name) VALUES ('bob');") }) .then(|t| t.unwrap().commit()) .then(|c| c.unwrap().prepare("SELECT name FROM foo")) @@ -243,11 +238,9 @@ fn unix_socket() { .and_then(|(s, c)| c.query(&s, &[]).collect()) .then(|r| { let r = r.unwrap().0; - let params = ConnectParams::builder().user("postgres", None).build( - Host::Unix( - PathBuf::from(r[0].get::(0)), - ), - ); + let params = ConnectParams::builder() + .user("postgres", None) + .build(Host::Unix(PathBuf::from(r[0].get::(0)))); Connection::connect(params, TlsMode::None, &handle) }) .then(|c| c.unwrap().batch_execute("")); @@ -274,14 +267,11 @@ fn ssl_user_ssl_required() { #[cfg(feature = "with-openssl")] #[test] fn openssl_required() { - use tls::openssl::openssl::ssl::{SslMethod, SslConnectorBuilder}; + use tls::openssl::openssl::ssl::{SslConnectorBuilder, SslMethod}; use tls::openssl::OpenSsl; let mut builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap(); - builder - .builder_mut() - .set_ca_file("../test/server.crt") - .unwrap(); + builder.set_ca_file("../test/server.crt").unwrap(); let negotiator = OpenSsl::from(builder.build()); let mut l = Core::new().unwrap(); @@ -401,9 +391,8 @@ fn enum_() { let done = Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle) .then(|c| { - c.unwrap().batch_execute( - "CREATE TYPE pg_temp.mood AS ENUM ('sad', 'ok', 'happy');", - ) + c.unwrap() + .batch_execute("CREATE TYPE pg_temp.mood AS ENUM ('sad', 'ok', 'happy');") }) .and_then(|c| c.prepare("SELECT $1::mood")) .map(|(s, _)| { @@ -466,14 +455,19 @@ fn notifications() { let done = Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle) .then(|c| c.unwrap().batch_execute("LISTEN test_notifications")) .and_then(|c1| { - Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle) - .then(|c2| { + Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle).then( + |c2| { c2.unwrap() .batch_execute("NOTIFY test_notifications, 'foo'") .map(|_| c1) - }) + }, + ) + }) + .and_then(|c| { + c.notifications() + .into_future() + .map_err(|(e, n)| (e, n.into_inner())) }) - .and_then(|c| c.notifications().into_future().map_err(|(e, n)| (e, n.into_inner()))) .map(|(n, _)| { let n = n.unwrap(); assert_eq!(n.channel, "test_notifications");