diff --git a/.circleci/config.yml b/.circleci/config.yml index cd0e716b..5d4bfbff 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -28,6 +28,8 @@ jobs: working_directory: ~/build docker: - image: rust:1.19.0 + environment: + RUSTFLAGS: -D warnings - image: sfackler/rust-postgres-test:3 steps: - checkout diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs index 37160419..4b17f6b0 100644 --- a/postgres/src/lib.rs +++ b/postgres/src/lib.rs @@ -66,7 +66,7 @@ //! .unwrap(); //! } //! ``` -#![doc(html_root_url="https://docs.rs/postgres/0.15.1")] +#![doc(html_root_url = "https://docs.rs/postgres/0.15.1")] #![warn(missing_docs)] #![allow(unknown_lints, needless_lifetimes, doc_markdown)] // for clippy @@ -81,7 +81,7 @@ extern crate socket2; use fallible_iterator::FallibleIterator; use std::cell::{Cell, RefCell}; -use std::collections::{VecDeque, HashMap}; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::io; use std::mem; @@ -96,13 +96,13 @@ use postgres_shared::rows::RowData; use error::{DbError, UNDEFINED_COLUMN, UNDEFINED_TABLE}; use tls::TlsHandshake; -use notification::{Notifications, Notification}; +use notification::{Notification, Notifications}; use params::{IntoConnectParams, User}; use priv_io::MessageStream; use rows::Rows; -use stmt::{Statement, Column}; -use transaction::{Transaction, IsolationLevel}; -use types::{IsNull, Kind, Type, Oid, ToSql, FromSql, Field, OID, NAME, CHAR}; +use stmt::{Column, Statement}; +use transaction::{IsolationLevel, Transaction}; +use types::{Field, FromSql, IsNull, Kind, Oid, ToSql, Type, CHAR, NAME, OID}; #[doc(inline)] pub use postgres_shared::CancelData; @@ -151,8 +151,8 @@ impl HandleNotice for F { pub struct LoggingNoticeHandler; impl HandleNotice for LoggingNoticeHandler { - fn handle_notice(&mut self, notice: DbError) { - info!("{}: {}", notice.severity, notice.message); + fn handle_notice(&mut self, _notice: DbError) { + info!("{}: {}", _notice.severity, _notice.message); } } @@ -302,9 +302,8 @@ impl InnerConnection { } let options = options.iter().map(|&(ref a, ref b)| (&**a, &**b)); - conn.stream.write_message( - |buf| frontend::startup_message(options, buf), - )?; + conn.stream + .write_message(|buf| frontend::startup_message(options, buf))?; conn.stream.flush()?; conn.handle_auth(user)?; @@ -336,10 +335,8 @@ impl InnerConnection { } } backend::Message::ParameterStatus(body) => { - self.parameters.insert( - body.name()?.to_owned(), - body.value()?.to_owned(), - ); + self.parameters + .insert(body.name()?.to_owned(), body.value()?.to_owned()); } val => return Ok(val), } @@ -359,10 +356,8 @@ impl InnerConnection { } } Some(backend::Message::ParameterStatus(body)) => { - self.parameters.insert( - body.name()?.to_owned(), - body.value()?.to_owned(), - ); + self.parameters + .insert(body.name()?.to_owned(), body.value()?.to_owned()); } val => return Ok(val), } @@ -381,10 +376,8 @@ impl InnerConnection { } } Some(backend::Message::ParameterStatus(body)) => { - self.parameters.insert( - body.name()?.to_owned(), - body.value()?.to_owned(), - ); + self.parameters + .insert(body.name()?.to_owned(), body.value()?.to_owned()); } val => return Ok(val), } @@ -413,9 +406,8 @@ impl InnerConnection { let pass = user.password().ok_or_else(|| { error::connect("a password was requested but not provided".into()) })?; - self.stream.write_message( - |buf| frontend::password_message(pass, buf), - )?; + self.stream + .write_message(|buf| frontend::password_message(pass, buf))?; self.stream.flush()?; } backend::Message::AuthenticationMd5Password(body) => { @@ -424,9 +416,8 @@ impl InnerConnection { })?; let output = authentication::md5_hash(user.name().as_bytes(), pass.as_bytes(), body.salt()); - self.stream.write_message( - |buf| frontend::password_message(&output, buf), - )?; + self.stream + .write_message(|buf| frontend::password_message(&output, buf))?; self.stream.flush()?; } backend::Message::AuthenticationSasl(body) => { @@ -436,8 +427,7 @@ impl InnerConnection { .count()? == 0 { return Err( - io::Error::new(io::ErrorKind::Other, "unsupported authentication") - .into(), + io::Error::new(io::ErrorKind::Other, "unsupported authentication").into(), ); } @@ -460,9 +450,8 @@ impl InnerConnection { scram.update(body.data())?; - self.stream.write_message(|buf| { - frontend::sasl_response(scram.message(), buf) - })?; + self.stream + .write_message(|buf| frontend::sasl_response(scram.message(), buf))?; self.stream.flush()?; let body = match self.read_message()? { @@ -473,10 +462,10 @@ impl InnerConnection { scram.finish(body.data())?; } - backend::Message::AuthenticationKerberosV5 | - backend::Message::AuthenticationScmCredential | - backend::Message::AuthenticationGss | - backend::Message::AuthenticationSspi => { + backend::Message::AuthenticationKerberosV5 + | backend::Message::AuthenticationScmCredential + | backend::Message::AuthenticationGss + | backend::Message::AuthenticationSspi => { return Err( io::Error::new(io::ErrorKind::Other, "unsupported authentication").into(), ) @@ -499,15 +488,12 @@ impl InnerConnection { fn raw_prepare(&mut self, stmt_name: &str, query: &str) -> Result<(Vec, Vec)> { debug!("preparing query with name `{}`: {}", stmt_name, query); - self.stream.write_message(|buf| { - frontend::parse(stmt_name, query, None, buf) - })?; - self.stream.write_message( - |buf| frontend::describe(b'S', stmt_name, buf), - )?; - self.stream.write_message( - |buf| Ok::<(), io::Error>(frontend::sync(buf)), - )?; + self.stream + .write_message(|buf| frontend::parse(stmt_name, query, None, buf))?; + self.stream + .write_message(|buf| frontend::describe(b'S', stmt_name, buf))?; + self.stream + .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))?; self.stream.flush()?; match self.read_message()? { @@ -539,16 +525,14 @@ impl InnerConnection { .collect()?; let columns = match raw_columns { - Some(body) => { - body.fields() - .and_then(|field| { - Ok(Column::new( - field.name().to_owned(), - self.get_type(field.type_oid())?, - )) - }) - .collect()? - } + Some(body) => body.fields() + .and_then(|field| { + Ok(Column::new( + field.name().to_owned(), + self.get_type(field.type_oid())?, + )) + }) + .collect()?, None => vec![], }; @@ -562,8 +546,7 @@ impl InnerConnection { let more_rows; loop { match self.read_message()? { - backend::Message::EmptyQueryResponse | - backend::Message::CommandComplete(_) => { + backend::Message::EmptyQueryResponse | backend::Message::CommandComplete(_) => { more_rows = false; break; } @@ -580,9 +563,8 @@ impl InnerConnection { self.stream.write_message(|buf| { frontend::copy_fail("COPY queries cannot be directly executed", buf) })?; - self.stream.write_message( - |buf| Ok::<(), io::Error>(frontend::sync(buf)), - )?; + self.stream + .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))?; self.stream.flush()?; } backend::Message::CopyOutResponse(_) => { @@ -595,7 +577,7 @@ impl InnerConnection { io::Error::new( io::ErrorKind::InvalidInput, "COPY queries cannot be directly \ - executed", + executed", ).into(), ); } @@ -654,12 +636,10 @@ impl InnerConnection { } } - self.stream.write_message(|buf| { - frontend::execute(portal_name, row_limit, buf) - })?; - self.stream.write_message( - |buf| Ok::<(), io::Error>(frontend::sync(buf)), - )?; + self.stream + .write_message(|buf| frontend::execute(portal_name, row_limit, buf))?; + self.stream + .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))?; self.stream.flush()?; match self.read_message()? { @@ -705,10 +685,8 @@ impl InnerConnection { param_types: param_types, columns: columns, }); - self.cached_statements.insert( - query.to_owned(), - info.clone(), - ); + self.cached_statements + .insert(query.to_owned(), info.clone()); info } }; @@ -717,12 +695,10 @@ impl InnerConnection { } fn close_statement(&mut self, name: &str, type_: u8) -> Result<()> { - self.stream.write_message( - |buf| frontend::close(type_, name, buf), - )?; - self.stream.write_message( - |buf| Ok::<(), io::Error>(frontend::sync(buf)), - )?; + self.stream + .write_message(|buf| frontend::close(type_, name, buf))?; + self.stream + .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))?; self.stream.flush()?; let resp = match self.read_message()? { backend::Message::CloseComplete => Ok(()), @@ -755,13 +731,13 @@ impl InnerConnection { match self.raw_prepare( TYPEINFO_QUERY, "SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, \ - t.typbasetype, n.nspname, t.typrelid \ - FROM pg_catalog.pg_type t \ - LEFT OUTER JOIN pg_catalog.pg_range r ON \ - r.rngtypid = t.oid \ - INNER JOIN pg_catalog.pg_namespace n ON \ - t.typnamespace = n.oid \ - WHERE t.oid = $1", + t.typbasetype, n.nspname, t.typrelid \ + FROM pg_catalog.pg_type t \ + LEFT OUTER JOIN pg_catalog.pg_range r ON \ + r.rngtypid = t.oid \ + INNER JOIN pg_catalog.pg_namespace n ON \ + t.typnamespace = n.oid \ + WHERE t.oid = $1", ) { Ok(..) => {} // Range types weren't added until Postgres 9.2, so pg_range may not exist @@ -769,11 +745,11 @@ impl InnerConnection { self.raw_prepare( TYPEINFO_QUERY, "SELECT t.typname, t.typtype, t.typelem, NULL::OID, \ - t.typbasetype, n.nspname, t.typrelid \ - FROM pg_catalog.pg_type t \ - INNER JOIN pg_catalog.pg_namespace n \ - ON t.typnamespace = n.oid \ - WHERE t.oid = $1", + t.typbasetype, n.nspname, t.typrelid \ + FROM pg_catalog.pg_type t \ + INNER JOIN pg_catalog.pg_namespace n \ + ON t.typnamespace = n.oid \ + WHERE t.oid = $1", )?; } Err(e) => return Err(e), @@ -793,27 +769,14 @@ impl InnerConnection { let get_raw = |i: usize| row.as_ref().and_then(|r| r.get(i)); let (name, type_, elem_oid, rngsubtype, basetype, schema, relid) = { - let name = String::from_sql_nullable(&NAME, get_raw(0)).map_err( - error::conversion, - )?; - let type_ = i8::from_sql_nullable(&CHAR, get_raw(1)).map_err( - error::conversion, - )?; - let elem_oid = Oid::from_sql_nullable(&OID, get_raw(2)).map_err( - error::conversion, - )?; - let rngsubtype = Option::::from_sql_nullable(&OID, get_raw(3)).map_err( - error::conversion, - )?; - let basetype = Oid::from_sql_nullable(&OID, get_raw(4)).map_err( - error::conversion, - )?; - let schema = String::from_sql_nullable(&NAME, get_raw(5)).map_err( - error::conversion, - )?; - let relid = Oid::from_sql_nullable(&OID, get_raw(6)).map_err( - error::conversion, - )?; + let name = String::from_sql_nullable(&NAME, get_raw(0)).map_err(error::conversion)?; + let type_ = i8::from_sql_nullable(&CHAR, get_raw(1)).map_err(error::conversion)?; + let elem_oid = Oid::from_sql_nullable(&OID, get_raw(2)).map_err(error::conversion)?; + let rngsubtype = + Option::::from_sql_nullable(&OID, get_raw(3)).map_err(error::conversion)?; + let basetype = Oid::from_sql_nullable(&OID, get_raw(4)).map_err(error::conversion)?; + let schema = String::from_sql_nullable(&NAME, get_raw(5)).map_err(error::conversion)?; + let relid = Oid::from_sql_nullable(&OID, get_raw(6)).map_err(error::conversion)?; (name, type_, elem_oid, rngsubtype, basetype, schema, relid) }; @@ -845,9 +808,9 @@ impl InnerConnection { match self.raw_prepare( TYPEINFO_ENUM_QUERY, "SELECT enumlabel \ - FROM pg_catalog.pg_enum \ - WHERE enumtypid = $1 \ - ORDER BY enumsortorder", + FROM pg_catalog.pg_enum \ + WHERE enumtypid = $1 \ + ORDER BY enumsortorder", ) { Ok(..) => {} // Postgres 9.0 doesn't have enumsortorder @@ -855,9 +818,9 @@ impl InnerConnection { self.raw_prepare( TYPEINFO_ENUM_QUERY, "SELECT enumlabel \ - FROM pg_catalog.pg_enum \ - WHERE enumtypid = $1 \ - ORDER BY oid", + FROM pg_catalog.pg_enum \ + WHERE enumtypid = $1 \ + ORDER BY oid", )?; } Err(e) => return Err(e), @@ -869,21 +832,14 @@ impl InnerConnection { fn read_enum_variants(&mut self, oid: Oid) -> Result> { self.setup_typeinfo_enum_query()?; - self.raw_execute( - TYPEINFO_ENUM_QUERY, - "", - 0, - &[OID], - &[&oid], - )?; + self.raw_execute(TYPEINFO_ENUM_QUERY, "", 0, &[OID], &[&oid])?; let mut rows = vec![]; self.read_rows(|row| rows.push(row))?; let mut variants = vec![]; for row in rows { - variants.push(String::from_sql_nullable(&NAME, row.get(0)).map_err( - error::conversion, - )?); + variants.push(String::from_sql_nullable(&NAME, row.get(0)) + .map_err(error::conversion)?); } Ok(variants) @@ -897,11 +853,11 @@ impl InnerConnection { self.raw_prepare( TYPEINFO_COMPOSITE_QUERY, "SELECT attname, atttypid \ - FROM pg_catalog.pg_attribute \ - WHERE attrelid = $1 \ - AND NOT attisdropped \ - AND attnum > 0 \ - ORDER BY attnum", + FROM pg_catalog.pg_attribute \ + WHERE attrelid = $1 \ + AND NOT attisdropped \ + AND attnum > 0 \ + ORDER BY attnum", )?; self.has_typeinfo_composite_query = true; @@ -910,25 +866,15 @@ impl InnerConnection { fn read_composite_fields(&mut self, relid: Oid) -> Result> { self.setup_typeinfo_composite_query()?; - self.raw_execute( - TYPEINFO_COMPOSITE_QUERY, - "", - 0, - &[OID], - &[&relid], - )?; + self.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", 0, &[OID], &[&relid])?; let mut rows = vec![]; self.read_rows(|row| rows.push(row))?; let mut fields = vec![]; for row in rows { let (name, type_) = { - let name = String::from_sql_nullable(&NAME, row.get(0)).map_err( - error::conversion, - )?; - let type_ = Oid::from_sql_nullable(&OID, row.get(1)).map_err( - error::conversion, - )?; + let name = String::from_sql_nullable(&NAME, row.get(0)).map_err(error::conversion)?; + let type_ = Oid::from_sql_nullable(&OID, row.get(1)).map_err(error::conversion)?; (name, type_) }; let type_ = self.get_type(type_)?; @@ -953,7 +899,8 @@ impl InnerConnection { fn quick_query(&mut self, query: &str) -> Result>>> { check_desync!(self); debug!("executing query: {}", query); - self.stream.write_message(|buf| frontend::query(query, buf))?; + self.stream + .write_message(|buf| frontend::query(query, buf))?; self.stream.flush()?; let mut result = vec![]; @@ -972,9 +919,8 @@ impl InnerConnection { self.stream.write_message(|buf| { frontend::copy_fail("COPY queries cannot be directly executed", buf) })?; - self.stream.write_message( - |buf| Ok::<(), io::Error>(frontend::sync(buf)), - )?; + self.stream + .write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))?; self.stream.flush()?; } backend::Message::ErrorResponse(body) => { @@ -989,9 +935,8 @@ impl InnerConnection { fn finish_inner(&mut self) -> Result<()> { check_desync!(self); - self.stream.write_message(|buf| { - Ok::<(), io::Error>(frontend::terminate(buf)) - })?; + self.stream + .write_message(|buf| Ok::<(), io::Error>(frontend::terminate(buf)))?; self.stream.flush()?; Ok(()) }