Move postgres-protocol in-tree
This commit is contained in:
parent
33d45db60f
commit
6df3842274
@ -1,2 +1,8 @@
|
||||
[workspace]
|
||||
members = ["codegen", "postgres", "postgres-shared", "tokio-postgres"]
|
||||
members = [
|
||||
"codegen",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-shared",
|
||||
"tokio-postgres"
|
||||
]
|
||||
|
4
postgres-protocol/.gitignore
vendored
Normal file
4
postgres-protocol/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
target
|
||||
Cargo.lock
|
||||
.idea/
|
||||
*.iml
|
7
postgres-protocol/.travis.yml
Normal file
7
postgres-protocol/.travis.yml
Normal file
@ -0,0 +1,7 @@
|
||||
language: rust
|
||||
cache: cargo
|
||||
rust:
|
||||
- nightly
|
||||
- 1.10.0
|
||||
script:
|
||||
- cargo test
|
15
postgres-protocol/Cargo.toml
Normal file
15
postgres-protocol/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "postgres-protocol"
|
||||
version = "0.2.1"
|
||||
authors = ["Steven Fackler <sfackler@gmail.com>"]
|
||||
description = "Low level Postgres protocol APIs"
|
||||
license = "MIT/Apache-2.0"
|
||||
repository = "https://github.com/sfackler/rust-postgres-protocol"
|
||||
documentation = "https://docs.rs/postgres-protocol/0.2.1/postgres_protocol"
|
||||
readme = "README.md"
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.0"
|
||||
fallible-iterator = "0.1"
|
||||
md5 = "0.3"
|
||||
memchr = "1.0"
|
201
postgres-protocol/LICENSE-APACHE
Normal file
201
postgres-protocol/LICENSE-APACHE
Normal file
@ -0,0 +1,201 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
22
postgres-protocol/LICENSE-MIT
Normal file
22
postgres-protocol/LICENSE-MIT
Normal file
@ -0,0 +1,22 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2016 Steven Fackler
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
34
postgres-protocol/src/authentication.rs
Normal file
34
postgres-protocol/src/authentication.rs
Normal file
@ -0,0 +1,34 @@
|
||||
//! Authentication protocol support.
|
||||
use md5::Context;
|
||||
|
||||
/// Hashes authentication information in a way suitable for use in response
|
||||
/// to an `AuthenticationMd5Password` message.
|
||||
///
|
||||
/// The resulting string should be sent back to the database in a
|
||||
/// `PasswordMessage` message.
|
||||
#[inline]
|
||||
pub fn md5_hash(username: &[u8], password: &[u8], salt: [u8; 4]) -> String {
|
||||
let mut context = Context::new();
|
||||
context.consume(password);
|
||||
context.consume(username);
|
||||
let output = context.compute();
|
||||
context = Context::new();
|
||||
context.consume(format!("{:x}", output));
|
||||
context.consume(&salt);
|
||||
format!("md5{:x}", context.compute())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn md5() {
|
||||
let username = b"md5_user";
|
||||
let password = b"password";
|
||||
let salt = [0x2a, 0x3d, 0x8f, 0xe0];
|
||||
|
||||
assert_eq!(md5_hash(username, password, salt),
|
||||
"md562af4dd09bbb41884907a838a3233294");
|
||||
}
|
||||
}
|
73
postgres-protocol/src/lib.rs
Normal file
73
postgres-protocol/src/lib.rs
Normal file
@ -0,0 +1,73 @@
|
||||
//! Low level Postgres protocol APIs.
|
||||
//!
|
||||
//! This crate implements the low level components of Postgres's communication
|
||||
//! protocol, including message and value serialization and deserialization.
|
||||
//! It is designed to be used as a building block by higher level APIs such as
|
||||
//! `rust-postgres`, and should not typically be used directly.
|
||||
//!
|
||||
//! # Note
|
||||
//!
|
||||
//! This library assumes that the `client_encoding` backend parameter has been
|
||||
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
|
||||
#![doc(html_root_url="https://docs.rs/postgres-protocol/0.2.1")]
|
||||
#![warn(missing_docs)]
|
||||
extern crate byteorder;
|
||||
extern crate fallible_iterator;
|
||||
extern crate md5;
|
||||
extern crate memchr;
|
||||
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use std::io;
|
||||
|
||||
pub mod authentication;
|
||||
pub mod message;
|
||||
pub mod types;
|
||||
|
||||
/// A Postgres OID.
|
||||
pub type Oid = u32;
|
||||
|
||||
/// An enum indicating if a value is `NULL` or not.
|
||||
pub enum IsNull {
|
||||
/// The value is `NULL`.
|
||||
Yes,
|
||||
/// The value is not `NULL`.
|
||||
No,
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_nullable<F, E>(serializer: F, buf: &mut Vec<u8>) -> Result<(), E>
|
||||
where F: FnOnce(&mut Vec<u8>) -> Result<IsNull, E>,
|
||||
E: From<io::Error>
|
||||
{
|
||||
let base = buf.len();
|
||||
buf.extend_from_slice(&[0; 4]);
|
||||
let size = match try!(serializer(buf)) {
|
||||
IsNull::No => try!(i32::from_usize(buf.len() - base - 4)),
|
||||
IsNull::Yes => -1,
|
||||
};
|
||||
BigEndian::write_i32(&mut buf[base..], size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
trait FromUsize: Sized {
|
||||
fn from_usize(x: usize) -> Result<Self, io::Error>;
|
||||
}
|
||||
|
||||
macro_rules! from_usize {
|
||||
($t:ty) => {
|
||||
impl FromUsize for $t {
|
||||
#[inline]
|
||||
fn from_usize(x: usize) -> io::Result<$t> {
|
||||
if x > <$t>::max_value() as usize {
|
||||
Err(io::Error::new(io::ErrorKind::InvalidInput, "value too large to transmit"))
|
||||
} else {
|
||||
Ok(x as $t)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
from_usize!(i16);
|
||||
from_usize!(i32);
|
757
postgres-protocol/src/message/backend.rs
Normal file
757
postgres-protocol/src/message/backend.rs
Normal file
@ -0,0 +1,757 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use byteorder::{ReadBytesExt, BigEndian};
|
||||
use memchr::memchr;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use std::io::{self, Read};
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::str;
|
||||
|
||||
use Oid;
|
||||
|
||||
/// An enum representing Postgres backend messages.
|
||||
pub enum Message<T> {
|
||||
AuthenticationCleartextPassword,
|
||||
AuthenticationGss,
|
||||
AuthenticationKerberosV5,
|
||||
AuthenticationMd5Password(AuthenticationMd5PasswordBody<T>),
|
||||
AuthenticationOk,
|
||||
AuthenticationScmCredential,
|
||||
AuthenticationSspi,
|
||||
BackendKeyData(BackendKeyDataBody<T>),
|
||||
BindComplete,
|
||||
CloseComplete,
|
||||
CommandComplete(CommandCompleteBody<T>),
|
||||
CopyData(CopyDataBody<T>),
|
||||
CopyDone,
|
||||
CopyInResponse(CopyInResponseBody<T>),
|
||||
CopyOutResponse(CopyOutResponseBody<T>),
|
||||
DataRow(DataRowBody<T>),
|
||||
EmptyQueryResponse,
|
||||
ErrorResponse(ErrorResponseBody<T>),
|
||||
NoData,
|
||||
NoticeResponse(NoticeResponseBody<T>),
|
||||
NotificationResponse(NotificationResponseBody<T>),
|
||||
ParameterDescription(ParameterDescriptionBody<T>),
|
||||
ParameterStatus(ParameterStatusBody<T>),
|
||||
ParseComplete,
|
||||
PortalSuspended,
|
||||
ReadyForQuery(ReadyForQueryBody<T>),
|
||||
RowDescription(RowDescriptionBody<T>),
|
||||
#[doc(hidden)]
|
||||
__ForExtensibility,
|
||||
}
|
||||
|
||||
impl<'a> Message<&'a [u8]> {
|
||||
/// Attempts to parse a backend message from the buffer.
|
||||
///
|
||||
/// This method is unfortunately difficult to use due to deficiencies in the compiler's borrow
|
||||
/// checker.
|
||||
#[inline]
|
||||
pub fn parse(buf: &'a [u8]) -> io::Result<ParseResult<&'a [u8]>> {
|
||||
Message::parse_inner(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Message<Vec<u8>> {
|
||||
/// Attempts to parse a backend message from the buffer.
|
||||
///
|
||||
/// In contrast to `parse`, this method produces messages that do not reference the input,
|
||||
/// buffer by copying any necessary portions internally.
|
||||
#[inline]
|
||||
pub fn parse_owned(buf: &[u8]) -> io::Result<ParseResult<Vec<u8>>> {
|
||||
Message::parse_inner(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Message<T>
|
||||
where T: From<&'a [u8]>
|
||||
{
|
||||
#[inline]
|
||||
fn parse_inner(buf: &'a [u8]) -> io::Result<ParseResult<T>> {
|
||||
if buf.len() < 5 {
|
||||
return Ok(ParseResult::Incomplete { required_size: None });
|
||||
}
|
||||
|
||||
let mut r = buf;
|
||||
let tag = r.read_u8().unwrap();
|
||||
// add a byte for the tag
|
||||
let len = r.read_u32::<BigEndian>().unwrap() as usize + 1;
|
||||
|
||||
if buf.len() < len {
|
||||
return Ok(ParseResult::Incomplete { required_size: Some(len) });
|
||||
}
|
||||
|
||||
let mut buf = &buf[5..len];
|
||||
let message = match tag {
|
||||
b'1' => Message::ParseComplete,
|
||||
b'2' => Message::BindComplete,
|
||||
b'3' => Message::CloseComplete,
|
||||
b'A' => {
|
||||
let process_id = try!(buf.read_i32::<BigEndian>());
|
||||
let channel_end = try!(find_null(buf, 0));
|
||||
let message_end = try!(find_null(buf, channel_end + 1));
|
||||
let storage = buf[..message_end].into();
|
||||
buf = &buf[message_end + 1..];
|
||||
Message::NotificationResponse(NotificationResponseBody {
|
||||
storage: storage,
|
||||
process_id: process_id,
|
||||
channel_end: channel_end,
|
||||
})
|
||||
}
|
||||
b'c' => Message::CopyDone,
|
||||
b'C' => {
|
||||
let tag_end = try!(find_null(buf, 0));
|
||||
let storage = buf[..tag_end].into();
|
||||
buf = &buf[tag_end + 1..];
|
||||
Message::CommandComplete(CommandCompleteBody {
|
||||
storage: storage,
|
||||
})
|
||||
}
|
||||
b'd' => {
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::CopyData(CopyDataBody { storage: storage })
|
||||
}
|
||||
b'D' => {
|
||||
let len = try!(buf.read_u16::<BigEndian>());
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::DataRow(DataRowBody {
|
||||
storage: storage,
|
||||
len: len,
|
||||
})
|
||||
}
|
||||
b'E' => {
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::ErrorResponse(ErrorResponseBody { storage: storage })
|
||||
}
|
||||
b'G' => {
|
||||
let format = try!(buf.read_u8());
|
||||
let len = try!(buf.read_u16::<BigEndian>());
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::CopyInResponse(CopyInResponseBody {
|
||||
format: format,
|
||||
len: len,
|
||||
storage: storage,
|
||||
})
|
||||
}
|
||||
b'H' => {
|
||||
let format = try!(buf.read_u8());
|
||||
let len = try!(buf.read_u16::<BigEndian>());
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::CopyOutResponse(CopyOutResponseBody {
|
||||
format: format,
|
||||
len: len,
|
||||
storage: storage,
|
||||
})
|
||||
}
|
||||
b'I' => Message::EmptyQueryResponse,
|
||||
b'K' => {
|
||||
let process_id = try!(buf.read_i32::<BigEndian>());
|
||||
let secret_key = try!(buf.read_i32::<BigEndian>());
|
||||
Message::BackendKeyData(BackendKeyDataBody {
|
||||
process_id: process_id,
|
||||
secret_key: secret_key,
|
||||
_p: PhantomData,
|
||||
})
|
||||
}
|
||||
b'n' => Message::NoData,
|
||||
b'N' => {
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::NoticeResponse(NoticeResponseBody {
|
||||
storage: storage,
|
||||
})
|
||||
}
|
||||
b'R' => {
|
||||
match try!(buf.read_i32::<BigEndian>()) {
|
||||
0 => Message::AuthenticationOk,
|
||||
2 => Message::AuthenticationKerberosV5,
|
||||
3 => Message::AuthenticationCleartextPassword,
|
||||
5 => {
|
||||
let mut salt = [0; 4];
|
||||
try!(buf.read_exact(&mut salt));
|
||||
Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody {
|
||||
salt: salt,
|
||||
_p: PhantomData,
|
||||
})
|
||||
}
|
||||
6 => Message::AuthenticationScmCredential,
|
||||
7 => Message::AuthenticationGss,
|
||||
9 => Message::AuthenticationSspi,
|
||||
tag => {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput,
|
||||
format!("unknown authentication tag `{}`", tag)));
|
||||
}
|
||||
}
|
||||
}
|
||||
b's' => Message::PortalSuspended,
|
||||
b'S' => {
|
||||
let name_end = try!(find_null(buf, 0));
|
||||
let value_end = try!(find_null(buf, name_end + 1));
|
||||
let storage = buf[0..value_end].into();
|
||||
buf = &buf[value_end + 1..];
|
||||
Message::ParameterStatus(ParameterStatusBody {
|
||||
storage: storage,
|
||||
name_end: name_end,
|
||||
})
|
||||
}
|
||||
b't' => {
|
||||
let len = try!(buf.read_u16::<BigEndian>());
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::ParameterDescription(ParameterDescriptionBody {
|
||||
storage: storage,
|
||||
len: len,
|
||||
})
|
||||
}
|
||||
b'T' => {
|
||||
let len = try!(buf.read_u16::<BigEndian>());
|
||||
let storage = buf.into();
|
||||
buf = &[];
|
||||
Message::RowDescription(RowDescriptionBody {
|
||||
storage: storage,
|
||||
len: len,
|
||||
})
|
||||
}
|
||||
b'Z' => {
|
||||
let status = try!(buf.read_u8());
|
||||
Message::ReadyForQuery(ReadyForQueryBody {
|
||||
status: status,
|
||||
_p: PhantomData,
|
||||
})
|
||||
}
|
||||
tag => {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput,
|
||||
format!("unknown message tag `{}`", tag)));
|
||||
}
|
||||
};
|
||||
|
||||
if !buf.is_empty() {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid message length"));
|
||||
}
|
||||
|
||||
Ok(ParseResult::Complete {
|
||||
message: message,
|
||||
consumed: len,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of an attempted parse.
|
||||
pub enum ParseResult<T> {
|
||||
/// The message was successfully parsed.
|
||||
Complete {
|
||||
/// The message.
|
||||
message: Message<T>,
|
||||
/// The number of bytes of the input buffer consumed to parse this message.
|
||||
consumed: usize,
|
||||
},
|
||||
/// The buffer did not contain a full message.
|
||||
Incomplete {
|
||||
/// The number of total bytes required to parse a message, if known.
|
||||
///
|
||||
/// This value is present if the input buffer contains at least 5 bytes.
|
||||
required_size: Option<usize>,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AuthenticationMd5PasswordBody<T> {
|
||||
salt: [u8; 4],
|
||||
_p: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> AuthenticationMd5PasswordBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn salt(&self) -> [u8; 4] {
|
||||
self.salt
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BackendKeyDataBody<T> {
|
||||
process_id: i32,
|
||||
secret_key: i32,
|
||||
_p: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> BackendKeyDataBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn process_id(&self) -> i32 {
|
||||
self.process_id
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn secret_key(&self) -> i32 {
|
||||
self.secret_key
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CommandCompleteBody<T> {
|
||||
storage: T,
|
||||
}
|
||||
|
||||
impl<T> CommandCompleteBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn tag(&self) -> io::Result<&str> {
|
||||
get_str(&self.storage)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CopyDataBody<T> {
|
||||
storage: T,
|
||||
}
|
||||
|
||||
impl<T> CopyDataBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn data(&self) -> &[u8] {
|
||||
&self.storage
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CopyInResponseBody<T> {
|
||||
storage: T,
|
||||
len: u16,
|
||||
format: u8,
|
||||
}
|
||||
|
||||
impl<T> CopyInResponseBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn format(&self) -> u8 {
|
||||
self.format
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn column_formats<'a>(&'a self) -> ColumnFormats<'a> {
|
||||
ColumnFormats {
|
||||
remaining: self.len,
|
||||
buf: &self.storage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ColumnFormats<'a> {
|
||||
buf: &'a [u8],
|
||||
remaining: u16,
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for ColumnFormats<'a> {
|
||||
type Item = u16;
|
||||
type Error = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> io::Result<Option<u16>> {
|
||||
if self.remaining == 0 {
|
||||
if self.buf.is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid message length"));
|
||||
}
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
self.buf.read_u16::<BigEndian>().map(Some)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let len = self.remaining as usize;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CopyOutResponseBody<T> {
|
||||
storage: T,
|
||||
len: u16,
|
||||
format: u8,
|
||||
}
|
||||
|
||||
impl<T> CopyOutResponseBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn format(&self) -> u8 {
|
||||
self.format
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn column_formats<'a>(&'a self) -> ColumnFormats<'a> {
|
||||
ColumnFormats {
|
||||
remaining: self.len,
|
||||
buf: &self.storage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DataRowBody<T> {
|
||||
storage: T,
|
||||
len: u16,
|
||||
}
|
||||
|
||||
impl<T> DataRowBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn values<'a>(&'a self) -> DataRowValues<'a> {
|
||||
DataRowValues {
|
||||
buf: &self.storage,
|
||||
remaining: self.len,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DataRowValues<'a> {
|
||||
buf: &'a [u8],
|
||||
remaining: u16,
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for DataRowValues<'a> {
|
||||
type Item = Option<&'a [u8]>;
|
||||
type Error = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> io::Result<Option<Option<&'a [u8]>>> {
|
||||
if self.remaining == 0 {
|
||||
if self.buf.is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid message length"));
|
||||
}
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
let len = try!(self.buf.read_i32::<BigEndian>());
|
||||
if len < 0 {
|
||||
Ok(Some(None))
|
||||
} else {
|
||||
let len = len as usize;
|
||||
if self.buf.len() < len {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF"));
|
||||
}
|
||||
let (head, tail) = self.buf.split_at(len);
|
||||
self.buf = tail;
|
||||
Ok(Some(Some(head)))
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let len = self.remaining as usize;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ErrorResponseBody<T> {
|
||||
storage: T,
|
||||
}
|
||||
|
||||
impl<T> ErrorResponseBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn fields<'a>(&'a self) -> ErrorFields<'a> {
|
||||
ErrorFields {
|
||||
buf: &self.storage
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ErrorFields<'a> {
|
||||
buf: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for ErrorFields<'a> {
|
||||
type Item = ErrorField<'a>;
|
||||
type Error = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> io::Result<Option<ErrorField<'a>>> {
|
||||
let type_ = try!(self.buf.read_u8());
|
||||
if type_ == 0 {
|
||||
if self.buf.is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid message length"));
|
||||
}
|
||||
}
|
||||
|
||||
let value_end = try!(find_null(self.buf, 0));
|
||||
let value = try!(get_str(&self.buf[..value_end]));
|
||||
self.buf = &self.buf[value_end + 1..];
|
||||
|
||||
Ok(Some(ErrorField {
|
||||
type_: type_,
|
||||
value: value,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ErrorField<'a> {
|
||||
type_: u8,
|
||||
value: &'a str,
|
||||
}
|
||||
|
||||
impl<'a> ErrorField<'a> {
|
||||
#[inline]
|
||||
pub fn type_(&self) -> u8 {
|
||||
self.type_
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn value(&self) -> &str {
|
||||
self.value
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NoticeResponseBody<T> {
|
||||
storage: T,
|
||||
}
|
||||
|
||||
impl<T> NoticeResponseBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn fields<'a>(&'a self) -> ErrorFields<'a> {
|
||||
ErrorFields {
|
||||
buf: &self.storage
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NotificationResponseBody<T> {
|
||||
storage: T,
|
||||
process_id: i32,
|
||||
channel_end: usize,
|
||||
}
|
||||
|
||||
impl<T> NotificationResponseBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn process_id(&self) -> i32 {
|
||||
self.process_id
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn channel(&self) -> io::Result<&str> {
|
||||
get_str(&self.storage[..self.channel_end])
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn message(&self) -> io::Result<&str> {
|
||||
get_str(&self.storage[self.channel_end + 1..])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ParameterDescriptionBody<T> {
|
||||
storage: T,
|
||||
len: u16,
|
||||
}
|
||||
|
||||
impl<T> ParameterDescriptionBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn parameters<'a>(&'a self) -> Parameters<'a> {
|
||||
Parameters {
|
||||
buf: &self.storage,
|
||||
remaining: self.len,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Parameters<'a> {
|
||||
buf: &'a [u8],
|
||||
remaining: u16,
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for Parameters<'a> {
|
||||
type Item = Oid;
|
||||
type Error = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> io::Result<Option<Oid>> {
|
||||
if self.remaining == 0 {
|
||||
if self.buf.is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid message length"));
|
||||
}
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
self.buf.read_u32::<BigEndian>().map(Some)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let len = self.remaining as usize;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ParameterStatusBody<T> {
|
||||
storage: T,
|
||||
name_end: usize,
|
||||
}
|
||||
|
||||
impl<T> ParameterStatusBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn name(&self) -> io::Result<&str> {
|
||||
get_str(&self.storage[..self.name_end])
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn value(&self) -> io::Result<&str> {
|
||||
get_str(&self.storage[self.name_end + 1..])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadyForQueryBody<T> {
|
||||
status: u8,
|
||||
_p: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> ReadyForQueryBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn status(&self) -> u8 {
|
||||
self.status
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RowDescriptionBody<T> {
|
||||
storage: T,
|
||||
len: u16,
|
||||
}
|
||||
|
||||
impl<T> RowDescriptionBody<T>
|
||||
where T: Deref<Target = [u8]>
|
||||
{
|
||||
#[inline]
|
||||
pub fn fields<'a>(&'a self) -> Fields<'a> {
|
||||
Fields {
|
||||
buf: &self.storage,
|
||||
remaining: self.len,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Fields<'a> {
|
||||
buf: &'a [u8],
|
||||
remaining: u16,
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for Fields<'a> {
|
||||
type Item = Field<'a>;
|
||||
type Error = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> io::Result<Option<Field<'a>>> {
|
||||
if self.remaining == 0 {
|
||||
if self.buf.is_empty() {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid message length"));
|
||||
}
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
let name_end = try!(find_null(self.buf, 0));
|
||||
let name = try!(get_str(&self.buf[..name_end]));
|
||||
self.buf = &self.buf[name_end + 1..];
|
||||
let table_oid = try!(self.buf.read_u32::<BigEndian>());
|
||||
let column_id = try!(self.buf.read_i16::<BigEndian>());
|
||||
let type_oid = try!(self.buf.read_u32::<BigEndian>());
|
||||
let type_size = try!(self.buf.read_i16::<BigEndian>());
|
||||
let type_modifier = try!(self.buf.read_i32::<BigEndian>());
|
||||
let format = try!(self.buf.read_i16::<BigEndian>());
|
||||
|
||||
Ok(Some(Field {
|
||||
name: name,
|
||||
table_oid: table_oid,
|
||||
column_id: column_id,
|
||||
type_oid: type_oid,
|
||||
type_size: type_size,
|
||||
type_modifier: type_modifier,
|
||||
format: format,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Field<'a> {
|
||||
name: &'a str,
|
||||
table_oid: Oid,
|
||||
column_id: i16,
|
||||
type_oid: Oid,
|
||||
type_size: i16,
|
||||
type_modifier: i32,
|
||||
format: i16,
|
||||
}
|
||||
|
||||
impl<'a> Field<'a> {
|
||||
#[inline]
|
||||
pub fn name(&self) -> &'a str {
|
||||
self.name
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn table_oid(&self) -> Oid {
|
||||
self.table_oid
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn column_id(&self) -> i16 {
|
||||
self.column_id
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn type_oid(&self) -> Oid {
|
||||
self.type_oid
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn type_size(&self) -> i16 {
|
||||
self.type_size
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn type_modifier(&self) -> i32 {
|
||||
self.type_modifier
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn format(&self) -> i16 {
|
||||
self.format
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn find_null(buf: &[u8], start: usize) -> io::Result<usize> {
|
||||
match memchr(0, &buf[start..]) {
|
||||
Some(pos) => Ok(pos + start),
|
||||
None => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF"))
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_str(buf: &[u8]) -> io::Result<&str> {
|
||||
str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
|
||||
}
|
327
postgres-protocol/src/message/frontend.rs
Normal file
327
postgres-protocol/src/message/frontend.rs
Normal file
@ -0,0 +1,327 @@
|
||||
//! Frontend message serialization.
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use byteorder::{WriteBytesExt, BigEndian, ByteOrder};
|
||||
use std::error::Error;
|
||||
use std::io;
|
||||
use std::marker;
|
||||
|
||||
use {Oid, FromUsize, IsNull, write_nullable};
|
||||
|
||||
pub enum Message<'a> {
|
||||
Bind {
|
||||
portal: &'a str,
|
||||
statement: &'a str,
|
||||
formats: &'a [i16],
|
||||
values: &'a [Option<Vec<u8>>],
|
||||
result_formats: &'a [i16],
|
||||
},
|
||||
CancelRequest {
|
||||
process_id: i32,
|
||||
secret_key: i32,
|
||||
},
|
||||
Close {
|
||||
variant: u8,
|
||||
name: &'a str,
|
||||
},
|
||||
CopyData {
|
||||
data: &'a [u8],
|
||||
},
|
||||
CopyDone,
|
||||
CopyFail {
|
||||
message: &'a str,
|
||||
},
|
||||
Describe {
|
||||
variant: u8,
|
||||
name: &'a str,
|
||||
},
|
||||
Execute {
|
||||
portal: &'a str,
|
||||
max_rows: i32,
|
||||
},
|
||||
Parse {
|
||||
name: &'a str,
|
||||
query: &'a str,
|
||||
param_types: &'a [Oid],
|
||||
},
|
||||
PasswordMessage {
|
||||
password: &'a str,
|
||||
},
|
||||
Query {
|
||||
query: &'a str,
|
||||
},
|
||||
SslRequest,
|
||||
StartupMessage {
|
||||
parameters: &'a [(String, String)],
|
||||
},
|
||||
Sync,
|
||||
Terminate,
|
||||
#[doc(hidden)]
|
||||
__ForExtensibility,
|
||||
}
|
||||
|
||||
impl<'a> Message<'a> {
|
||||
#[inline]
|
||||
pub fn serialize(&self, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
match *self {
|
||||
Message::Bind { portal, statement, formats, values, result_formats } => {
|
||||
let r = bind(portal,
|
||||
statement,
|
||||
formats.iter().cloned(),
|
||||
values,
|
||||
|v, buf| {
|
||||
match *v {
|
||||
Some(ref v) => {
|
||||
buf.extend_from_slice(v);
|
||||
Ok(IsNull::No)
|
||||
}
|
||||
None => Ok(IsNull::Yes),
|
||||
}
|
||||
},
|
||||
result_formats.iter().cloned(),
|
||||
buf);
|
||||
match r {
|
||||
Ok(()) => Ok(()),
|
||||
Err(BindError::Conversion(_)) => unreachable!(),
|
||||
Err(BindError::Serialization(e)) => Err(e),
|
||||
}
|
||||
}
|
||||
Message::CancelRequest { process_id, secret_key } => {
|
||||
Ok(cancel_request(process_id, secret_key, buf))
|
||||
}
|
||||
Message::Close { variant, name } => close(variant, name, buf),
|
||||
Message::CopyData { data } => copy_data(data, buf),
|
||||
Message::CopyDone => Ok(copy_done(buf)),
|
||||
Message::CopyFail { message } => copy_fail(message, buf),
|
||||
Message::Describe { variant, name } => describe(variant, name, buf),
|
||||
Message::Execute { portal, max_rows } => execute(portal, max_rows, buf),
|
||||
Message::Parse { name, query, param_types } => {
|
||||
parse(name, query, param_types.iter().cloned(), buf)
|
||||
}
|
||||
Message::PasswordMessage { password } => password_message(password, buf),
|
||||
Message::Query { query: q } => query(q, buf),
|
||||
Message::SslRequest => Ok(ssl_request(buf)),
|
||||
Message::StartupMessage { parameters } => {
|
||||
startup_message(parameters.iter().map(|&(ref k, ref v)| (&**k, &**v)), buf)
|
||||
}
|
||||
Message::Sync => Ok(sync(buf)),
|
||||
Message::Terminate => Ok(terminate(buf)),
|
||||
Message::__ForExtensibility => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_body<F, E>(buf: &mut Vec<u8>, f: F) -> Result<(), E>
|
||||
where F: FnOnce(&mut Vec<u8>) -> Result<(), E>,
|
||||
E: From<io::Error>
|
||||
{
|
||||
let base = buf.len();
|
||||
buf.extend_from_slice(&[0; 4]);
|
||||
|
||||
try!(f(buf));
|
||||
|
||||
let size = try!(i32::from_usize(buf.len() - base));
|
||||
BigEndian::write_i32(&mut buf[base..], size);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub enum BindError {
|
||||
Conversion(Box<Error + marker::Sync + Send>),
|
||||
Serialization(io::Error),
|
||||
}
|
||||
|
||||
impl From<Box<Error + marker::Sync + Send>> for BindError {
|
||||
#[inline]
|
||||
fn from(e: Box<Error + marker::Sync + Send>) -> BindError {
|
||||
BindError::Conversion(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for BindError {
|
||||
#[inline]
|
||||
fn from(e: io::Error) -> BindError {
|
||||
BindError::Serialization(e)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn bind<I, J, F, T, K>(portal: &str,
|
||||
statement: &str,
|
||||
formats: I,
|
||||
values: J,
|
||||
mut serializer: F,
|
||||
result_formats: K,
|
||||
buf: &mut Vec<u8>)
|
||||
-> Result<(), BindError>
|
||||
where I: IntoIterator<Item = i16>,
|
||||
J: IntoIterator<Item = T>,
|
||||
F: FnMut(T, &mut Vec<u8>) -> Result<IsNull, Box<Error + marker::Sync + Send>>,
|
||||
K: IntoIterator<Item = i16>,
|
||||
{
|
||||
buf.push(b'B');
|
||||
|
||||
write_body(buf, |buf| {
|
||||
try!(buf.write_cstr(portal));
|
||||
try!(buf.write_cstr(statement));
|
||||
try!(write_counted(formats, |f, buf| buf.write_i16::<BigEndian>(f), buf));
|
||||
try!(write_counted(values,
|
||||
|v, buf| write_nullable(|buf| serializer(v, buf), buf),
|
||||
buf));
|
||||
try!(write_counted(result_formats, |f, buf| buf.write_i16::<BigEndian>(f), buf));
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_counted<I, T, F, E>(items: I, mut serializer: F, buf: &mut Vec<u8>) -> Result<(), E>
|
||||
where I: IntoIterator<Item = T>,
|
||||
F: FnMut(T, &mut Vec<u8>) -> Result<(), E>,
|
||||
E: From<io::Error>
|
||||
{
|
||||
let base = buf.len();
|
||||
buf.extend_from_slice(&[0; 2]);
|
||||
let mut count = 0;
|
||||
for item in items {
|
||||
try!(serializer(item, buf));
|
||||
count += 1;
|
||||
}
|
||||
let count = try!(i16::from_usize(count));
|
||||
BigEndian::write_i16(&mut buf[base..], count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut Vec<u8>) {
|
||||
write_body(buf, |buf| {
|
||||
buf.write_i32::<BigEndian>(80877102).unwrap();
|
||||
buf.write_i32::<BigEndian>(process_id).unwrap();
|
||||
buf.write_i32::<BigEndian>(secret_key)
|
||||
}).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn close(variant: u8, name: &str, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'C');
|
||||
write_body(buf, |buf| {
|
||||
buf.push(variant);
|
||||
buf.write_cstr(name)
|
||||
})
|
||||
}
|
||||
|
||||
// FIXME ideally this'd take a Read but it's unclear what to do at EOF
|
||||
#[inline]
|
||||
pub fn copy_data(data: &[u8], buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'd');
|
||||
write_body(buf, |buf| {
|
||||
buf.extend_from_slice(data);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn copy_done(buf: &mut Vec<u8>) {
|
||||
buf.push(b'c');
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn copy_fail(message: &str, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'f');
|
||||
write_body(buf, |buf| buf.write_cstr(message))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn describe(variant: u8, name: &str, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'D');
|
||||
write_body(buf, |buf| {
|
||||
buf.push(variant);
|
||||
buf.write_cstr(name)
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn execute(portal: &str, max_rows: i32, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'E');
|
||||
write_body(buf, |buf| {
|
||||
try!(buf.write_cstr(portal));
|
||||
buf.write_i32::<BigEndian>(max_rows).unwrap();
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn parse<I>(name: &str, query: &str, param_types: I, buf: &mut Vec<u8>) -> io::Result<()>
|
||||
where I: IntoIterator<Item = Oid>
|
||||
{
|
||||
buf.push(b'P');
|
||||
write_body(buf, |buf| {
|
||||
try!(buf.write_cstr(name));
|
||||
try!(buf.write_cstr(query));
|
||||
try!(write_counted(param_types, |t, buf| buf.write_u32::<BigEndian>(t), buf));
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn password_message(password: &str, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'p');
|
||||
write_body(buf, |buf| buf.write_cstr(password))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn query(query: &str, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.push(b'Q');
|
||||
write_body(buf, |buf| buf.write_cstr(query))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn ssl_request(buf: &mut Vec<u8>) {
|
||||
write_body(buf, |buf| buf.write_i32::<BigEndian>(80877103)).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn startup_message<'a, I>(parameters: I, buf: &mut Vec<u8>) -> io::Result<()>
|
||||
where I: IntoIterator<Item = (&'a str, &'a str)>
|
||||
{
|
||||
write_body(buf, |buf| {
|
||||
buf.write_i32::<BigEndian>(196608).unwrap();
|
||||
for (key, value) in parameters {
|
||||
try!(buf.write_cstr(key.as_ref()));
|
||||
try!(buf.write_cstr(value.as_ref()));
|
||||
}
|
||||
buf.push(0);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn sync(buf: &mut Vec<u8>) {
|
||||
buf.push(b'S');
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn terminate(buf: &mut Vec<u8>) {
|
||||
buf.push(b'X');
|
||||
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
|
||||
}
|
||||
|
||||
trait WriteCStr {
|
||||
fn write_cstr(&mut self, s: &str) -> Result<(), io::Error>;
|
||||
}
|
||||
|
||||
impl WriteCStr for Vec<u8> {
|
||||
#[inline]
|
||||
fn write_cstr(&mut self, s: &str) -> Result<(), io::Error> {
|
||||
if s.as_bytes().contains(&0) {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput,
|
||||
"string contains embedded null"));
|
||||
}
|
||||
self.extend_from_slice(s.as_bytes());
|
||||
self.push(0);
|
||||
Ok(())
|
||||
}
|
||||
}
|
8
postgres-protocol/src/message/mod.rs
Normal file
8
postgres-protocol/src/message/mod.rs
Normal file
@ -0,0 +1,8 @@
|
||||
//! Postgres message protocol support.
|
||||
//!
|
||||
//! See [Postgres's documentation][docs] for more information on message flow.
|
||||
//!
|
||||
//! [docs]: https://www.postgresql.org/docs/9.5/static/protocol-flow.html
|
||||
|
||||
pub mod backend;
|
||||
pub mod frontend;
|
850
postgres-protocol/src/types.rs
Normal file
850
postgres-protocol/src/types.rs
Normal file
@ -0,0 +1,850 @@
|
||||
//! Conversions to and from Postgres's binary format for various types.
|
||||
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use std::error::Error;
|
||||
use std::str;
|
||||
|
||||
use {Oid, IsNull, write_nullable, FromUsize};
|
||||
|
||||
const RANGE_UPPER_UNBOUNDED: u8 = 0b0001_0000;
|
||||
const RANGE_LOWER_UNBOUNDED: u8 = 0b0000_1000;
|
||||
const RANGE_UPPER_INCLUSIVE: u8 = 0b0000_0100;
|
||||
const RANGE_LOWER_INCLUSIVE: u8 = 0b0000_0010;
|
||||
const RANGE_EMPTY: u8 = 0b0000_0001;
|
||||
|
||||
/// Serializes a `BOOL` value.
|
||||
#[inline]
|
||||
pub fn bool_to_sql(v: bool, buf: &mut Vec<u8>) {
|
||||
buf.push(v as u8);
|
||||
}
|
||||
|
||||
/// Deserializes a `BOOL` value.
|
||||
#[inline]
|
||||
pub fn bool_from_sql(buf: &[u8]) -> Result<bool, Box<Error + Sync + Send>> {
|
||||
if buf.len() != 1 {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
|
||||
Ok(buf[0] != 0)
|
||||
}
|
||||
|
||||
/// Serializes a `BYTEA` value.
|
||||
#[inline]
|
||||
pub fn bytea_to_sql(v: &[u8], buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(v);
|
||||
}
|
||||
|
||||
/// Deserializes a `BYTEA value.
|
||||
#[inline]
|
||||
pub fn bytea_from_sql(buf: &[u8]) -> &[u8] {
|
||||
buf
|
||||
}
|
||||
|
||||
/// Serializes a `TEXT`, `VARCHAR`, `CHAR(n)`, `NAME`, or `CITEXT` value.
|
||||
#[inline]
|
||||
pub fn text_to_sql(v: &str, buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(v.as_bytes());
|
||||
}
|
||||
|
||||
/// Deserializes a `TEXT`, `VARCHAR`, `CHAR(n)`, `NAME`, or `CITEXT` value.
|
||||
#[inline]
|
||||
pub fn text_from_sql(buf: &[u8]) -> Result<&str, Box<Error + Sync + Send>> {
|
||||
Ok(try!(str::from_utf8(buf)))
|
||||
}
|
||||
|
||||
/// Serializes a `"char"` value.
|
||||
#[inline]
|
||||
pub fn char_to_sql(v: i8, buf: &mut Vec<u8>) {
|
||||
buf.write_i8(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes a `"char"` value.
|
||||
#[inline]
|
||||
pub fn char_from_sql(mut buf: &[u8]) -> Result<i8, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i8());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes an `INT2` value.
|
||||
#[inline]
|
||||
pub fn int2_to_sql(v: i16, buf: &mut Vec<u8>) {
|
||||
buf.write_i16::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes an `INT2` value.
|
||||
#[inline]
|
||||
pub fn int2_from_sql(mut buf: &[u8]) -> Result<i16, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i16::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes an `INT4` value.
|
||||
#[inline]
|
||||
pub fn int4_to_sql(v: i32, buf: &mut Vec<u8>) {
|
||||
buf.write_i32::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes an `INT4` value.
|
||||
#[inline]
|
||||
pub fn int4_from_sql(mut buf: &[u8]) -> Result<i32, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i32::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes an `OID` value.
|
||||
#[inline]
|
||||
pub fn oid_to_sql(v: Oid, buf: &mut Vec<u8>) {
|
||||
buf.write_u32::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes an `OID` value.
|
||||
#[inline]
|
||||
pub fn oid_from_sql(mut buf: &[u8]) -> Result<Oid, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_u32::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes an `INT8` value.
|
||||
#[inline]
|
||||
pub fn int8_to_sql(v: i64, buf: &mut Vec<u8>) {
|
||||
buf.write_i64::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes an `INT8` value.
|
||||
#[inline]
|
||||
pub fn int8_from_sql(mut buf: &[u8]) -> Result<i64, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i64::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes a `FLOAT4` value.
|
||||
#[inline]
|
||||
pub fn float4_to_sql(v: f32, buf: &mut Vec<u8>) {
|
||||
buf.write_f32::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes a `FLOAT4` value.
|
||||
#[inline]
|
||||
pub fn float4_from_sql(mut buf: &[u8]) -> Result<f32, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_f32::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes a `FLOAT8` value.
|
||||
#[inline]
|
||||
pub fn float8_to_sql(v: f64, buf: &mut Vec<u8>) {
|
||||
buf.write_f64::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes a `FLOAT8` value.
|
||||
#[inline]
|
||||
pub fn float8_from_sql(mut buf: &[u8]) -> Result<f64, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_f64::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes an `HSTORE` value.
|
||||
#[inline]
|
||||
pub fn hstore_to_sql<'a, I>(values: I, buf: &mut Vec<u8>) -> Result<(), Box<Error + Sync + Send>>
|
||||
where I: IntoIterator<Item = (&'a str, Option<&'a str>)>
|
||||
{
|
||||
let base = buf.len();
|
||||
buf.extend_from_slice(&[0; 4]);
|
||||
|
||||
let mut count = 0;
|
||||
for (key, value) in values {
|
||||
count += 1;
|
||||
|
||||
try!(write_pascal_string(key, buf));
|
||||
|
||||
match value {
|
||||
Some(value) => {
|
||||
try!(write_pascal_string(value, buf));
|
||||
}
|
||||
None => buf.write_i32::<BigEndian>(-1).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
let count = try!(i32::from_usize(count));
|
||||
(&mut buf[base..base + 4]).write_i32::<BigEndian>(count).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_pascal_string(s: &str, buf: &mut Vec<u8>) -> Result<(), Box<Error + Sync + Send>> {
|
||||
let size = try!(i32::from_usize(s.len()));
|
||||
buf.write_i32::<BigEndian>(size).unwrap();
|
||||
buf.extend_from_slice(s.as_bytes());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserializes an `HSTORE` value.
|
||||
#[inline]
|
||||
pub fn hstore_from_sql<'a>(mut buf: &'a [u8])
|
||||
-> Result<HstoreEntries<'a>, Box<Error + Sync + Send>> {
|
||||
let count = try!(buf.read_i32::<BigEndian>());
|
||||
if count < 0 {
|
||||
return Err("invalid entry count".into());
|
||||
}
|
||||
|
||||
Ok(HstoreEntries {
|
||||
remaining: count,
|
||||
buf: buf,
|
||||
})
|
||||
}
|
||||
|
||||
/// A fallible iterator over `HSTORE` entries.
|
||||
pub struct HstoreEntries<'a> {
|
||||
remaining: i32,
|
||||
buf: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for HstoreEntries<'a> {
|
||||
type Item = (&'a str, Option<&'a str>);
|
||||
type Error = Box<Error + Sync + Send>;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Result<Option<(&'a str, Option<&'a str>)>, Box<Error + Sync + Send>> {
|
||||
if self.remaining == 0 {
|
||||
if !self.buf.is_empty() {
|
||||
return Err("invalid buffer size".into());
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
|
||||
let key_len = try!(self.buf.read_i32::<BigEndian>());
|
||||
if key_len < 0 {
|
||||
return Err("invalid key length".into());
|
||||
}
|
||||
let (key, buf) = self.buf.split_at(key_len as usize);
|
||||
let key = try!(str::from_utf8(key));
|
||||
self.buf = buf;
|
||||
|
||||
let value_len = try!(self.buf.read_i32::<BigEndian>());
|
||||
let value = if value_len < 0 {
|
||||
None
|
||||
} else {
|
||||
let (value, buf) = self.buf.split_at(value_len as usize);
|
||||
let value = try!(str::from_utf8(value));
|
||||
self.buf = buf;
|
||||
Some(value)
|
||||
};
|
||||
|
||||
Ok(Some((key, value)))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let len = self.remaining as usize;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes a `VARBIT` or `BIT` value.
|
||||
#[inline]
|
||||
pub fn varbit_to_sql<I>(len: usize, v: I, buf: &mut Vec<u8>) -> Result<(), Box<Error + Sync + Send>>
|
||||
where I: Iterator<Item = u8>
|
||||
{
|
||||
let len = try!(i32::from_usize(len));
|
||||
buf.write_i32::<BigEndian>(len).unwrap();
|
||||
|
||||
for byte in v {
|
||||
buf.push(byte);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserializes a `VARBIT` or `BIT` value.
|
||||
#[inline]
|
||||
pub fn varbit_from_sql<'a>(mut buf: &'a [u8]) -> Result<Varbit<'a>, Box<Error + Sync + Send>> {
|
||||
let len = try!(buf.read_i32::<BigEndian>());
|
||||
if len < 0 {
|
||||
return Err("invalid varbit length".into());
|
||||
}
|
||||
let bytes = (len as usize + 7) / 8;
|
||||
if buf.len() != bytes {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
|
||||
Ok(Varbit {
|
||||
len: len as usize,
|
||||
bytes: buf,
|
||||
})
|
||||
}
|
||||
|
||||
/// A `VARBIT` value.
|
||||
pub struct Varbit<'a> {
|
||||
len: usize,
|
||||
bytes: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> Varbit<'a> {
|
||||
/// Returns the number of bits.
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
|
||||
/// Returns the bits as a slice of bytes.
|
||||
#[inline]
|
||||
pub fn bytes(&self) -> &'a [u8] {
|
||||
self.bytes
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes a `TIMESTAMP` or `TIMESTAMPTZ` value.
|
||||
///
|
||||
/// The value should represent the number of microseconds since midnight, January 1st, 2000.
|
||||
#[inline]
|
||||
pub fn timestamp_to_sql(v: i64, buf: &mut Vec<u8>) {
|
||||
buf.write_i64::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes a `TIMESTAMP` or `TIMESTAMPTZ` value.
|
||||
///
|
||||
/// The value represents the number of microseconds since midnight, January 1st, 2000.
|
||||
#[inline]
|
||||
pub fn timestamp_from_sql(mut buf: &[u8]) -> Result<i64, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i64::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes a `DATE` value.
|
||||
///
|
||||
/// The value should represent the number of days since January 1st, 2000.
|
||||
#[inline]
|
||||
pub fn date_to_sql(v: i32, buf: &mut Vec<u8>) {
|
||||
buf.write_i32::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes a `DATE` value.
|
||||
///
|
||||
/// The value represents the number of days since January 1st, 2000.
|
||||
#[inline]
|
||||
pub fn date_from_sql(mut buf: &[u8]) -> Result<i32, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i32::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes a `TIME` or `TIMETZ` value.
|
||||
///
|
||||
/// The value should represent the number of microseconds since midnight.
|
||||
#[inline]
|
||||
pub fn time_to_sql(v: i64, buf: &mut Vec<u8>) {
|
||||
buf.write_i64::<BigEndian>(v).unwrap();
|
||||
}
|
||||
|
||||
/// Deserializes a `TIME` or `TIMETZ` value.
|
||||
///
|
||||
/// The value represents the number of microseconds since midnight.
|
||||
#[inline]
|
||||
pub fn time_from_sql(mut buf: &[u8]) -> Result<i64, Box<Error + Sync + Send>> {
|
||||
let v = try!(buf.read_i64::<BigEndian>());
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// Serializes a `MACADDR` value.
|
||||
#[inline]
|
||||
pub fn macaddr_to_sql(v: [u8; 6], buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(&v);
|
||||
}
|
||||
|
||||
/// Deserializes a `MACADDR` value.
|
||||
#[inline]
|
||||
pub fn macaddr_from_sql(buf: &[u8]) -> Result<[u8; 6], Box<Error + Sync + Send>> {
|
||||
if buf.len() != 6 {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
let mut out = [0; 6];
|
||||
out.copy_from_slice(buf);
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Serializes a `UUID` value.
|
||||
#[inline]
|
||||
pub fn uuid_to_sql(v: [u8; 16], buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(&v);
|
||||
}
|
||||
|
||||
/// Deserializes a `UUID` value.
|
||||
#[inline]
|
||||
pub fn uuid_from_sql(buf: &[u8]) -> Result<[u8; 16], Box<Error + Sync + Send>> {
|
||||
if buf.len() != 16 {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
let mut out = [0; 16];
|
||||
out.copy_from_slice(buf);
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Serializes an array value.
|
||||
#[inline]
|
||||
pub fn array_to_sql<T, I, J, F>(dimensions: I,
|
||||
has_nulls: bool,
|
||||
element_type: Oid,
|
||||
elements: J,
|
||||
mut serializer: F,
|
||||
buf: &mut Vec<u8>)
|
||||
-> Result<(), Box<Error + Sync + Send>>
|
||||
where I: IntoIterator<Item = ArrayDimension>,
|
||||
J: IntoIterator<Item = T>,
|
||||
F: FnMut(T, &mut Vec<u8>) -> Result<IsNull, Box<Error + Sync + Send>>
|
||||
{
|
||||
let dimensions_idx = buf.len();
|
||||
buf.extend_from_slice(&[0; 4]);
|
||||
buf.write_i32::<BigEndian>(has_nulls as i32).unwrap();
|
||||
buf.write_u32::<BigEndian>(element_type).unwrap();
|
||||
|
||||
let mut num_dimensions = 0;
|
||||
for dimension in dimensions {
|
||||
num_dimensions += 1;
|
||||
buf.write_i32::<BigEndian>(dimension.len).unwrap();
|
||||
buf.write_i32::<BigEndian>(dimension.lower_bound).unwrap();
|
||||
}
|
||||
|
||||
let num_dimensions = try!(i32::from_usize(num_dimensions));
|
||||
(&mut buf[dimensions_idx..dimensions_idx + 4])
|
||||
.write_i32::<BigEndian>(num_dimensions)
|
||||
.unwrap();
|
||||
|
||||
for element in elements {
|
||||
try!(write_nullable(|buf| serializer(element, buf), buf));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserializes an array value.
|
||||
#[inline]
|
||||
pub fn array_from_sql<'a>(mut buf: &'a [u8]) -> Result<Array<'a>, Box<Error + Sync + Send>> {
|
||||
let dimensions = try!(buf.read_i32::<BigEndian>());
|
||||
if dimensions < 0 {
|
||||
return Err("invalid dimension count".into());
|
||||
}
|
||||
let has_nulls = try!(buf.read_i32::<BigEndian>()) != 0;
|
||||
let element_type = try!(buf.read_u32::<BigEndian>());
|
||||
|
||||
let mut r = buf;
|
||||
let mut elements = 1i32;
|
||||
for _ in 0..dimensions {
|
||||
let len = try!(r.read_i32::<BigEndian>());
|
||||
if len < 0 {
|
||||
return Err("invalid dimension size".into());
|
||||
}
|
||||
let _lower_bound = try!(r.read_i32::<BigEndian>());
|
||||
elements = match elements.checked_mul(len) {
|
||||
Some(elements) => elements,
|
||||
None => return Err("too many array elements".into()),
|
||||
};
|
||||
}
|
||||
|
||||
if dimensions == 0 {
|
||||
elements = 0;
|
||||
}
|
||||
|
||||
Ok(Array {
|
||||
dimensions: dimensions,
|
||||
has_nulls: has_nulls,
|
||||
element_type: element_type,
|
||||
elements: elements,
|
||||
buf: buf,
|
||||
})
|
||||
}
|
||||
|
||||
/// A Postgres array.
|
||||
pub struct Array<'a> {
|
||||
dimensions: i32,
|
||||
has_nulls: bool,
|
||||
element_type: Oid,
|
||||
elements: i32,
|
||||
buf: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> Array<'a> {
|
||||
/// Returns true if there are `NULL` elements.
|
||||
#[inline]
|
||||
pub fn has_nulls(&self) -> bool {
|
||||
self.has_nulls
|
||||
}
|
||||
|
||||
/// Returns the OID of the elements of the array.
|
||||
#[inline]
|
||||
pub fn element_type(&self) -> Oid {
|
||||
self.element_type
|
||||
}
|
||||
|
||||
/// Returns an iterator over the dimensions of the array.
|
||||
#[inline]
|
||||
pub fn dimensions(&self) -> ArrayDimensions<'a> {
|
||||
ArrayDimensions(&self.buf[..self.dimensions as usize * 8])
|
||||
}
|
||||
|
||||
/// Returns an iterator over the values of the array.
|
||||
#[inline]
|
||||
pub fn values(&self) -> ArrayValues<'a> {
|
||||
ArrayValues {
|
||||
remaining: self.elements,
|
||||
buf: &self.buf[self.dimensions as usize * 8..],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over the dimensions of an array.
|
||||
pub struct ArrayDimensions<'a>(&'a [u8]);
|
||||
|
||||
impl<'a> FallibleIterator for ArrayDimensions<'a> {
|
||||
type Item = ArrayDimension;
|
||||
type Error = Box<Error + Sync + Send>;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Result<Option<ArrayDimension>, Box<Error + Sync + Send>> {
|
||||
if self.0.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let len = try!(self.0.read_i32::<BigEndian>());
|
||||
let lower_bound = try!(self.0.read_i32::<BigEndian>());
|
||||
|
||||
Ok(Some(ArrayDimension {
|
||||
len: len,
|
||||
lower_bound: lower_bound,
|
||||
}))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let len = self.0.len() / 8;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about a dimension of an array.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub struct ArrayDimension {
|
||||
/// The length of this dimension.
|
||||
pub len: i32,
|
||||
|
||||
/// The base value used to index into this dimension.
|
||||
pub lower_bound: i32,
|
||||
}
|
||||
|
||||
/// An iterator over the values of an array, in row-major order.
|
||||
pub struct ArrayValues<'a> {
|
||||
remaining: i32,
|
||||
buf: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> FallibleIterator for ArrayValues<'a> {
|
||||
type Item = Option<&'a [u8]>;
|
||||
type Error = Box<Error + Sync + Send>;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Result<Option<Option<&'a [u8]>>, Box<Error + Sync + Send>> {
|
||||
if self.remaining == 0 {
|
||||
if !self.buf.is_empty() {
|
||||
return Err("invalid message length".into());
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
self.remaining -= 1;
|
||||
|
||||
let len = try!(self.buf.read_i32::<BigEndian>());
|
||||
let val = if len < 0 {
|
||||
None
|
||||
} else {
|
||||
if self.buf.len() < len as usize {
|
||||
return Err("invalid value length".into());
|
||||
}
|
||||
|
||||
let (val, buf) = self.buf.split_at(len as usize);
|
||||
self.buf = buf;
|
||||
Some(val)
|
||||
};
|
||||
|
||||
Ok(Some(val))
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let len = self.remaining as usize;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes an empty range.
|
||||
#[inline]
|
||||
pub fn empty_range_to_sql(buf: &mut Vec<u8>) {
|
||||
buf.push(RANGE_EMPTY);
|
||||
}
|
||||
|
||||
/// Serializes a range value.
|
||||
pub fn range_to_sql<F, G>(lower: F,
|
||||
upper: G,
|
||||
buf: &mut Vec<u8>)
|
||||
-> Result<(), Box<Error + Sync + Send>>
|
||||
where F: FnOnce(&mut Vec<u8>) -> Result<RangeBound<IsNull>, Box<Error + Sync + Send>>,
|
||||
G: FnOnce(&mut Vec<u8>) -> Result<RangeBound<IsNull>, Box<Error + Sync + Send>>
|
||||
{
|
||||
let tag_idx = buf.len();
|
||||
buf.push(0);
|
||||
let mut tag = 0;
|
||||
|
||||
match try!(write_bound(lower, buf)) {
|
||||
RangeBound::Inclusive(()) => tag |= RANGE_LOWER_INCLUSIVE,
|
||||
RangeBound::Exclusive(()) => {}
|
||||
RangeBound::Unbounded => tag |= RANGE_LOWER_UNBOUNDED,
|
||||
}
|
||||
|
||||
match try!(write_bound(upper, buf)) {
|
||||
RangeBound::Inclusive(()) => tag |= RANGE_UPPER_INCLUSIVE,
|
||||
RangeBound::Exclusive(()) => {}
|
||||
RangeBound::Unbounded => tag |= RANGE_UPPER_UNBOUNDED,
|
||||
}
|
||||
|
||||
buf[tag_idx] = tag;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_bound<F>(bound: F, buf: &mut Vec<u8>) -> Result<RangeBound<()>, Box<Error + Sync + Send>>
|
||||
where F: FnOnce(&mut Vec<u8>) -> Result<RangeBound<IsNull>, Box<Error + Sync + Send>>
|
||||
{
|
||||
let base = buf.len();
|
||||
buf.extend_from_slice(&[0; 4]);
|
||||
|
||||
let (null, ret) = match try!(bound(buf)) {
|
||||
RangeBound::Inclusive(null) => (Some(null), RangeBound::Inclusive(())),
|
||||
RangeBound::Exclusive(null) => (Some(null), RangeBound::Exclusive(())),
|
||||
RangeBound::Unbounded => (None, RangeBound::Unbounded),
|
||||
};
|
||||
|
||||
match null {
|
||||
Some(null) => {
|
||||
let len = match null {
|
||||
IsNull::No => try!(i32::from_usize(buf.len() - base - 4)),
|
||||
IsNull::Yes => -1,
|
||||
};
|
||||
(&mut buf[base..base + 4]).write_i32::<BigEndian>(len).unwrap();
|
||||
}
|
||||
None => buf.truncate(base),
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// One side of a range.
|
||||
pub enum RangeBound<T> {
|
||||
/// An inclusive bound.
|
||||
Inclusive(T),
|
||||
/// An exclusive bound.
|
||||
Exclusive(T),
|
||||
/// No bound.
|
||||
Unbounded,
|
||||
}
|
||||
|
||||
/// Deserializes a range value.
|
||||
#[inline]
|
||||
pub fn range_from_sql<'a>(mut buf: &'a [u8]) -> Result<Range<'a>, Box<Error + Sync + Send>> {
|
||||
let tag = try!(buf.read_u8());
|
||||
|
||||
if tag == RANGE_EMPTY {
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid message size".into());
|
||||
}
|
||||
return Ok(Range::Empty);
|
||||
}
|
||||
|
||||
let lower = try!(read_bound(&mut buf, tag, RANGE_LOWER_UNBOUNDED, RANGE_LOWER_INCLUSIVE));
|
||||
let upper = try!(read_bound(&mut buf, tag, RANGE_UPPER_UNBOUNDED, RANGE_UPPER_INCLUSIVE));
|
||||
|
||||
if !buf.is_empty() {
|
||||
return Err("invalid message size".into());
|
||||
}
|
||||
|
||||
Ok(Range::Nonempty(lower, upper))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn read_bound<'a>(buf: &mut &'a [u8],
|
||||
tag: u8,
|
||||
unbounded: u8,
|
||||
inclusive: u8)
|
||||
-> Result<RangeBound<Option<&'a [u8]>>, Box<Error + Sync + Send>> {
|
||||
if tag & unbounded != 0 {
|
||||
Ok(RangeBound::Unbounded)
|
||||
} else {
|
||||
let len = try!(buf.read_i32::<BigEndian>());
|
||||
let value = if len < 0 {
|
||||
None
|
||||
} else {
|
||||
let len = len as usize;
|
||||
if buf.len() < len {
|
||||
return Err("invalid message size".into());
|
||||
}
|
||||
let (value, tail) = buf.split_at(len);
|
||||
*buf = tail;
|
||||
Some(value)
|
||||
};
|
||||
|
||||
if tag & inclusive != 0 {
|
||||
Ok(RangeBound::Inclusive(value))
|
||||
} else {
|
||||
Ok(RangeBound::Exclusive(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A Postgres range.
|
||||
pub enum Range<'a> {
|
||||
/// An empty range.
|
||||
Empty,
|
||||
/// A nonempty range.
|
||||
Nonempty(RangeBound<Option<&'a [u8]>>, RangeBound<Option<&'a [u8]>>),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashMap;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
|
||||
use super::*;
|
||||
use IsNull;
|
||||
|
||||
#[test]
|
||||
fn bool() {
|
||||
let mut buf = vec![];
|
||||
bool_to_sql(true, &mut buf);
|
||||
assert_eq!(bool_from_sql(&buf).unwrap(), true);
|
||||
|
||||
let mut buf = vec![];
|
||||
bool_to_sql(false, &mut buf);
|
||||
assert_eq!(bool_from_sql(&buf).unwrap(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn int2() {
|
||||
let mut buf = vec![];
|
||||
int2_to_sql(0x0102, &mut buf);
|
||||
assert_eq!(int2_from_sql(&buf).unwrap(), 0x0102);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn int4() {
|
||||
let mut buf = vec![];
|
||||
int4_to_sql(0x01020304, &mut buf);
|
||||
assert_eq!(int4_from_sql(&buf).unwrap(), 0x01020304);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn int8() {
|
||||
let mut buf = vec![];
|
||||
int8_to_sql(0x0102030405060708, &mut buf);
|
||||
assert_eq!(int8_from_sql(&buf).unwrap(), 0x0102030405060708);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn float4() {
|
||||
let mut buf = vec![];
|
||||
float4_to_sql(10343.95, &mut buf);
|
||||
assert_eq!(float4_from_sql(&buf).unwrap(), 10343.95);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn float8() {
|
||||
let mut buf = vec![];
|
||||
float8_to_sql(10343.95, &mut buf);
|
||||
assert_eq!(float8_from_sql(&buf).unwrap(), 10343.95);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hstore() {
|
||||
let mut map = HashMap::new();
|
||||
map.insert("hello", Some("world"));
|
||||
map.insert("hola", None);
|
||||
|
||||
let mut buf = vec![];
|
||||
hstore_to_sql(map.iter().map(|(&k, &v)| (k, v)), &mut buf).unwrap();
|
||||
assert_eq!(hstore_from_sql(&buf).unwrap().collect::<HashMap<_, _>>().unwrap(),
|
||||
map);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn varbit() {
|
||||
let len = 12;
|
||||
let bits = [0b0010_1011, 0b0000_1111];
|
||||
|
||||
let mut buf = vec![];
|
||||
varbit_to_sql(len, bits.iter().cloned(), &mut buf).unwrap();
|
||||
let out = varbit_from_sql(&buf).unwrap();
|
||||
assert_eq!(out.len(), len);
|
||||
assert_eq!(out.bytes(), bits);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn array() {
|
||||
let dimensions = [ArrayDimension {
|
||||
len: 1,
|
||||
lower_bound: 10,
|
||||
},
|
||||
ArrayDimension {
|
||||
len: 2,
|
||||
lower_bound: 0,
|
||||
}];
|
||||
let values = [None, Some(&b"hello"[..])];
|
||||
|
||||
let mut buf = vec![];
|
||||
array_to_sql(dimensions.iter().cloned(),
|
||||
true,
|
||||
10,
|
||||
values.iter().cloned(),
|
||||
|v, buf| {
|
||||
match v {
|
||||
Some(v) => {
|
||||
buf.extend_from_slice(v);
|
||||
Ok(IsNull::No)
|
||||
}
|
||||
None => Ok(IsNull::Yes),
|
||||
}
|
||||
},
|
||||
&mut buf)
|
||||
.unwrap();
|
||||
|
||||
let array = array_from_sql(&buf).unwrap();
|
||||
assert_eq!(array.has_nulls(), true);
|
||||
assert_eq!(array.element_type(), 10);
|
||||
assert_eq!(array.dimensions().collect::<Vec<_>>().unwrap(), dimensions);
|
||||
assert_eq!(array.values().collect::<Vec<_>>().unwrap(), values);
|
||||
}
|
||||
}
|
@ -20,7 +20,7 @@ with-uuid = ["uuid"]
|
||||
hex = "0.2"
|
||||
fallible-iterator = "0.1.3"
|
||||
phf = "=0.7.21"
|
||||
postgres-protocol = "0.2"
|
||||
postgres-protocol = { version = "0.2", path = "../postgres-protocol" }
|
||||
|
||||
bit-vec = { version = "0.4", optional = true }
|
||||
chrono = { version = "0.3", optional = true }
|
||||
|
@ -42,7 +42,6 @@ bufstream = "0.1"
|
||||
fallible-iterator = "0.1.3"
|
||||
hex = "0.2"
|
||||
log = "0.3"
|
||||
postgres-protocol = "0.2"
|
||||
|
||||
openssl = { version = "0.9.2", optional = true }
|
||||
native-tls = { version = "0.1", optional = true }
|
||||
@ -50,6 +49,7 @@ rustc-serialize = { version = "0.3", optional = true }
|
||||
schannel = { version = "0.1", optional = true }
|
||||
security-framework = { version = "0.1.2", optional = true }
|
||||
|
||||
postgres-protocol = { version = "0.2", path = "../postgres-protocol" }
|
||||
postgres-shared = { version = "0.2", path = "../postgres-shared" }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -25,8 +25,8 @@ with-openssl = ["tokio-openssl", "openssl"]
|
||||
fallible-iterator = "0.1.3"
|
||||
futures = "0.1.7"
|
||||
futures-state-stream = "0.1"
|
||||
postgres-protocol = { version = "0.2", path = "../postgres-protocol" }
|
||||
postgres-shared = { version = "0.2", path = "../postgres-shared" }
|
||||
postgres-protocol = "0.2"
|
||||
tokio-core = "0.1"
|
||||
tokio-dns-unofficial = "0.1"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user