Fix some warnings

This commit is contained in:
Steven Fackler 2018-01-09 21:15:35 -08:00
parent 863a295aae
commit bb3ebbf943
6 changed files with 137 additions and 156 deletions

View File

@ -4,7 +4,7 @@ use postgres_protocol::types;
use self::bit_vec::BitVec;
use std::error::Error;
use types::{FromSql, ToSql, IsNull, Type, BIT, VARBIT};
use types::{FromSql, IsNull, ToSql, Type, BIT, VARBIT};
impl FromSql for BitVec {
fn from_sql(_: &Type, raw: &[u8]) -> Result<BitVec, Box<Error + Sync + Send>> {
@ -21,7 +21,7 @@ impl FromSql for BitVec {
}
impl ToSql for BitVec {
fn to_sql(&self, _: &Type, mut out: &mut Vec<u8>) -> Result<IsNull, Box<Error + Sync + Send>> {
fn to_sql(&self, _: &Type, out: &mut Vec<u8>) -> Result<IsNull, Box<Error + Sync + Send>> {
types::varbit_to_sql(self.len(), self.to_bytes().into_iter(), out)?;
Ok(IsNull::No)
}

View File

@ -1,11 +1,11 @@
extern crate chrono;
use postgres_protocol::types;
use self::chrono::{Duration, NaiveDate, NaiveTime, NaiveDateTime, DateTime, Utc, Local,
FixedOffset};
use self::chrono::{DateTime, Duration, FixedOffset, Local, NaiveDate, NaiveDateTime, NaiveTime,
Utc};
use std::error::Error;
use types::{FromSql, ToSql, IsNull, Type, TIMESTAMP, TIMESTAMPTZ, DATE, TIME};
use types::{FromSql, IsNull, ToSql, Type, DATE, TIME, TIMESTAMP, TIMESTAMPTZ};
fn base() -> NaiveDateTime {
NaiveDate::from_ymd(2000, 1, 1).and_hms(0, 0, 0)
@ -62,11 +62,7 @@ impl FromSql for DateTime<Local> {
}
impl ToSql for DateTime<Local> {
fn to_sql(
&self,
type_: &Type,
mut w: &mut Vec<u8>,
) -> Result<IsNull, Box<Error + Sync + Send>> {
fn to_sql(&self, type_: &Type, w: &mut Vec<u8>) -> Result<IsNull, Box<Error + Sync + Send>> {
self.with_timezone(&Utc).to_sql(type_, w)
}

View File

@ -4,7 +4,7 @@ use self::rustc_serialize::json;
use std::io::{Read, Write};
use std::error::Error;
use types::{FromSql, ToSql, IsNull, Type, JSON, JSONB};
use types::{FromSql, IsNull, ToSql, Type, JSON, JSONB};
impl FromSql for json::Json {
fn from_sql(ty: &Type, mut raw: &[u8]) -> Result<json::Json, Box<Error + Sync + Send>> {
@ -23,7 +23,7 @@ impl FromSql for json::Json {
}
impl ToSql for json::Json {
fn to_sql(&self, ty: &Type, mut out: &mut Vec<u8>) -> Result<IsNull, Box<Error + Sync + Send>> {
fn to_sql(&self, ty: &Type, out: &mut Vec<u8>) -> Result<IsNull, Box<Error + Sync + Send>> {
if *ty == JSONB {
out.push(1);
}

View File

@ -49,7 +49,7 @@ tokio-dns-unofficial = "0.1"
tokio-io = "0.1"
tokio-openssl = { version = "0.1", optional = true }
openssl = { version = "0.9", optional = true }
openssl = { version = "0.9.23", optional = true }
[target.'cfg(unix)'.dependencies]
tokio-uds = "0.1"

View File

@ -50,15 +50,14 @@
//! l.run(done).unwrap();
//! }
//! ```
#![doc(html_root_url="https://docs.rs/tokio-postgres/0.3")]
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.3")]
#![warn(missing_docs)]
extern crate bytes;
extern crate fallible_iterator;
extern crate futures_state_stream;
#[cfg_attr(test, macro_use)]
extern crate postgres_shared;
extern crate postgres_protocol;
extern crate postgres_shared;
extern crate tokio_core;
extern crate tokio_dns;
extern crate tokio_io;
@ -70,19 +69,19 @@ extern crate futures;
extern crate tokio_uds;
use fallible_iterator::FallibleIterator;
use futures::{Future, IntoFuture, Stream, Sink, Poll, StartSend, Async};
use futures::{Async, Future, IntoFuture, Poll, Sink, StartSend, Stream};
use futures::future::Either;
use futures_state_stream::{StreamEvent, StateStream, FutureExt};
use futures_state_stream::{FutureExt, StateStream, StreamEvent};
use postgres_protocol::authentication;
use postgres_protocol::message::{backend, frontend};
use postgres_protocol::message::backend::{ErrorResponseBody, ErrorFields};
use postgres_protocol::message::backend::{ErrorFields, ErrorResponseBody};
use postgres_shared::rows::RowData;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::sync::mpsc::{self, Sender, Receiver};
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::mpsc::{self, Receiver, Sender};
use tokio_core::reactor::Handle;
#[doc(inline)]
@ -90,14 +89,14 @@ pub use postgres_shared::{error, params, types, CancelData, Notification};
#[doc(inline)]
pub use error::Error;
use error::{DbError, UNDEFINED_TABLE, UNDEFINED_COLUMN};
use error::{DbError, UNDEFINED_COLUMN, UNDEFINED_TABLE};
use params::{ConnectParams, IntoConnectParams};
use sink::SinkExt;
use stmt::{Statement, Column};
use stmt::{Column, Statement};
use stream::PostgresStream;
use tls::Handshake;
use transaction::Transaction;
use types::{Oid, Type, ToSql, IsNull, FromSql, Kind, Field, NAME, CHAR, OID};
use types::{Field, FromSql, IsNull, Kind, Oid, ToSql, Type, CHAR, NAME, OID};
use rows::Row;
#[macro_use]
@ -149,14 +148,12 @@ where
T: IntoConnectParams,
{
let params = match params.into_connect_params() {
Ok(params) => {
Either::A(stream::connect(
params.host().clone(),
params.port(),
tls_mode,
handle,
))
}
Ok(params) => Either::A(stream::connect(
params.host().clone(),
params.port(),
tls_mode,
handle,
)),
Err(e) => Either::B(Err(error::connect(e)).into_future()),
};
@ -188,10 +185,7 @@ impl InnerConnection {
fn read(
self,
) -> Box<
Future<
Item = (backend::Message, InnerConnection),
Error = (io::Error, InnerConnection),
>
Future<Item = (backend::Message, InnerConnection), Error = (io::Error, InnerConnection)>
+ Send,
> {
if self.desynchronized {
@ -307,12 +301,10 @@ impl Connection {
T: IntoConnectParams,
{
let fut = match params.into_connect_params() {
Ok(params) => {
Either::A(
stream::connect(params.host().clone(), params.port(), tls_mode, handle)
.map(|s| (s, params)),
)
}
Ok(params) => Either::A(
stream::connect(params.host().clone(), params.port(), tls_mode, handle)
.map(|s| (s, params)),
),
Err(e) => Either::B(Err(error::connect(e)).into_future()),
};
@ -384,35 +376,30 @@ impl Connection {
.map(|()| Some(buf))
.map_err(Into::into)
}
None => {
Err(error::connect(
"password was required but not provided".into(),
))
}
None => Err(error::connect(
"password was required but not provided".into(),
)),
}
}
backend::Message::AuthenticationMd5Password(body) => {
match params.user().and_then(
|u| u.password().map(|p| (u.name(), p)),
) {
Some((user, pass)) => {
let pass = authentication::md5_hash(
user.as_bytes(),
pass.as_bytes(),
body.salt(),
);
let mut buf = vec![];
frontend::password_message(&pass, &mut buf)
.map(|()| Some(buf))
.map_err(Into::into)
}
None => {
Err(error::connect(
"password was required but not provided".into(),
))
}
backend::Message::AuthenticationMd5Password(body) => match params
.user()
.and_then(|u| u.password().map(|p| (u.name(), p)))
{
Some((user, pass)) => {
let pass = authentication::md5_hash(
user.as_bytes(),
pass.as_bytes(),
body.salt(),
);
let mut buf = vec![];
frontend::password_message(&pass, &mut buf)
.map(|()| Some(buf))
.map_err(Into::into)
}
}
None => Err(error::connect(
"password was required but not provided".into(),
)),
},
backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())),
_ => Err(bad_message()),
};
@ -489,18 +476,16 @@ impl Connection {
backend::Message::ReadyForQuery(_) => {
Ok((rows, Connection(s))).into_future().boxed2()
}
backend::Message::DataRow(body) => {
match RowData::new(body) {
Ok(row) => {
rows.push(row);
Connection(s).simple_read_rows(rows)
}
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(),
backend::Message::DataRow(body) => match RowData::new(body) {
Ok(row) => {
rows.push(row);
Connection(s).simple_read_rows(rows)
}
}
backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) |
backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
Err(e) => Err((error::io(e), Connection(s))).into_future().boxed2(),
},
backend::Message::EmptyQueryResponse
| backend::Message::CommandComplete(_)
| backend::Message::RowDescription(_) => Connection(s).simple_read_rows(rows),
backend::Message::ErrorResponse(body) => Connection(s).ready_err(body),
_ => Err((bad_message(), Connection(s))).into_future().boxed2(),
})
@ -599,18 +584,22 @@ impl Connection {
self,
name: &str,
query: &str,
) -> Box<Future<Item = (Vec<Type>, Vec<Column>, Connection), Error = (Error, Connection)> + Send> {
) -> Box<Future<Item = (Vec<Type>, Vec<Column>, Connection), Error = (Error, Connection)> + Send>
{
let mut parse = vec![];
let mut describe = vec![];
let mut sync = vec![];
frontend::sync(&mut sync);
if let Err(e) = frontend::parse(name, query, None, &mut parse).and_then(|()| frontend::describe(b'S', name, &mut describe)) {
if let Err(e) = frontend::parse(name, query, None, &mut parse)
.and_then(|()| frontend::describe(b'S', name, &mut describe))
{
return Err((error::io(e), self)).into_future().boxed2();
}
let it = Some(parse).into_iter().chain(Some(describe)).chain(
Some(sync),
);
let it = Some(parse)
.into_iter()
.chain(Some(describe))
.chain(Some(sync));
self.0.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
.map_err(|(e, s, _)| (e, s))
.and_then(|s| s.0.read())
@ -809,25 +798,25 @@ impl Connection {
self.raw_prepare(
TYPEINFO_QUERY,
"SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, \
t.typbasetype, n.nspname, t.typrelid \
FROM pg_catalog.pg_type t \
LEFT OUTER JOIN pg_catalog.pg_range r ON \
r.rngtypid = t.oid \
INNER JOIN pg_catalog.pg_namespace n ON \
t.typnamespace = n.oid \
WHERE t.oid = $1",
t.typbasetype, n.nspname, t.typrelid \
FROM pg_catalog.pg_type t \
LEFT OUTER JOIN pg_catalog.pg_range r ON \
r.rngtypid = t.oid \
INNER JOIN pg_catalog.pg_namespace n ON \
t.typnamespace = n.oid \
WHERE t.oid = $1",
).or_else(|(e, c)| {
// Range types weren't added until Postgres 9.2, so pg_range may not exist
if e.code() == Some(&UNDEFINED_TABLE) {
Either::A(c.raw_prepare(
TYPEINFO_QUERY,
"SELECT t.typname, t.typtype, t.typelem, \
NULL::OID, t.typbasetype, n.nspname, \
t.typrelid \
FROM pg_catalog.pg_type t \
INNER JOIN pg_catalog.pg_namespace n \
ON t.typnamespace = n.oid \
WHERE t.oid = $1",
NULL::OID, t.typbasetype, n.nspname, \
t.typrelid \
FROM pg_catalog.pg_type t \
INNER JOIN pg_catalog.pg_namespace n \
ON t.typnamespace = n.oid \
WHERE t.oid = $1",
))
} else {
Either::B(Err((e, c)).into_future())
@ -873,17 +862,19 @@ impl Connection {
self.raw_prepare(
TYPEINFO_ENUM_QUERY,
"SELECT enumlabel \
FROM pg_catalog.pg_enum \
WHERE enumtypid = $1 \
ORDER BY enumsortorder",
).or_else(|(e, c)| if e.code() == Some(&UNDEFINED_COLUMN) {
Either::A(c.raw_prepare(
TYPEINFO_ENUM_QUERY,
"SELECT enumlabel FROM pg_catalog.pg_enum WHERE \
enumtypid = $1 ORDER BY oid",
))
} else {
Either::B(Err((e, c)).into_future())
FROM pg_catalog.pg_enum \
WHERE enumtypid = $1 \
ORDER BY enumsortorder",
).or_else(|(e, c)| {
if e.code() == Some(&UNDEFINED_COLUMN) {
Either::A(c.raw_prepare(
TYPEINFO_ENUM_QUERY,
"SELECT enumlabel FROM pg_catalog.pg_enum WHERE \
enumtypid = $1 ORDER BY oid",
))
} else {
Either::B(Err((e, c)).into_future())
}
})
.map(|(_, _, mut c)| {
c.0.has_typeinfo_enum_query = true;
@ -930,11 +921,11 @@ impl Connection {
self.raw_prepare(
TYPEINFO_COMPOSITE_QUERY,
"SELECT attname, atttypid \
FROM pg_catalog.pg_attribute \
WHERE attrelid = $1 \
AND NOT attisdropped \
AND attnum > 0 \
ORDER BY attnum",
FROM pg_catalog.pg_attribute \
WHERE attrelid = $1 \
AND NOT attisdropped \
AND attnum > 0 \
ORDER BY attnum",
).map(|(_, _, mut c)| {
c.0.has_typeinfo_composite_query = true;
c
@ -984,9 +975,10 @@ impl Connection {
Err(e) => Err((error::io(e), s)),
}).into_future()
.and_then(|s| {
let it = Some(bind).into_iter().chain(Some(execute)).chain(
Some(sync),
);
let it = Some(bind)
.into_iter()
.chain(Some(execute))
.chain(Some(sync));
s.0
.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
@ -1057,8 +1049,9 @@ impl Connection {
};
Either::A(r.into_future())
}
backend::Message::EmptyQueryResponse |
backend::Message::CommandComplete(_) => Either::A(Ok((None, c)).into_future()),
backend::Message::EmptyQueryResponse | backend::Message::CommandComplete(_) => {
Either::A(Ok((None, c)).into_future())
}
backend::Message::ErrorResponse(body) => Either::B(c.ready_err(body)),
_ => Either::A(Err((bad_message(), c)).into_future()),
}
@ -1213,9 +1206,7 @@ trait BoxedStateStream<I, S, E> {
impl<T, I, S, E> BoxedStateStream<I, S, E> for T
where
T: StateStream<Item = I, State = S, Error = E>
+ Send
+ 'static,
T: StateStream<Item = I, State = S, Error = E> + Send + 'static,
{
fn boxed2(self) -> Box<StateStream<Item = I, State = S, Error = E> + Send> {
Box::new(self)

View File

@ -6,9 +6,9 @@ use std::time::Duration;
use tokio_core::reactor::{Core, Interval};
use super::*;
use error::{INVALID_PASSWORD, INVALID_AUTHORIZATION_SPECIFICATION, QUERY_CANCELED};
use error::{INVALID_AUTHORIZATION_SPECIFICATION, INVALID_PASSWORD, QUERY_CANCELED};
use params::{ConnectParams, Host};
use types::{ToSql, FromSql, Type, IsNull, Kind, BYTEA, TEXT, INT4, NUMERIC};
use types::{FromSql, INT4, IsNull, Kind, ToSql, Type, BYTEA, NUMERIC, TEXT};
#[test]
fn md5_user() {
@ -106,9 +106,8 @@ fn batch_execute_ok() {
TlsMode::None,
&l.handle(),
).then(|c| {
c.unwrap().batch_execute(
"CREATE TEMPORARY TABLE foo (id SERIAL);",
)
c.unwrap()
.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL);")
});
l.run(done).unwrap();
}
@ -123,7 +122,7 @@ fn batch_execute_err() {
).then(|r| {
r.unwrap().batch_execute(
"CREATE TEMPORARY TABLE foo (id SERIAL); INSERT INTO foo DEFAULT \
VALUES;",
VALUES;",
)
})
.and_then(|c| c.batch_execute("SELECT * FROM bogo"))
@ -145,9 +144,8 @@ fn prepare_execute() {
TlsMode::None,
&l.handle(),
).then(|c| {
c.unwrap().prepare(
"CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)",
)
c.unwrap()
.prepare("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)")
})
.and_then(|(s, c)| c.execute(&s, &[]))
.and_then(|(n, c)| {
@ -206,22 +204,19 @@ fn transaction() {
TlsMode::None,
&l.handle(),
).then(|c| {
c.unwrap().batch_execute(
"CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);",
)
c.unwrap()
.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);")
})
.then(|c| c.unwrap().transaction())
.then(|t| {
t.unwrap().batch_execute(
"INSERT INTO foo (name) VALUES ('joe');",
)
t.unwrap()
.batch_execute("INSERT INTO foo (name) VALUES ('joe');")
})
.then(|t| t.unwrap().rollback())
.then(|c| c.unwrap().transaction())
.then(|t| {
t.unwrap().batch_execute(
"INSERT INTO foo (name) VALUES ('bob');",
)
t.unwrap()
.batch_execute("INSERT INTO foo (name) VALUES ('bob');")
})
.then(|t| t.unwrap().commit())
.then(|c| c.unwrap().prepare("SELECT name FROM foo"))
@ -243,11 +238,9 @@ fn unix_socket() {
.and_then(|(s, c)| c.query(&s, &[]).collect())
.then(|r| {
let r = r.unwrap().0;
let params = ConnectParams::builder().user("postgres", None).build(
Host::Unix(
PathBuf::from(r[0].get::<String, _>(0)),
),
);
let params = ConnectParams::builder()
.user("postgres", None)
.build(Host::Unix(PathBuf::from(r[0].get::<String, _>(0))));
Connection::connect(params, TlsMode::None, &handle)
})
.then(|c| c.unwrap().batch_execute(""));
@ -274,14 +267,11 @@ fn ssl_user_ssl_required() {
#[cfg(feature = "with-openssl")]
#[test]
fn openssl_required() {
use tls::openssl::openssl::ssl::{SslMethod, SslConnectorBuilder};
use tls::openssl::openssl::ssl::{SslConnectorBuilder, SslMethod};
use tls::openssl::OpenSsl;
let mut builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
builder
.builder_mut()
.set_ca_file("../test/server.crt")
.unwrap();
builder.set_ca_file("../test/server.crt").unwrap();
let negotiator = OpenSsl::from(builder.build());
let mut l = Core::new().unwrap();
@ -401,9 +391,8 @@ fn enum_() {
let done = Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle)
.then(|c| {
c.unwrap().batch_execute(
"CREATE TYPE pg_temp.mood AS ENUM ('sad', 'ok', 'happy');",
)
c.unwrap()
.batch_execute("CREATE TYPE pg_temp.mood AS ENUM ('sad', 'ok', 'happy');")
})
.and_then(|c| c.prepare("SELECT $1::mood"))
.map(|(s, _)| {
@ -466,14 +455,19 @@ fn notifications() {
let done = Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle)
.then(|c| c.unwrap().batch_execute("LISTEN test_notifications"))
.and_then(|c1| {
Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle)
.then(|c2| {
Connection::connect("postgres://postgres@localhost:5433", TlsMode::None, &handle).then(
|c2| {
c2.unwrap()
.batch_execute("NOTIFY test_notifications, 'foo'")
.map(|_| c1)
})
},
)
})
.and_then(|c| {
c.notifications()
.into_future()
.map_err(|(e, n)| (e, n.into_inner()))
})
.and_then(|c| c.notifications().into_future().map_err(|(e, n)| (e, n.into_inner())))
.map(|(n, _)| {
let n = n.unwrap();
assert_eq!(n.channel, "test_notifications");