Port to new IO

This commit is contained in:
Steven Fackler 2015-02-26 09:02:32 -08:00
parent df4d5d8417
commit dac4c4f4f1
15 changed files with 391 additions and 250 deletions

View File

@ -25,15 +25,19 @@ phf_codegen = "0.6.11"
[dependencies] [dependencies]
phf = "0.6" phf = "0.6"
openssl = "0.4.0" openssl = "0.5"
time = "0.1.14" time = "0.1.14"
log = "0.2" log = "0.2"
rustc-serialize = "0.2" rustc-serialize = "0.3"
byteorder = "0.2" byteorder = "0.2.11"
[dependencies.uuid] [dependencies.uuid]
optional = true optional = true
version = "0.1" version = "0.1"
[dependencies.unix_socket]
optional = true
version = "0.1"
[dev-dependencies] [dev-dependencies]
url = "0.2" url = "0.2"

View File

@ -1,11 +1,16 @@
pub use ugh_privacy::DbError; pub use ugh_privacy::DbError;
use std::error; use std::option::Option::{self, Some, None};
use std::old_io::IoError; use std::result::Result::Ok;
use std::fmt; use std::clone::Clone;
use std::string::String;
use byteorder;
use openssl::ssl::error::SslError; use openssl::ssl::error::SslError;
use phf; use phf;
use std::error;
use std::fmt;
use std::io;
use Result; use Result;
use types::Type; use types::Type;
@ -31,7 +36,7 @@ pub enum ConnectError {
/// There was an error initializing the SSL session /// There was an error initializing the SSL session
SslError(SslError), SslError(SslError),
/// There was an error communicating with the server /// There was an error communicating with the server
IoError(IoError), IoError(io::Error),
/// The server sent an unexpected response /// The server sent an unexpected response
BadResponse, BadResponse,
} }
@ -73,8 +78,8 @@ impl error::Error for ConnectError {
} }
} }
impl error::FromError<IoError> for ConnectError { impl error::FromError<io::Error> for ConnectError {
fn from_error(err: IoError) -> ConnectError { fn from_error(err: io::Error) -> ConnectError {
ConnectError::IoError(err) ConnectError::IoError(err)
} }
} }
@ -91,6 +96,12 @@ impl error::FromError<SslError> for ConnectError {
} }
} }
impl error::FromError<byteorder::Error> for ConnectError {
fn from_error(err: byteorder::Error) -> ConnectError {
ConnectError::IoError(error::FromError::from_error(err))
}
}
/// Represents the position of an error in a query /// Represents the position of an error in a query
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
pub enum ErrorPosition { pub enum ErrorPosition {
@ -111,7 +122,7 @@ pub enum Error {
/// An error reported by the Postgres server /// An error reported by the Postgres server
DbError(DbError), DbError(DbError),
/// An error communicating with the Postgres server /// An error communicating with the Postgres server
IoError(IoError), IoError(io::Error),
/// The communication channel with the Postgres server has desynchronized /// The communication channel with the Postgres server has desynchronized
/// due to an earlier communications error. /// due to an earlier communications error.
StreamDesynchronized, StreamDesynchronized,
@ -166,8 +177,14 @@ impl error::FromError<DbError> for Error {
} }
} }
impl error::FromError<IoError> for Error { impl error::FromError<io::Error> for Error {
fn from_error(err: IoError) -> Error { fn from_error(err: io::Error) -> Error {
Error::IoError(err) Error::IoError(err)
} }
} }
impl error::FromError<byteorder::Error> for Error {
fn from_error(err: byteorder::Error) -> Error {
Error::IoError(error::FromError::from_error(err))
}
}

View File

@ -1,82 +1,67 @@
use std::option::Option::None;
use std::result::Result::{self, Ok, Err};
#[cfg(feature = "unix_socket")]
use std::clone::Clone;
use openssl::ssl::{SslStream, MaybeSslStream}; use openssl::ssl::{SslStream, MaybeSslStream};
use std::old_io::BufferedStream; use std::io;
use std::old_io::net::ip::Port; use std::io::prelude::*;
use std::old_io::net::tcp::TcpStream; use std::net::TcpStream;
use std::old_io::net::pipe::UnixStream; #[cfg(feature = "unix_socket")]
use std::old_io::{IoResult, Stream}; use unix_socket::UnixStream;
use byteorder::ReadBytesExt;
use {ConnectParams, SslMode, ConnectTarget, ConnectError}; use {ConnectParams, SslMode, ConnectTarget, ConnectError};
use message; use message;
use message::WriteMessage; use message::WriteMessage;
use message::FrontendMessage::SslRequest; use message::FrontendMessage::SslRequest;
const DEFAULT_PORT: Port = 5432; const DEFAULT_PORT: u16 = 5432;
#[doc(hidden)]
pub trait Timeout {
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
}
impl<S: Stream+Timeout> Timeout for MaybeSslStream<S> {
fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.get_mut().set_read_timeout(timeout_ms);
}
}
impl<S: Stream+Timeout> Timeout for BufferedStream<S> {
fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.get_mut().set_read_timeout(timeout_ms);
}
}
pub enum InternalStream { pub enum InternalStream {
Tcp(TcpStream), Tcp(TcpStream),
#[cfg(feature = "unix_socket")]
Unix(UnixStream), Unix(UnixStream),
} }
impl Reader for InternalStream { impl Read for InternalStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self { match *self {
InternalStream::Tcp(ref mut s) => s.read(buf), InternalStream::Tcp(ref mut s) => s.read(buf),
#[cfg(feature = "unix_socket")]
InternalStream::Unix(ref mut s) => s.read(buf), InternalStream::Unix(ref mut s) => s.read(buf),
} }
} }
} }
impl Writer for InternalStream { impl Write for InternalStream {
fn write_all(&mut self, buf: &[u8]) -> IoResult<()> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self { match *self {
InternalStream::Tcp(ref mut s) => s.write_all(buf), InternalStream::Tcp(ref mut s) => s.write(buf),
InternalStream::Unix(ref mut s) => s.write_all(buf), #[cfg(feature = "unix_socket")]
InternalStream::Unix(ref mut s) => s.write(buf),
} }
} }
fn flush(&mut self) -> IoResult<()> { fn flush(&mut self) -> io::Result<()> {
match *self { match *self {
InternalStream::Tcp(ref mut s) => s.flush(), InternalStream::Tcp(ref mut s) => s.flush(),
#[cfg(feature = "unix_socket")]
InternalStream::Unix(ref mut s) => s.flush(), InternalStream::Unix(ref mut s) => s.flush(),
} }
} }
} }
impl Timeout for InternalStream {
fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
match *self {
InternalStream::Tcp(ref mut s) => s.set_read_timeout(timeout_ms),
InternalStream::Unix(ref mut s) => s.set_read_timeout(timeout_ms),
}
}
}
fn open_socket(params: &ConnectParams) -> Result<InternalStream, ConnectError> { fn open_socket(params: &ConnectParams) -> Result<InternalStream, ConnectError> {
let port = params.port.unwrap_or(DEFAULT_PORT); let port = params.port.unwrap_or(DEFAULT_PORT);
match params.target { match params.target {
ConnectTarget::Tcp(ref host) => { ConnectTarget::Tcp(ref host) => {
Ok(try!(TcpStream::connect((&**host, port)).map(InternalStream::Tcp))) Ok(try!(TcpStream::connect(&(&**host, port)).map(InternalStream::Tcp)))
} }
#[cfg(feature = "unix_socket")]
ConnectTarget::Unix(ref path) => { ConnectTarget::Unix(ref path) => {
let mut path = path.clone(); let mut path = path.clone();
path.push(format!(".s.PGSQL.{}", port)); path.push(&format!(".s.PGSQL.{}", port));
Ok(try!(UnixStream::connect(&path).map(InternalStream::Unix))) Ok(try!(UnixStream::connect(&path).map(InternalStream::Unix)))
} }
} }

View File

@ -43,8 +43,10 @@
//! } //! }
//! ``` //! ```
#![doc(html_root_url="https://sfackler.github.io/rust-postgres/doc")] #![doc(html_root_url="https://sfackler.github.io/rust-postgres/doc")]
#![feature(unsafe_destructor, collections, old_io, io, core, old_path, std_misc)] #![feature(unsafe_destructor, collections, io, core, net)]
#![cfg_attr(feature = "unix_socket", feature(path))]
#![warn(missing_docs)] #![warn(missing_docs)]
#![no_implicit_prelude]
extern crate byteorder; extern crate byteorder;
#[macro_use] #[macro_use]
@ -53,43 +55,58 @@ extern crate openssl;
extern crate phf; extern crate phf;
extern crate "rustc-serialize" as serialize; extern crate "rustc-serialize" as serialize;
extern crate time; extern crate time;
#[cfg(feature = "unix_socket")]
extern crate unix_socket;
use std::option::Option::{self, Some, None};
use std::result::Result::{Ok, Err};
use std::boxed::Box;
use std::vec::Vec;
use std::clone::Clone;
use std::string::{String, ToString};
use std::ops::Drop;
use std::iter::{Iterator, DoubleEndedIterator, ExactSizeIterator, IteratorExt};
use std::prelude::v1::drop;
use std::marker::Send;
use std::slice::SliceExt;
use std::str::StrExt;
use openssl::crypto::hash::{self, Hasher}; use openssl::crypto::hash::{self, Hasher};
use openssl::ssl::{SslContext, MaybeSslStream}; use openssl::ssl::{SslContext, MaybeSslStream};
use serialize::hex::ToHex; use serialize::hex::ToHex;
use std::borrow::{ToOwned, Cow}; use std::borrow::{ToOwned, Cow};
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::cmp::max;
use std::collections::{VecDeque, HashMap}; use std::collections::{VecDeque, HashMap};
use std::fmt; use std::fmt;
use std::iter::{IntoIterator, RandomAccessIterator}; use std::iter::{IntoIterator, RandomAccessIterator};
use std::old_io::{BufferedStream, IoResult, IoError, IoErrorKind}; use std::io::{self, BufStream};
use std::old_io::net::ip::Port; use std::io::prelude::*;
use std::mem; use std::mem;
use std::slice; use std::slice;
use std::result; use std::result;
use std::time::Duration;
use std::vec; use std::vec;
use time::SteadyTime; use byteorder::{WriteBytesExt, BigEndian};
#[cfg(feature = "unix_socket")]
use std::path::PathBuf;
use url::Url;
pub use error::{Error, ConnectError, SqlState, DbError, ErrorPosition}; pub use error::{Error, ConnectError, SqlState, DbError, ErrorPosition};
#[doc(inline)] #[doc(inline)]
pub use types::{Oid, Type, Kind, ToSql, FromSql}; pub use types::{Oid, Type, Kind, ToSql, FromSql};
use types::IsNull; use types::IsNull;
#[doc(inline)] #[doc(inline)]
pub use types::Slice; pub use types::Slice;
use io::{InternalStream, Timeout}; use io_util::InternalStream;
use message::BackendMessage::*; use message::BackendMessage::*;
use message::FrontendMessage::*; use message::FrontendMessage::*;
use message::{FrontendMessage, BackendMessage, RowDescriptionEntry}; use message::{FrontendMessage, BackendMessage, RowDescriptionEntry};
use message::{WriteMessage, ReadMessage}; use message::{WriteMessage, ReadMessage};
use url::Url;
#[macro_use] #[macro_use]
mod macros; mod macros;
mod error; mod error;
mod io; mod io_util;
mod message; mod message;
mod ugh_privacy; mod ugh_privacy;
mod url; mod url;
@ -107,7 +124,10 @@ pub enum ConnectTarget {
/// Connect via TCP to the specified host. /// Connect via TCP to the specified host.
Tcp(String), Tcp(String),
/// Connect via a Unix domain socket in the specified directory. /// Connect via a Unix domain socket in the specified directory.
Unix(Path) ///
/// Only available on Unix platforms with the `unix_socket` feature.
#[cfg(feature = "unix_socket")]
Unix(PathBuf)
} }
/// Authentication information /// Authentication information
@ -127,7 +147,7 @@ pub struct ConnectParams {
/// The target port. /// The target port.
/// ///
/// Defaults to 5432 if not specified. /// Defaults to 5432 if not specified.
pub port: Option<Port>, pub port: Option<u16>,
/// The user to login as. /// The user to login as.
/// ///
/// `Connection::connect` requires a user but `cancel_query` does not. /// `Connection::connect` requires a user but `cancel_query` does not.
@ -169,9 +189,19 @@ impl IntoConnectParams for Url {
.. ..
} = self; } = self;
#[cfg(feature = "unix_socket")]
fn make_unix(maybe_path: String) -> result::Result<ConnectTarget, ConnectError> {
Ok(ConnectTarget::Unix(PathBuf::new(&maybe_path)))
}
#[cfg(not(feature = "unix_socket"))]
fn make_unix(_: String) -> result::Result<ConnectTarget, ConnectError> {
Err(ConnectError::InvalidUrl("unix socket support requires the `unix_socket` feature"
.to_string()))
}
let maybe_path = try!(url::decode_component(&host).map_err(ConnectError::InvalidUrl)); let maybe_path = try!(url::decode_component(&host).map_err(ConnectError::InvalidUrl));
let target = if maybe_path.starts_with("/") { let target = if maybe_path.starts_with("/") {
ConnectTarget::Unix(Path::new(maybe_path)) try!(make_unix(maybe_path))
} else { } else {
ConnectTarget::Tcp(host) ConnectTarget::Tcp(host)
}; };
@ -270,6 +300,7 @@ impl<'conn> Notifications<'conn> {
} }
} }
/*
/// Returns the oldest pending notification /// Returns the oldest pending notification
/// ///
/// If no notifications are pending, blocks for up to `timeout` time, after /// If no notifications are pending, blocks for up to `timeout` time, after
@ -323,6 +354,7 @@ impl<'conn> Notifications<'conn> {
} }
} }
} }
*/
} }
/// Contains information necessary to cancel queries for a session /// Contains information necessary to cancel queries for a session
@ -364,7 +396,7 @@ pub struct CancelData {
pub fn cancel_query<T>(params: T, ssl: &SslMode, data: CancelData) pub fn cancel_query<T>(params: T, ssl: &SslMode, data: CancelData)
-> result::Result<(), ConnectError> where T: IntoConnectParams { -> result::Result<(), ConnectError> where T: IntoConnectParams {
let params = try!(params.into_connect_params()); let params = try!(params.into_connect_params());
let mut socket = try!(io::initialize_stream(&params, ssl)); let mut socket = try!(io_util::initialize_stream(&params, ssl));
try!(socket.write_message(&CancelRequest { try!(socket.write_message(&CancelRequest {
code: message::CANCEL_CODE, code: message::CANCEL_CODE,
@ -384,7 +416,7 @@ struct CachedStatement {
} }
struct InnerConnection { struct InnerConnection {
stream: BufferedStream<MaybeSslStream<InternalStream>>, stream: BufStream<MaybeSslStream<InternalStream>>,
notice_handler: Box<HandleNotice>, notice_handler: Box<HandleNotice>,
notifications: VecDeque<Notification>, notifications: VecDeque<Notification>,
cancel_data: CancelData, cancel_data: CancelData,
@ -409,14 +441,14 @@ impl InnerConnection {
fn connect<T>(params: T, ssl: &SslMode) -> result::Result<InnerConnection, ConnectError> fn connect<T>(params: T, ssl: &SslMode) -> result::Result<InnerConnection, ConnectError>
where T: IntoConnectParams { where T: IntoConnectParams {
let params = try!(params.into_connect_params()); let params = try!(params.into_connect_params());
let stream = try!(io::initialize_stream(&params, ssl)); let stream = try!(io_util::initialize_stream(&params, ssl));
let ConnectParams { user, database, mut options, .. } = params; let ConnectParams { user, database, mut options, .. } = params;
let user = try!(user.ok_or(ConnectError::MissingUser)); let user = try!(user.ok_or(ConnectError::MissingUser));
let mut conn = InnerConnection { let mut conn = InnerConnection {
stream: BufferedStream::new(stream), stream: BufStream::new(stream),
next_stmt_id: 0, next_stmt_id: 0,
notice_handler: Box::new(LoggingNoticeHandler), notice_handler: Box::new(LoggingNoticeHandler),
notifications: VecDeque::new(), notifications: VecDeque::new(),
@ -488,7 +520,7 @@ impl InnerConnection {
} }
} }
fn write_messages(&mut self, messages: &[FrontendMessage]) -> IoResult<()> { fn write_messages(&mut self, messages: &[FrontendMessage]) -> io::Result<()> {
debug_assert!(!self.desynchronized); debug_assert!(!self.desynchronized);
for message in messages { for message in messages {
try_desync!(self, self.stream.write_message(message)); try_desync!(self, self.stream.write_message(message));
@ -496,7 +528,7 @@ impl InnerConnection {
Ok(try_desync!(self, self.stream.flush())) Ok(try_desync!(self, self.stream.flush()))
} }
fn read_one_message(&mut self) -> IoResult<Option<BackendMessage>> { fn read_one_message(&mut self) -> io::Result<Option<BackendMessage>> {
debug_assert!(!self.desynchronized); debug_assert!(!self.desynchronized);
match try_desync!(self, self.stream.read_message()) { match try_desync!(self, self.stream.read_message()) {
NoticeResponse { fields } => { NoticeResponse { fields } => {
@ -513,7 +545,7 @@ impl InnerConnection {
} }
} }
fn read_message_with_notification(&mut self) -> IoResult<BackendMessage> { fn read_message_with_notification(&mut self) -> io::Result<BackendMessage> {
loop { loop {
if let Some(msg) = try!(self.read_one_message()) { if let Some(msg) = try!(self.read_one_message()) {
return Ok(msg); return Ok(msg);
@ -521,7 +553,7 @@ impl InnerConnection {
} }
} }
fn read_message(&mut self) -> IoResult<BackendMessage> { fn read_message(&mut self) -> io::Result<BackendMessage> {
loop { loop {
match try!(self.read_message_with_notification()) { match try!(self.read_message_with_notification()) {
NotificationResponse { pid, channel, payload } => { NotificationResponse { pid, channel, payload } => {
@ -874,12 +906,13 @@ impl Connection {
/// (5432) is used if none is specified. The database name defaults to the /// (5432) is used if none is specified. The database name defaults to the
/// username if not specified. /// username if not specified.
/// ///
/// To connect to the server via Unix sockets, `host` should be set to the /// Connection via Unix sockets is supported with the `unix_sockets`
/// absolute path of the directory containing the socket file. Since `/` is /// feature. To connect to the server via Unix sockets, `host` should be
/// a reserved character in URLs, the path should be URL encoded. If the /// set to the absolute path of the directory containing the socket file.
/// path contains non-UTF 8 characters, a `ConnectParams` struct /// Since `/` is a reserved character in URLs, the path should be URL
/// should be created manually and passed in. Note that Postgres does not /// encoded. If the path contains non-UTF 8 characters, a `ConnectParams`
/// support SSL over Unix sockets. /// struct should be created manually and passed in. Note that Postgres
/// does not support SSL over Unix sockets.
/// ///
/// ## Examples /// ## Examples
/// ///
@ -902,6 +935,7 @@ impl Connection {
/// ```rust,no_run /// ```rust,no_run
/// # #![allow(unstable)] /// # #![allow(unstable)]
/// # use postgres::{Connection, UserInfo, ConnectParams, SslMode, ConnectTarget, ConnectError}; /// # use postgres::{Connection, UserInfo, ConnectParams, SslMode, ConnectTarget, ConnectError};
/// # #[cfg(feature = "unix_socket")]
/// # fn f() -> Result<(), ConnectError> { /// # fn f() -> Result<(), ConnectError> {
/// # let some_crazy_path = Path::new(""); /// # let some_crazy_path = Path::new("");
/// let params = ConnectParams { /// let params = ConnectParams {
@ -2054,11 +2088,11 @@ impl<'a> CopyInStatement<'a> {
let mut buf = vec![]; let mut buf = vec![];
let _ = buf.write_all(b"PGCOPY\n\xff\r\n\x00"); let _ = buf.write_all(b"PGCOPY\n\xff\r\n\x00");
let _ = buf.write_be_i32(0); let _ = buf.write_i32::<BigEndian>(0);
let _ = buf.write_be_i32(0); let _ = buf.write_i32::<BigEndian>(0);
'l: for mut row in rows { 'l: for mut row in rows {
let _ = buf.write_be_i16(self.column_types.len() as i16); let _ = buf.write_i16::<BigEndian>(self.column_types.len() as i16);
let mut types = self.column_types.iter(); let mut types = self.column_types.iter();
loop { loop {
@ -2067,10 +2101,10 @@ impl<'a> CopyInStatement<'a> {
let mut inner_buf = vec![]; let mut inner_buf = vec![];
match val.to_sql_checked(ty, &mut inner_buf) { match val.to_sql_checked(ty, &mut inner_buf) {
Ok(IsNull::Yes) => { Ok(IsNull::Yes) => {
let _ = buf.write_be_i32(-1); let _ = buf.write_i32::<BigEndian>(-1);
} }
Ok(IsNull::No) => { Ok(IsNull::No) => {
let _ = buf.write_be_i32(inner_buf.len() as i32); let _ = buf.write_i32::<BigEndian>(inner_buf.len() as i32);
let _ = buf.write_all(&inner_buf); let _ = buf.write_all(&inner_buf);
} }
Err(err) => { Err(err) => {
@ -2101,7 +2135,7 @@ impl<'a> CopyInStatement<'a> {
buf.clear(); buf.clear();
} }
let _ = buf.write_be_i16(-1); let _ = buf.write_i16::<BigEndian>(-1);
try!(conn.write_messages(&[ try!(conn.write_messages(&[
CopyData { CopyData {
data: &buf, data: &buf,

View File

@ -1,5 +1,3 @@
#![no_implicit_prelude]
use std::result::Result::{Ok, Err}; use std::result::Result::{Ok, Err};
use std::option::Option::{self, None, Some}; use std::option::Option::{self, None, Some};
use std::vec::Vec; use std::vec::Vec;
@ -7,13 +5,13 @@ use std::string::String;
use std::str::StrExt; use std::str::StrExt;
use std::slice::SliceExt; use std::slice::SliceExt;
use std::old_io::{self, IoResult, IoError, OtherIoError, ByRefReader, Buffer}; use std::io;
use std::old_io::util::LimitReader; use std::io::prelude::*;
use std::mem; use std::mem;
use byteorder::{BigEndian, ReaderBytesExt, WriterBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use io::Timeout;
use types::Oid; use types::Oid;
use util;
use self::BackendMessage::*; use self::BackendMessage::*;
use self::FrontendMessage::*; use self::FrontendMessage::*;
@ -144,23 +142,23 @@ pub enum FrontendMessage<'a> {
#[doc(hidden)] #[doc(hidden)]
trait WriteCStr { trait WriteCStr {
fn write_cstr(&mut self, s: &str) -> IoResult<()>; fn write_cstr(&mut self, s: &str) -> io::Result<()>;
} }
impl<W: old_io::Writer> WriteCStr for W { impl<W: Write> WriteCStr for W {
fn write_cstr(&mut self, s: &str) -> IoResult<()> { fn write_cstr(&mut self, s: &str) -> io::Result<()> {
try!(self.write_all(s.as_bytes())); try!(self.write_all(s.as_bytes()));
self.write_u8(0) Ok(try!(self.write_u8(0)))
} }
} }
#[doc(hidden)] #[doc(hidden)]
pub trait WriteMessage { pub trait WriteMessage {
fn write_message(&mut self, &FrontendMessage) -> IoResult<()> ; fn write_message(&mut self, &FrontendMessage) -> io::Result<()> ;
} }
impl<W: old_io::Writer> WriteMessage for W { impl<W: Write> WriteMessage for W {
fn write_message(&mut self, message: &FrontendMessage) -> IoResult<()> { fn write_message(&mut self, message: &FrontendMessage) -> io::Result<()> {
let mut buf = vec![]; let mut buf = vec![];
let mut ident = None; let mut ident = None;
@ -181,7 +179,7 @@ impl<W: old_io::Writer> WriteMessage for W {
None => try!(buf.write_i32::<BigEndian>(-1)), None => try!(buf.write_i32::<BigEndian>(-1)),
Some(ref value) => { Some(ref value) => {
try!(buf.write_i32::<BigEndian>(value.len() as i32)); try!(buf.write_i32::<BigEndian>(value.len() as i32));
try!(old_io::Writer::write_all(&mut buf, &**value)); try!(buf.write_all(&**value));
} }
} }
} }
@ -203,7 +201,7 @@ impl<W: old_io::Writer> WriteMessage for W {
} }
CopyData { data } => { CopyData { data } => {
ident = Some(b'd'); ident = Some(b'd');
try!(old_io::Writer::write_all(&mut buf, data)); try!(buf.write_all(data));
} }
CopyDone => ident = Some(b'c'), CopyDone => ident = Some(b'c'),
CopyFail { message } => { CopyFail { message } => {
@ -264,40 +262,32 @@ impl<W: old_io::Writer> WriteMessage for W {
#[doc(hidden)] #[doc(hidden)]
trait ReadCStr { trait ReadCStr {
fn read_cstr(&mut self) -> IoResult<String>; fn read_cstr(&mut self) -> io::Result<String>;
} }
impl<R: Buffer> ReadCStr for R { impl<R: BufRead> ReadCStr for R {
fn read_cstr(&mut self) -> IoResult<String> { fn read_cstr(&mut self) -> io::Result<String> {
let mut buf = try!(self.read_until(0)); let mut buf = vec![];
try!(self.read_until(0, &mut buf));
buf.pop(); buf.pop();
String::from_utf8(buf).map_err(|_| IoError { String::from_utf8(buf).map_err(|_| io::Error::new(io::ErrorKind::Other,
kind: OtherIoError, "received a non-utf8 string from server",
desc: "Received a non-utf8 string from server", None))
detail: None
})
} }
} }
#[doc(hidden)] #[doc(hidden)]
pub trait ReadMessage { pub trait ReadMessage {
fn read_message(&mut self) -> IoResult<BackendMessage>; fn read_message(&mut self) -> io::Result<BackendMessage>;
} }
impl<R: Buffer+Timeout> ReadMessage for R { impl<R: BufRead> ReadMessage for R {
fn read_message(&mut self) -> IoResult<BackendMessage> { fn read_message(&mut self) -> io::Result<BackendMessage> {
// The first byte read is a bit complex to make let ident = try!(self.read_u8());
// Notifications::next_block_for work.
let ident = self.read_u8();
// At this point we've got to turn off any read timeout to prevent
// stream desynchronization. We're assuming that if we've got the first
// byte, there's more stuff to follow.
self.set_read_timeout(None);
let ident = try!(ident);
// subtract size of length value // subtract size of length value
let len = try!(self.read_u32::<BigEndian>()) as usize - mem::size_of::<i32>(); let len = try!(self.read_u32::<BigEndian>()) as usize - mem::size_of::<i32>();
let mut rdr = LimitReader::new(self.by_ref(), len); let mut rdr = self.by_ref().take(len as u64);
let ret = match ident { let ret = match ident {
b'1' => ParseComplete, b'1' => ParseComplete,
@ -338,24 +328,20 @@ impl<R: Buffer+Timeout> ReadMessage for R {
b't' => try!(read_parameter_description(&mut rdr)), b't' => try!(read_parameter_description(&mut rdr)),
b'T' => try!(read_row_description(&mut rdr)), b'T' => try!(read_row_description(&mut rdr)),
b'Z' => ReadyForQuery { _state: try!(rdr.read_u8()) }, b'Z' => ReadyForQuery { _state: try!(rdr.read_u8()) },
ident => return Err(IoError { ident => {
kind: OtherIoError, return Err(io::Error::new(io::ErrorKind::Other,
desc: "Unexpected message tag", "unexpected message tag",
detail: Some(format!("got {}", ident)), Some(format!("got {}", ident))))
}) }
}; };
if rdr.limit() != 0 { if rdr.limit() != 0 {
return Err(IoError { return Err(io::Error::new(io::ErrorKind::Other, "didn't read entire message", None));
kind: OtherIoError,
desc: "didn't read entire message",
detail: None,
});
} }
Ok(ret) Ok(ret)
} }
} }
fn read_fields<R: Buffer>(buf: &mut R) -> IoResult<Vec<(u8, String)>> { fn read_fields<R: BufRead>(buf: &mut R) -> io::Result<Vec<(u8, String)>> {
let mut fields = vec![]; let mut fields = vec![];
loop { loop {
let ty = try!(buf.read_u8()); let ty = try!(buf.read_u8());
@ -369,14 +355,18 @@ fn read_fields<R: Buffer>(buf: &mut R) -> IoResult<Vec<(u8, String)>> {
Ok(fields) Ok(fields)
} }
fn read_data_row<R: Buffer>(buf: &mut R) -> IoResult<BackendMessage> { fn read_data_row<R: BufRead>(buf: &mut R) -> io::Result<BackendMessage> {
let len = try!(buf.read_be_u16()) as usize; let len = try!(buf.read_u16::<BigEndian>()) as usize;
let mut values = Vec::with_capacity(len); let mut values = Vec::with_capacity(len);
for _ in 0..len { for _ in 0..len {
let val = match try!(buf.read_be_i32()) { let val = match try!(buf.read_i32::<BigEndian>()) {
-1 => None, -1 => None,
len => Some(try!(buf.read_exact(len as usize))) len => {
let mut data = vec![];
try!(buf.take(len as u64).read_to_end(&mut data));
Some(data)
}
}; };
values.push(val); values.push(val);
} }
@ -384,29 +374,29 @@ fn read_data_row<R: Buffer>(buf: &mut R) -> IoResult<BackendMessage> {
Ok(DataRow { row: values }) Ok(DataRow { row: values })
} }
fn read_auth_message<R: Buffer>(buf: &mut R) -> IoResult<BackendMessage> { fn read_auth_message<R: Read>(buf: &mut R) -> io::Result<BackendMessage> {
Ok(match try!(buf.read_be_i32()) { Ok(match try!(buf.read_i32::<BigEndian>()) {
0 => AuthenticationOk, 0 => AuthenticationOk,
2 => AuthenticationKerberosV5, 2 => AuthenticationKerberosV5,
3 => AuthenticationCleartextPassword, 3 => AuthenticationCleartextPassword,
5 => { 5 => {
let mut salt = [0; 4]; let mut salt = [0; 4];
try!(buf.read_at_least(salt.len(), &mut salt)); try!(util::read_all(buf, &mut salt));
AuthenticationMD5Password { salt: salt } AuthenticationMD5Password { salt: salt }
}, },
6 => AuthenticationSCMCredential, 6 => AuthenticationSCMCredential,
7 => AuthenticationGSS, 7 => AuthenticationGSS,
9 => AuthenticationSSPI, 9 => AuthenticationSSPI,
val => return Err(IoError { val => {
kind: OtherIoError, return Err(io::Error::new(io::ErrorKind::Other,
desc: "Unexpected authentication tag", "unexpected authentication tag",
detail: Some(format!("got {}", val)), Some(format!("got {}", val))));
}) }
}) })
} }
fn read_parameter_description<R: Buffer>(buf: &mut R) -> IoResult<BackendMessage> { fn read_parameter_description<R: Read>(buf: &mut R) -> io::Result<BackendMessage> {
let len = try!(buf.read_be_i16()) as usize; let len = try!(buf.read_i16::<BigEndian>()) as usize;
let mut types = Vec::with_capacity(len); let mut types = Vec::with_capacity(len);
for _ in 0..len { for _ in 0..len {
@ -416,19 +406,19 @@ fn read_parameter_description<R: Buffer>(buf: &mut R) -> IoResult<BackendMessage
Ok(ParameterDescription { types: types }) Ok(ParameterDescription { types: types })
} }
fn read_row_description<R: Buffer>(buf: &mut R) -> IoResult<BackendMessage> { fn read_row_description<R: BufRead>(buf: &mut R) -> io::Result<BackendMessage> {
let len = try!(buf.read_be_i16()) as usize; let len = try!(buf.read_i16::<BigEndian>()) as usize;
let mut types = Vec::with_capacity(len); let mut types = Vec::with_capacity(len);
for _ in 0..len { for _ in 0..len {
types.push(RowDescriptionEntry { types.push(RowDescriptionEntry {
name: try!(buf.read_cstr()), name: try!(buf.read_cstr()),
table_oid: try!(buf.read_u32::<BigEndian>()), table_oid: try!(buf.read_u32::<BigEndian>()),
column_id: try!(buf.read_be_i16()), column_id: try!(buf.read_i16::<BigEndian>()),
type_oid: try!(buf.read_u32::<BigEndian>()), type_oid: try!(buf.read_u32::<BigEndian>()),
type_size: try!(buf.read_be_i16()), type_size: try!(buf.read_i16::<BigEndian>()),
type_modifier: try!(buf.read_be_i32()), type_modifier: try!(buf.read_i32::<BigEndian>()),
format: try!(buf.read_be_i16()) format: try!(buf.read_i16::<BigEndian>())
}) })
} }

View File

@ -1,10 +1,16 @@
use std::marker::Sized;
use std::result::Result::{Ok, Err};
use std::clone::Clone;
use serialize::json; use serialize::json;
use std::io::prelude::*;
use byteorder::{ReadBytesExt, WriteBytesExt};
use {Result, Error}; use {Result, Error};
use types::{FromSql, ToSql, IsNull, Type}; use types::{FromSql, ToSql, IsNull, Type};
impl FromSql for json::Json { impl FromSql for json::Json {
fn from_sql<R: Reader>(ty: &Type, raw: &mut R) -> Result<json::Json> { fn from_sql<R: Read>(ty: &Type, raw: &mut R) -> Result<json::Json> {
if let Type::Jsonb = *ty { if let Type::Jsonb = *ty {
// We only support version 1 of the jsonb binary format // We only support version 1 of the jsonb binary format
if try!(raw.read_u8()) != 1 { if try!(raw.read_u8()) != 1 {
@ -18,7 +24,7 @@ impl FromSql for json::Json {
} }
impl ToSql for json::Json { impl ToSql for json::Json {
fn to_sql<W: Writer+?Sized>(&self, ty: &Type, out: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, ty: &Type, mut out: &mut W) -> Result<IsNull> {
if let Type::Jsonb = *ty { if let Type::Jsonb = *ty {
try!(out.write_u8(1)); try!(out.write_u8(1));
} }

View File

@ -1,9 +1,21 @@
//! Traits dealing with Postgres data types //! Traits dealing with Postgres data types
pub use self::slice::Slice; pub use self::slice::Slice;
use std::option::Option::{self, Some, None};
use std::result::Result::{Ok, Err};
use std::boxed::Box;
use std::vec::Vec;
use std::clone::Clone;
use std::string::String;
use std::iter::ExactSizeIterator;
use std::marker::Sized;
use std::str::StrExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::old_io::net::ip::IpAddr; use std::net::IpAddr;
use std::fmt; use std::fmt;
use std::io::prelude::*;
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
use Result; use Result;
use error::Error; use error::Error;
@ -25,7 +37,7 @@ macro_rules! accepts {
#[macro_export] #[macro_export]
macro_rules! to_sql_checked { macro_rules! to_sql_checked {
() => { () => {
fn to_sql_checked(&self, ty: &Type, out: &mut Writer) -> Result<IsNull> { fn to_sql_checked(&self, ty: &Type, out: &mut Write) -> Result<IsNull> {
if !<Self as ToSql>::accepts(ty) { if !<Self as ToSql>::accepts(ty) {
return Err($crate::Error::WrongType(ty.clone())); return Err($crate::Error::WrongType(ty.clone()));
} }
@ -440,9 +452,9 @@ make_postgres_type! {
/// A trait for types that can be created from a Postgres value. /// A trait for types that can be created from a Postgres value.
pub trait FromSql: Sized { pub trait FromSql: Sized {
/// Creates a new value of this type from a `Reader` of Postgres data. /// Creates a new value of this type from a `Read` of Postgres data.
/// ///
/// If the value was `NULL`, the `Reader` will be `None`. /// If the value was `NULL`, the `Read` will be `None`.
/// ///
/// The caller of this method is responsible for ensuring that this type /// The caller of this method is responsible for ensuring that this type
/// is compatible with the Postgres `Type`. /// is compatible with the Postgres `Type`.
@ -450,19 +462,19 @@ pub trait FromSql: Sized {
/// The default implementation calls `FromSql::from_sql` when `raw` is /// The default implementation calls `FromSql::from_sql` when `raw` is
/// `Some` and returns `Err(Error::WasNull)` when `raw` is `None`. It does /// `Some` and returns `Err(Error::WasNull)` when `raw` is `None`. It does
/// not typically need to be overridden. /// not typically need to be overridden.
fn from_sql_nullable<R: Reader>(ty: &Type, raw: Option<&mut R>) -> Result<Self> { fn from_sql_nullable<R: Read>(ty: &Type, raw: Option<&mut R>) -> Result<Self> {
match raw { match raw {
Some(raw) => FromSql::from_sql(ty, raw), Some(raw) => FromSql::from_sql(ty, raw),
None => Err(Error::WasNull), None => Err(Error::WasNull),
} }
} }
/// Creates a new value of this type from a `Reader` of the binary format /// Creates a new value of this type from a `Read` of the binary format
/// of the specified Postgres `Type`. /// of the specified Postgres `Type`.
/// ///
/// The caller of this method is responsible for ensuring that this type /// The caller of this method is responsible for ensuring that this type
/// is compatible with the Postgres `Type`. /// is compatible with the Postgres `Type`.
fn from_sql<R: Reader>(ty: &Type, raw: &mut R) -> Result<Self>; fn from_sql<R: Read>(ty: &Type, raw: &mut R) -> Result<Self>;
/// Determines if a value of this type can be created from the specified /// Determines if a value of this type can be created from the specified
/// Postgres `Type`. /// Postgres `Type`.
@ -470,14 +482,14 @@ pub trait FromSql: Sized {
} }
impl<T: FromSql> FromSql for Option<T> { impl<T: FromSql> FromSql for Option<T> {
fn from_sql_nullable<R: Reader>(ty: &Type, raw: Option<&mut R>) -> Result<Option<T>> { fn from_sql_nullable<R: Read>(ty: &Type, raw: Option<&mut R>) -> Result<Option<T>> {
match raw { match raw {
Some(raw) => <T as FromSql>::from_sql(ty, raw).map(|e| Some(e)), Some(raw) => <T as FromSql>::from_sql(ty, raw).map(|e| Some(e)),
None => Ok(None), None => Ok(None),
} }
} }
fn from_sql<R: Reader>(ty: &Type, raw: &mut R) -> Result<Option<T>> { fn from_sql<R: Read>(ty: &Type, raw: &mut R) -> Result<Option<T>> {
<T as FromSql>::from_sql(ty, raw).map(|e| Some(e)) <T as FromSql>::from_sql(ty, raw).map(|e| Some(e))
} }
@ -487,7 +499,7 @@ impl<T: FromSql> FromSql for Option<T> {
} }
impl FromSql for bool { impl FromSql for bool {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<bool> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<bool> {
Ok(try!(raw.read_u8()) != 0) Ok(try!(raw.read_u8()) != 0)
} }
@ -495,16 +507,20 @@ impl FromSql for bool {
} }
impl FromSql for Vec<u8> { impl FromSql for Vec<u8> {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<Vec<u8>> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<Vec<u8>> {
Ok(try!(raw.read_to_end())) let mut buf = vec![];
try!(raw.read_to_end(&mut buf));
Ok(buf)
} }
accepts!(Type::Bytea); accepts!(Type::Bytea);
} }
impl FromSql for String { impl FromSql for String {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<String> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<String> {
String::from_utf8(try!(raw.read_to_end())).map_err(|_| Error::BadResponse) let mut buf = vec![];
try!(raw.read_to_end(&mut buf));
String::from_utf8(buf).map_err(|_| Error::BadResponse)
} }
fn accepts(ty: &Type) -> bool { fn accepts(ty: &Type) -> bool {
@ -519,8 +535,8 @@ impl FromSql for String {
macro_rules! primitive_from { macro_rules! primitive_from {
($t:ty, $f:ident, $($expected:pat),+) => { ($t:ty, $f:ident, $($expected:pat),+) => {
impl FromSql for $t { impl FromSql for $t {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<$t> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<$t> {
Ok(try!(raw.$f())) Ok(try!(raw.$f::<BigEndian>()))
} }
accepts!($($expected),+); accepts!($($expected),+);
@ -528,16 +544,23 @@ macro_rules! primitive_from {
} }
} }
primitive_from!(i8, read_i8, Type::Char); impl FromSql for i8 {
primitive_from!(i16, read_be_i16, Type::Int2); fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<i8> {
primitive_from!(i32, read_be_i32, Type::Int4); Ok(try!(raw.read_i8()))
primitive_from!(u32, read_be_u32, Type::Oid); }
primitive_from!(i64, read_be_i64, Type::Int8);
primitive_from!(f32, read_be_f32, Type::Float4); accepts!(Type::Char);
primitive_from!(f64, read_be_f64, Type::Float8); }
primitive_from!(i16, read_i16, Type::Int2);
primitive_from!(i32, read_i32, Type::Int4);
primitive_from!(u32, read_u32, Type::Oid);
primitive_from!(i64, read_i64, Type::Int8);
primitive_from!(f32, read_f32, Type::Float4);
primitive_from!(f64, read_f64, Type::Float8);
impl FromSql for IpAddr { impl FromSql for IpAddr {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<IpAddr> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<IpAddr> {
let family = try!(raw.read_u8()); let family = try!(raw.read_u8());
let _bits = try!(raw.read_u8()); let _bits = try!(raw.read_u8());
let _is_cidr = try!(raw.read_u8()); let _is_cidr = try!(raw.read_u8());
@ -545,20 +568,24 @@ impl FromSql for IpAddr {
if nb > 16 { if nb > 16 {
return Err(Error::BadResponse); return Err(Error::BadResponse);
} }
let mut buf = [0u8; 16];
try!(raw.read_at_least(nb as usize, &mut buf));
let mut buf: &[u8] = &buf;
match family { match family {
2 if nb == 4 => Ok(IpAddr::Ipv4Addr(buf[0], buf[1], buf[2], buf[3])), 2 if nb == 4 => {
3 if nb == 16 => Ok(IpAddr::Ipv6Addr(try!(buf.read_be_u16()), Ok(IpAddr::new_v4(try!(raw.read_u8()),
try!(buf.read_be_u16()), try!(raw.read_u8()),
try!(buf.read_be_u16()), try!(raw.read_u8()),
try!(buf.read_be_u16()), try!(raw.read_u8())))
try!(buf.read_be_u16()), }
try!(buf.read_be_u16()), 3 if nb == 16 => {
try!(buf.read_be_u16()), Ok(IpAddr::new_v6(try!(raw.read_u16::<BigEndian>()),
try!(buf.read_be_u16()))), try!(raw.read_u16::<BigEndian>()),
try!(raw.read_u16::<BigEndian>()),
try!(raw.read_u16::<BigEndian>()),
try!(raw.read_u16::<BigEndian>()),
try!(raw.read_u16::<BigEndian>()),
try!(raw.read_u16::<BigEndian>()),
try!(raw.read_u16::<BigEndian>())))
}
_ => Err(Error::BadResponse), _ => Err(Error::BadResponse),
} }
} }
@ -567,25 +594,27 @@ impl FromSql for IpAddr {
} }
impl FromSql for HashMap<String, Option<String>> { impl FromSql for HashMap<String, Option<String>> {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) fn from_sql<R: Read>(_: &Type, raw: &mut R)
-> Result<HashMap<String, Option<String>>> { -> Result<HashMap<String, Option<String>>> {
let mut map = HashMap::new(); let mut map = HashMap::new();
let count = try!(raw.read_be_i32()); let count = try!(raw.read_i32::<BigEndian>());
for _ in range(0, count) { for _ in 0..count {
let key_len = try!(raw.read_be_i32()); let key_len = try!(raw.read_i32::<BigEndian>());
let key = try!(raw.read_exact(key_len as usize)); let mut key = vec![];
try!(raw.take(key_len as u64).read_to_end(&mut key));
let key = match String::from_utf8(key) { let key = match String::from_utf8(key) {
Ok(key) => key, Ok(key) => key,
Err(_) => return Err(Error::BadResponse), Err(_) => return Err(Error::BadResponse),
}; };
let val_len = try!(raw.read_be_i32()); let val_len = try!(raw.read_i32::<BigEndian>());
let val = if val_len < 0 { let val = if val_len < 0 {
None None
} else { } else {
let val = try!(raw.read_exact(val_len as usize)); let mut val = vec![];
try!(raw.take(val_len as u64).read_to_end(&mut val));
match String::from_utf8(val) { match String::from_utf8(val) {
Ok(val) => Some(val), Ok(val) => Some(val),
Err(_) => return Err(Error::BadResponse), Err(_) => return Err(Error::BadResponse),
@ -626,7 +655,7 @@ pub trait ToSql {
/// `NULL`. If this is the case, implementations **must not** write /// `NULL`. If this is the case, implementations **must not** write
/// anything to `out`. /// anything to `out`.
fn to_sql<W: ?Sized>(&self, ty: &Type, out: &mut W) -> Result<IsNull> fn to_sql<W: ?Sized>(&self, ty: &Type, out: &mut W) -> Result<IsNull>
where Self: Sized, W: Writer; where Self: Sized, W: Write;
/// Determines if a value of this type can be converted to the specified /// Determines if a value of this type can be converted to the specified
/// Postgres `Type`. /// Postgres `Type`.
@ -636,13 +665,13 @@ pub trait ToSql {
/// ///
/// *All* implementations of this method should be generated by the /// *All* implementations of this method should be generated by the
/// `to_sql_checked!()` macro. /// `to_sql_checked!()` macro.
fn to_sql_checked(&self, ty: &Type, out: &mut Writer) -> Result<IsNull>; fn to_sql_checked(&self, ty: &Type, out: &mut Write) -> Result<IsNull>;
} }
impl<T: ToSql> ToSql for Option<T> { impl<T: ToSql> ToSql for Option<T> {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, ty: &Type, out: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, ty: &Type, out: &mut W) -> Result<IsNull> {
match *self { match *self {
Some(ref val) => val.to_sql(ty, out), Some(ref val) => val.to_sql(ty, out),
None => Ok(IsNull::Yes), None => Ok(IsNull::Yes),
@ -657,7 +686,7 @@ impl<T: ToSql> ToSql for Option<T> {
impl ToSql for bool { impl ToSql for bool {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, mut w: &mut W) -> Result<IsNull> {
try!(w.write_u8(*self as u8)); try!(w.write_u8(*self as u8));
Ok(IsNull::No) Ok(IsNull::No)
} }
@ -668,7 +697,7 @@ impl ToSql for bool {
impl<'a> ToSql for &'a [u8] { impl<'a> ToSql for &'a [u8] {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> {
try!(w.write_all(*self)); try!(w.write_all(*self));
Ok(IsNull::No) Ok(IsNull::No)
} }
@ -679,7 +708,7 @@ impl<'a> ToSql for &'a [u8] {
impl ToSql for Vec<u8> { impl ToSql for Vec<u8> {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, ty: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, ty: &Type, w: &mut W) -> Result<IsNull> {
(&**self).to_sql(ty, w) (&**self).to_sql(ty, w)
} }
@ -691,7 +720,7 @@ impl ToSql for Vec<u8> {
impl<'a> ToSql for &'a str { impl<'a> ToSql for &'a str {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> {
try!(w.write_all(self.as_bytes())); try!(w.write_all(self.as_bytes()));
Ok(IsNull::No) Ok(IsNull::No)
} }
@ -708,7 +737,7 @@ impl<'a> ToSql for &'a str {
impl ToSql for String { impl ToSql for String {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, ty: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, ty: &Type, w: &mut W) -> Result<IsNull> {
(&**self).to_sql(ty, w) (&**self).to_sql(ty, w)
} }
@ -722,8 +751,8 @@ macro_rules! to_primitive {
impl ToSql for $t { impl ToSql for $t {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, mut w: &mut W) -> Result<IsNull> {
try!(w.$f(*self)); try!(w.$f::<BigEndian>(*self));
Ok(IsNull::No) Ok(IsNull::No)
} }
@ -732,20 +761,31 @@ macro_rules! to_primitive {
} }
} }
to_primitive!(i8, write_i8, Type::Char); impl ToSql for i8 {
to_primitive!(i16, write_be_i16, Type::Int2); to_sql_checked!();
to_primitive!(i32, write_be_i32, Type::Int4);
to_primitive!(u32, write_be_u32, Type::Oid); fn to_sql<W: Write+?Sized>(&self, _: &Type, mut w: &mut W) -> Result<IsNull> {
to_primitive!(i64, write_be_i64, Type::Int8); try!(w.write_i8(*self));
to_primitive!(f32, write_be_f32, Type::Float4); Ok(IsNull::No)
to_primitive!(f64, write_be_f64, Type::Float8); }
accepts!(Type::Char);
}
to_primitive!(i16, write_i16, Type::Int2);
to_primitive!(i32, write_i32, Type::Int4);
to_primitive!(u32, write_u32, Type::Oid);
to_primitive!(i64, write_i64, Type::Int8);
to_primitive!(f32, write_f32, Type::Float4);
to_primitive!(f64, write_f64, Type::Float8);
impl ToSql for IpAddr { impl ToSql for IpAddr {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, mut w: &mut W) -> Result<IsNull> {
match *self { match *self {
IpAddr::Ipv4Addr(a, b, c, d) => { IpAddr::V4(addr) => {
let [a, b, c, d] = addr.octets();
try!(w.write_all(&[2, // family try!(w.write_all(&[2, // family
32, // bits 32, // bits
0, // is_cidr 0, // is_cidr
@ -753,20 +793,21 @@ impl ToSql for IpAddr {
a, b, c, d // addr a, b, c, d // addr
])); ]));
} }
IpAddr::Ipv6Addr(a, b, c, d, e, f, g, h) => { IpAddr::V6(addr) => {
let [a, b, c, d, e, f, g, h] = addr.segments();
try!(w.write_all(&[3, // family try!(w.write_all(&[3, // family
128, // bits 128, // bits
0, // is_cidr 0, // is_cidr
16, // nb 16, // nb
])); ]));
try!(w.write_be_u16(a)); try!(w.write_u16::<BigEndian>(a));
try!(w.write_be_u16(b)); try!(w.write_u16::<BigEndian>(b));
try!(w.write_be_u16(c)); try!(w.write_u16::<BigEndian>(c));
try!(w.write_be_u16(d)); try!(w.write_u16::<BigEndian>(d));
try!(w.write_be_u16(e)); try!(w.write_u16::<BigEndian>(e));
try!(w.write_be_u16(f)); try!(w.write_u16::<BigEndian>(f));
try!(w.write_be_u16(g)); try!(w.write_u16::<BigEndian>(g));
try!(w.write_be_u16(h)); try!(w.write_u16::<BigEndian>(h));
} }
} }
Ok(IsNull::No) Ok(IsNull::No)
@ -778,19 +819,19 @@ impl ToSql for IpAddr {
impl ToSql for HashMap<String, Option<String>> { impl ToSql for HashMap<String, Option<String>> {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, mut w: &mut W) -> Result<IsNull> {
try!(w.write_be_i32(self.len() as i32)); try!(w.write_i32::<BigEndian>(self.len() as i32));
for (key, val) in self.iter() { for (key, val) in self.iter() {
try!(w.write_be_i32(key.len() as i32)); try!(w.write_i32::<BigEndian>(key.len() as i32));
try!(w.write_all(key.as_bytes())); try!(w.write_all(key.as_bytes()));
match *val { match *val {
Some(ref val) => { Some(ref val) => {
try!(w.write_be_i32(val.len() as i32)); try!(w.write_i32::<BigEndian>(val.len() as i32));
try!(w.write_all(val.as_bytes())); try!(w.write_all(val.as_bytes()));
} }
None => try!(w.write_be_i32(-1)) None => try!(w.write_i32::<BigEndian>(-1))
} }
} }

View File

@ -1,3 +1,11 @@
use std::result::Result::{Ok, Err};
use std::marker::Sized;
use std::slice::SliceExt;
use std::clone::Clone;
use std::io::prelude::*;
use byteorder::{WriteBytesExt, BigEndian};
use {Type, ToSql, Result, Error, Kind}; use {Type, ToSql, Result, Error, Kind};
use types::IsNull; use types::IsNull;
@ -26,7 +34,7 @@ pub struct Slice<'a, T: 'a + ToSql>(pub &'a [T]);
impl<'a, T: 'a + ToSql> ToSql for Slice<'a, T> { impl<'a, T: 'a + ToSql> ToSql for Slice<'a, T> {
to_sql_checked!(); to_sql_checked!();
fn to_sql<W: Writer+?Sized>(&self, ty: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, ty: &Type, mut w: &mut W) -> Result<IsNull> {
let member_type = match ty.kind() { let member_type = match ty.kind() {
&Kind::Array(ref member) => member, &Kind::Array(ref member) => member,
_ => panic!("expected array type"), _ => panic!("expected array type"),
@ -35,21 +43,21 @@ impl<'a, T: 'a + ToSql> ToSql for Slice<'a, T> {
return Err(Error::WrongType(ty.clone())); return Err(Error::WrongType(ty.clone()));
} }
try!(w.write_be_i32(1)); // number of dimensions try!(w.write_i32::<BigEndian>(1)); // number of dimensions
try!(w.write_be_i32(1)); // has nulls try!(w.write_i32::<BigEndian>(1)); // has nulls
try!(w.write_be_u32(member_type.to_oid())); try!(w.write_u32::<BigEndian>(member_type.to_oid()));
try!(w.write_be_i32(self.0.len() as i32)); try!(w.write_i32::<BigEndian>(self.0.len() as i32));
try!(w.write_be_i32(0)); // index offset try!(w.write_i32::<BigEndian>(0)); // index offset
for e in self.0 { for e in self.0 {
let mut inner_buf = vec![]; let mut inner_buf = vec![];
match try!(e.to_sql(&member_type, &mut inner_buf)) { match try!(e.to_sql(&member_type, &mut inner_buf)) {
IsNull::No => { IsNull::No => {
try!(w.write_be_i32(inner_buf.len() as i32)); try!(w.write_i32::<BigEndian>(inner_buf.len() as i32));
try!(w.write_all(&inner_buf)); try!(w.write_all(&inner_buf));
} }
IsNull::Yes => try!(w.write_be_i32(-1)), IsNull::Yes => try!(w.write_i32::<BigEndian>(-1)),
} }
} }

View File

@ -1,4 +1,11 @@
use std::result::Result::{Ok, Err};
use std::marker::Sized;
use std::clone::Clone;
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
use time::Timespec; use time::Timespec;
use std::io::prelude::*;
use Result; use Result;
use types::{Type, FromSql, ToSql, IsNull}; use types::{Type, FromSql, ToSql, IsNull};
@ -9,8 +16,8 @@ const NSEC_PER_USEC: i64 = 1_000;
const TIME_SEC_CONVERSION: i64 = 946684800; const TIME_SEC_CONVERSION: i64 = 946684800;
impl FromSql for Timespec { impl FromSql for Timespec {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<Timespec> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<Timespec> {
let t = try!(raw.read_be_i64()); let t = try!(raw.read_i64::<BigEndian>());
let mut sec = t / USEC_PER_SEC + TIME_SEC_CONVERSION; let mut sec = t / USEC_PER_SEC + TIME_SEC_CONVERSION;
let mut usec = t % USEC_PER_SEC; let mut usec = t % USEC_PER_SEC;
@ -26,9 +33,9 @@ impl FromSql for Timespec {
} }
impl ToSql for Timespec { impl ToSql for Timespec {
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, mut w: &mut W) -> Result<IsNull> {
let t = (self.sec - TIME_SEC_CONVERSION) * USEC_PER_SEC + self.nsec as i64 / NSEC_PER_USEC; let t = (self.sec - TIME_SEC_CONVERSION) * USEC_PER_SEC + self.nsec as i64 / NSEC_PER_USEC;
try!(w.write_be_i64(t)); try!(w.write_i64::<BigEndian>(t));
Ok(IsNull::No) Ok(IsNull::No)
} }

View File

@ -1,13 +1,23 @@
extern crate uuid; extern crate uuid;
use std::result::Result::{Ok, Err};
use std::marker::Sized;
use std::option::Option::{Some, None};
use std::clone::Clone;
use std::io::prelude::*;
use self::uuid::Uuid; use self::uuid::Uuid;
use types::{FromSql, ToSql, Type, IsNull}; use types::{FromSql, ToSql, Type, IsNull};
use Error; use Error;
use Result; use Result;
use util;
impl FromSql for Uuid { impl FromSql for Uuid {
fn from_sql<R: Reader>(_: &Type, raw: &mut R) -> Result<Uuid> { fn from_sql<R: Read>(_: &Type, raw: &mut R) -> Result<Uuid> {
match Uuid::from_bytes(&*try!(raw.read_to_end())) { let mut bytes = [0; 16];
try!(util::read_all(raw, &mut bytes));
match Uuid::from_bytes(&bytes) {
Some(u) => Ok(u), Some(u) => Ok(u),
None => Err(Error::BadResponse), None => Err(Error::BadResponse),
} }
@ -17,7 +27,7 @@ impl FromSql for Uuid {
} }
impl ToSql for Uuid { impl ToSql for Uuid {
fn to_sql<W: Writer+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> { fn to_sql<W: Write+?Sized>(&self, _: &Type, w: &mut W) -> Result<IsNull> {
try!(w.write_all(self.as_bytes())); try!(w.write_all(self.as_bytes()));
Ok(IsNull::No) Ok(IsNull::No)
} }

View File

@ -1,3 +1,10 @@
use std::option::Option::{self, Some, None};
use std::result::Result::{Ok, Err};
use std::vec::Vec;
use std::string::String;
use std::iter::IteratorExt;
use std::str::StrExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::error; use std::error;
use std::fmt; use std::fmt;

View File

@ -7,7 +7,15 @@
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
use std::option::Option::{self, Some, None};
use std::result::Result::{self, Ok, Err};
use std::vec::Vec;
use std::string::{String, ToString};
use std::iter::{Iterator, IteratorExt, ExactSizeIterator};
use std::slice::SliceExt;
use std::str::{Str, StrExt};
use std::io::prelude::*;
use std::str::FromStr; use std::str::FromStr;
use std::num; use std::num;
use std::str; use std::str;

View File

@ -1,7 +1,16 @@
use std::old_io::IoResult; use std::result::Result::{Ok, Err};
use std::iter::Iterator;
use std::string::String;
use std::str::StrExt;
use std::iter::IteratorExt;
use std::option::Option::None;
use std::slice::SliceExt;
pub fn comma_join<'a, W, I>(writer: &mut W, strs: I) -> IoResult<()> use std::io;
where W: Writer, I: Iterator<Item=&'a str> { use std::io::prelude::*;
pub fn comma_join<'a, W, I>(writer: &mut W, strs: I) -> io::Result<()>
where W: Write, I: Iterator<Item=&'a str> {
let mut first = true; let mut first = true;
for str_ in strs { for str_ in strs {
if !first { if !first {
@ -16,3 +25,15 @@ pub fn comma_join<'a, W, I>(writer: &mut W, strs: I) -> IoResult<()>
pub fn parse_update_count(tag: String) -> u64 { pub fn parse_update_count(tag: String) -> u64 {
tag.split(' ').last().unwrap().parse().unwrap_or(0) tag.split(' ').last().unwrap().parse().unwrap_or(0)
} }
pub fn read_all<R: Read>(r: &mut R, mut buf: &mut [u8]) -> io::Result<()> {
let mut start = 0;
while start != buf.len() {
match r.read(&mut buf[start..]) {
Ok(0) => return Err(io::Error::new(io::ErrorKind::Other, "unexpected EOF", None)),
Ok(len) => start += len,
Err(e) => return Err(e),
}
}
Ok(())
}

View File

@ -1,4 +1,4 @@
#![feature(core, std_misc, old_io)] #![feature(core, std_misc, old_io, net)]
extern crate postgres; extern crate postgres;
extern crate "rustc-serialize" as serialize; extern crate "rustc-serialize" as serialize;
@ -79,6 +79,7 @@ fn test_connection_finish() {
} }
#[test] #[test]
#[cfg_attr(not(feature = "unix_socket"), ignore)]
fn test_unix_connection() { fn test_unix_connection() {
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None)); let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
let stmt = or_panic!(conn.prepare("SHOW unix_socket_directories")); let stmt = or_panic!(conn.prepare("SHOW unix_socket_directories"));
@ -609,6 +610,7 @@ fn test_notifications_next_block() {
}, or_panic!(notifications.next_block())); }, or_panic!(notifications.next_block()));
} }
/*
#[test] #[test]
fn test_notifications_next_block_for() { fn test_notifications_next_block_for() {
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None)); let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
@ -648,6 +650,7 @@ fn test_notifications_next_block_for_timeout() {
or_panic!(conn.execute("SELECT 1", &[])); or_panic!(conn.execute("SELECT 1", &[]));
} }
*/
#[test] #[test]
// This test is pretty sad, but I don't think there's a better way :( // This test is pretty sad, but I don't think there's a better way :(

View File

@ -3,7 +3,7 @@ use std::f32;
use std::f64; use std::f64;
use std::fmt; use std::fmt;
use std::num::Float; use std::num::Float;
use std::old_io::net::ip::IpAddr; use std::net::IpAddr;
use postgres::{Connection, SslMode, Slice, Error}; use postgres::{Connection, SslMode, Slice, Error};
use postgres::types::{ToSql, FromSql}; use postgres::types::{ToSql, FromSql};