Merge latest master into branch

This commit is contained in:
Colin Maxfield 2020-01-02 10:24:19 -05:00
commit 7ff4b3a2e9
34 changed files with 486 additions and 96 deletions

View File

@ -22,7 +22,7 @@ version: 2
jobs: jobs:
build: build:
docker: docker:
- image: rust:1.39.0 - image: rust:1.40.0
environment: environment:
RUSTFLAGS: -D warnings RUSTFLAGS: -D warnings
- image: sfackler/rust-postgres-test:6 - image: sfackler/rust-postgres-test:6

View File

@ -1,5 +1,9 @@
# Change Log # Change Log
## v0.1.0-alpha.1 - 2019-10-14 ## v0.4.0 - 2019-12-23
No changes
## v0.4.0-alpha.1 - 2019-10-14
* Initial release * Initial release

View File

@ -1,6 +1,6 @@
[package] [package]
name = "postgres-derive" name = "postgres-derive"
version = "0.4.0-alpha.1" version = "0.4.0"
authors = ["Steven Fackler <sfackler@palantir.com>"] authors = ["Steven Fackler <sfackler@palantir.com>"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
edition = "2018" edition = "2018"

View File

@ -1,5 +1,11 @@
# Change Log # Change Log
## v0.3.0 - 2019-12-23
### Changed
* Upgraded to `tokio-postgres` 0.5.
## v0.3.0-alpha.2 - 2019-11-27 ## v0.3.0-alpha.2 - 2019-11-27
### Changed ### Changed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "postgres-native-tls" name = "postgres-native-tls"
version = "0.3.0-alpha.2" version = "0.3.0"
authors = ["Steven Fackler <sfackler@gmail.com>"] authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -21,8 +21,8 @@ futures = "0.3"
native-tls = "0.2" native-tls = "0.2"
tokio = "0.2" tokio = "0.2"
tokio-tls = "0.3" tokio-tls = "0.3"
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres", default-features = false } tokio-postgres = { version = "0.5.0", path = "../tokio-postgres", default-features = false }
[dev-dependencies] [dev-dependencies]
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
postgres = { version = "=0.17.0-alpha.2", path = "../postgres" } postgres = { version = "0.17.0", path = "../postgres" }

View File

@ -1,5 +1,11 @@
# Change Log # Change Log
## v0.3.0 - 2019-12-23
### Changed
* Upgraded to `tokio-postgres` 0.5.
## v0.3.0-alpha.2 - 2019-11-27 ## v0.3.0-alpha.2 - 2019-11-27
### Changed ### Changed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "postgres-openssl" name = "postgres-openssl"
version = "0.3.0-alpha.2" version = "0.3.0"
authors = ["Steven Fackler <sfackler@gmail.com>"] authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -21,8 +21,8 @@ futures = "0.3"
openssl = "0.10" openssl = "0.10"
tokio = "0.2" tokio = "0.2"
tokio-openssl = "0.4" tokio-openssl = "0.4"
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres", default-features = false } tokio-postgres = { version = "0.5.0", path = "../tokio-postgres", default-features = false }
[dev-dependencies] [dev-dependencies]
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
postgres = { version = "=0.17.0-alpha.2", path = "../postgres" } postgres = { version = "0.17.0", path = "../postgres" }

View File

@ -1,5 +1,11 @@
# Change Log # Change Log
## v0.5.0 - 2019-12-23
### Changed
* `frontend::Message` is now a true non-exhaustive enum.
## v0.5.0-alpha.2 - 2019-11-27 ## v0.5.0-alpha.2 - 2019-11-27
### Changed ### Changed

View File

@ -1,21 +1,20 @@
[package] [package]
name = "postgres-protocol" name = "postgres-protocol"
version = "0.5.0-alpha.2" version = "0.5.0"
authors = ["Steven Fackler <sfackler@gmail.com>"] authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018" edition = "2018"
description = "Low level Postgres protocol APIs" description = "Low level Postgres protocol APIs"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
repository = "https://github.com/sfackler/rust-postgres-protocol" repository = "https://github.com/sfackler/rust-postgres"
readme = "../README.md" readme = "../README.md"
[dependencies] [dependencies]
base64 = "0.10" base64 = "0.11"
byteorder = "1.0" byteorder = "1.0"
bytes = "0.5" bytes = "0.5"
fallible-iterator = "0.2" fallible-iterator = "0.2"
generic-array = "0.13"
hmac = "0.7" hmac = "0.7"
md5 = "0.6" md5 = "0.7"
memchr = "2.0" memchr = "2.0"
rand = "0.7" rand = "0.7"
sha2 = "0.8" sha2 = "0.8"

View File

@ -1,7 +1,5 @@
//! SASL-based authentication support. //! SASL-based authentication support.
use generic_array::typenum::U32;
use generic_array::GenericArray;
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use rand::{self, Rng}; use rand::{self, Rng};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -33,13 +31,13 @@ fn normalize(pass: &[u8]) -> Vec<u8> {
} }
} }
fn hi(str: &[u8], salt: &[u8], i: u32) -> GenericArray<u8, U32> { fn hi(str: &[u8], salt: &[u8], i: u32) -> [u8; 32] {
let mut hmac = Hmac::<Sha256>::new_varkey(str).expect("HMAC is able to accept all key sizes"); let mut hmac = Hmac::<Sha256>::new_varkey(str).expect("HMAC is able to accept all key sizes");
hmac.input(salt); hmac.input(salt);
hmac.input(&[0, 0, 0, 1]); hmac.input(&[0, 0, 0, 1]);
let mut prev = hmac.result().code(); let mut prev = hmac.result().code();
let mut hi = GenericArray::<u8, U32>::clone_from_slice(&prev); let mut hi = prev;
for _ in 1..i { for _ in 1..i {
let mut hmac = Hmac::<Sha256>::new_varkey(str).expect("already checked above"); let mut hmac = Hmac::<Sha256>::new_varkey(str).expect("already checked above");
@ -51,7 +49,7 @@ fn hi(str: &[u8], salt: &[u8], i: u32) -> GenericArray<u8, U32> {
} }
} }
hi hi.into()
} }
enum ChannelBindingInner { enum ChannelBindingInner {
@ -103,7 +101,7 @@ enum State {
channel_binding: ChannelBinding, channel_binding: ChannelBinding,
}, },
Finish { Finish {
salted_password: GenericArray<u8, U32>, salted_password: [u8; 32],
auth_message: String, auth_message: String,
}, },
Done, Done,
@ -220,7 +218,7 @@ impl ScramSha256 {
hmac.input(auth_message.as_bytes()); hmac.input(auth_message.as_bytes());
let client_signature = hmac.result(); let client_signature = hmac.result();
let mut client_proof = GenericArray::<u8, U32>::clone_from_slice(&client_key); let mut client_proof = client_key;
for (proof, signature) in client_proof.iter_mut().zip(client_signature.code()) { for (proof, signature) in client_proof.iter_mut().zip(client_signature.code()) {
*proof ^= signature; *proof ^= signature;
} }

View File

@ -72,6 +72,7 @@ impl Header {
} }
/// An enum representing Postgres backend messages. /// An enum representing Postgres backend messages.
#[non_exhaustive]
pub enum Message { pub enum Message {
AuthenticationCleartextPassword, AuthenticationCleartextPassword,
AuthenticationGss, AuthenticationGss,
@ -104,8 +105,6 @@ pub enum Message {
PortalSuspended, PortalSuspended,
ReadyForQuery(ReadyForQueryBody), ReadyForQuery(ReadyForQueryBody),
RowDescription(RowDescriptionBody), RowDescription(RowDescriptionBody),
#[doc(hidden)]
__ForExtensibility,
} }
impl Message { impl Message {

View File

@ -1,5 +1,19 @@
# Change Log # Change Log
## v0.1.0 - 2019-12-23
### Changed
* `Kind` is now a true non-exhaustive enum.
### Removed
* Removed `uuid` 0.7 support.
### Added
* Added a `Hash` implementation for `Type`.
## v0.1.0-alpha.2 - 2019-11-27 ## v0.1.0-alpha.2 - 2019-11-27
### Changed ### Changed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "postgres-types" name = "postgres-types"
version = "0.1.0-alpha.2" version = "0.1.0"
authors = ["Steven Fackler <sfackler@gmail.com>"] authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -22,8 +22,8 @@ with-uuid-0_8 = ["uuid-08"]
[dependencies] [dependencies]
bytes = "0.5" bytes = "0.5"
fallible-iterator = "0.2" fallible-iterator = "0.2"
postgres-protocol = { version = "=0.5.0-alpha.2", path = "../postgres-protocol" } postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-derive = { version = "=0.4.0-alpha.1", optional = true, path = "../postgres-derive" } postgres-derive = { version = "0.4.0", optional = true, path = "../postgres-derive" }
bit-vec-06 = { version = "0.6", package = "bit-vec", optional = true } bit-vec-06 = { version = "0.6", package = "bit-vec", optional = true }
chrono-04 = { version = "0.4", package = "chrono", optional = true } chrono-04 = { version = "0.4", package = "chrono", optional = true }

View File

@ -263,6 +263,7 @@ impl Type {
/// Represents the kind of a Postgres type. /// Represents the kind of a Postgres type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Kind { pub enum Kind {
/// A simple type like `VARCHAR` or `INTEGER`. /// A simple type like `VARCHAR` or `INTEGER`.
Simple, Simple,
@ -278,8 +279,6 @@ pub enum Kind {
Domain(Type), Domain(Type),
/// A composite type along with information about its fields. /// A composite type along with information about its fields.
Composite(Vec<Field>), Composite(Vec<Field>),
#[doc(hidden)]
__ForExtensibility,
} }
/// Information about a field of a composite type. /// Information about a field of a composite type.

View File

@ -1,5 +1,26 @@
# Change Log # Change Log
## v0.17.0 - 2019-12-23
### Changed
* Each `Client` now has its own non-threaded tokio `Runtime` rather than sharing a global threaded `Runtime`. This
significantly improves performance by minimizing context switches and cross-thread synchronization.
* `Client::copy_in` now returns a writer rather than taking in a reader.
* `Client::query_raw` now returns a named type.
* `Client::copy_in` and `Client::copy_out` no longer take query parameters as PostgreSQL doesn't support them in COPY
queries.
### Removed
* Removed support for `uuid` 0.7.
### Added
* Added `Client::query_opt` for queries that are expected to return zero or one rows.
* Added binary copy support in the `binary_copy` module.
* The `fallible-iterator` crate is now publicly reexported.
## v0.17.0-alpha.2 - 2019-11-27 ## v0.17.0-alpha.2 - 2019-11-27
### Changed ### Changed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "postgres" name = "postgres"
version = "0.17.0-alpha.2" version = "0.17.0"
authors = ["Steven Fackler <sfackler@gmail.com>"] authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -32,11 +32,10 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
bytes = "0.5" bytes = "0.5"
fallible-iterator = "0.2" fallible-iterator = "0.2"
futures = "0.3" futures = "0.3"
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres" } tokio-postgres = { version = "0.5.0", path = "../tokio-postgres" }
tokio = { version = "0.2", features = ["rt-core"] } tokio = { version = "0.2", features = ["rt-core"] }
log = "0.4" log = "0.4"
[dev-dependencies] [dev-dependencies]
criterion = "0.3" criterion = "0.3"
tokio = "0.2"

View File

@ -0,0 +1,36 @@
use tokio::runtime;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::{Error, Socket};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone)]
pub struct CancelToken(tokio_postgres::CancelToken);
impl CancelToken {
pub(crate) fn new(inner: tokio_postgres::CancelToken) -> CancelToken {
CancelToken(inner)
}
/// Attempts to cancel the in-progress query on the connection associated
/// with this `CancelToken`.
///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
/// only be returned if the client was unable to connect to the database.
///
/// Cancellation is inherently racy. There is no guarantee that the
/// cancellation request will reach the server before the query terminates
/// normally, or that the connection associated with this token is still
/// active.
pub fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where
T: MakeTlsConnect<Socket>,
{
runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.unwrap() // FIXME don't unwrap
.block_on(self.0.cancel_query(tls))
}
}

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
Config, CopyInWriter, CopyOutReader, GenericConnection, RowIter, Statement, ToStatement, CancelToken, Config, CopyInWriter, CopyOutReader, GenericConnection, RowIter, Statement,
Transaction, ToStatement, Transaction,
}; };
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
@ -446,6 +446,46 @@ impl Client {
Ok(Transaction::new(&mut self.runtime, transaction)) Ok(Transaction::new(&mut self.runtime, transaction))
} }
/// Constructs a cancellation token that can later be used to request
/// cancellation of a query running on this connection.
///
/// # Examples
///
/// ```no_run
/// use postgres::{Client, NoTls};
/// use postgres::error::SqlState;
/// use std::thread;
/// use std::time::Duration;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let cancel_token = client.cancel_token();
///
/// thread::spawn(move || {
/// // Abort the query after 5s.
/// thread::sleep(Duration::from_secs(5));
/// cancel_token.cancel_query(NoTls);
/// });
///
/// match client.simple_query("SELECT long_running_query()") {
/// Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
/// // Handle canceled query.
/// }
/// Err(err) => return Err(err.into()),
/// Ok(rows) => {
/// // ...
/// }
/// }
/// // ...
///
/// # Ok(())
/// # }
/// ```
pub fn cancel_token(&self) -> CancelToken {
CancelToken::new(self.client.cancel_token())
}
/// Determines if the client's connection has already closed. /// Determines if the client's connection has already closed.
/// ///
/// If this returns `true`, the client is no longer usable. /// If this returns `true`, the client is no longer usable.

View File

@ -11,7 +11,7 @@ use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use tokio::runtime; use tokio::runtime;
#[doc(inline)] #[doc(inline)]
pub use tokio_postgres::config::{ChannelBinding, SslMode, TargetSessionAttrs}; pub use tokio_postgres::config::{ChannelBinding, Host, SslMode, TargetSessionAttrs};
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::{Error, Socket}; use tokio_postgres::{Error, Socket};
@ -123,6 +123,12 @@ impl Config {
self self
} }
/// Gets the user to authenticate with, if one has been configured with
/// the `user` method.
pub fn get_user(&self) -> Option<&str> {
self.config.get_user()
}
/// Sets the password to authenticate with. /// Sets the password to authenticate with.
pub fn password<T>(&mut self, password: T) -> &mut Config pub fn password<T>(&mut self, password: T) -> &mut Config
where where
@ -132,6 +138,12 @@ impl Config {
self self
} }
/// Gets the password to authenticate with, if one has been configured with
/// the `password` method.
pub fn get_password(&self) -> Option<&[u8]> {
self.config.get_password()
}
/// Sets the name of the database to connect to. /// Sets the name of the database to connect to.
/// ///
/// Defaults to the user. /// Defaults to the user.
@ -140,18 +152,36 @@ impl Config {
self self
} }
/// Gets the name of the database to connect to, if one has been configured
/// with the `dbname` method.
pub fn get_dbname(&self) -> Option<&str> {
self.config.get_dbname()
}
/// Sets command line options used to configure the server. /// Sets command line options used to configure the server.
pub fn options(&mut self, options: &str) -> &mut Config { pub fn options(&mut self, options: &str) -> &mut Config {
self.config.options(options); self.config.options(options);
self self
} }
/// Gets the command line options used to configure the server, if the
/// options have been set with the `options` method.
pub fn get_options(&self) -> Option<&str> {
self.config.get_options()
}
/// Sets the value of the `application_name` runtime parameter. /// Sets the value of the `application_name` runtime parameter.
pub fn application_name(&mut self, application_name: &str) -> &mut Config { pub fn application_name(&mut self, application_name: &str) -> &mut Config {
self.config.application_name(application_name); self.config.application_name(application_name);
self self
} }
/// Gets the value of the `application_name` runtime parameter, if it has
/// been set with the `application_name` method.
pub fn get_application_name(&self) -> Option<&str> {
self.config.get_application_name()
}
/// Sets the SSL configuration. /// Sets the SSL configuration.
/// ///
/// Defaults to `prefer`. /// Defaults to `prefer`.
@ -160,6 +190,11 @@ impl Config {
self self
} }
/// Gets the SSL configuration.
pub fn get_ssl_mode(&self) -> SslMode {
self.config.get_ssl_mode()
}
/// Adds a host to the configuration. /// Adds a host to the configuration.
/// ///
/// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix /// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix
@ -169,6 +204,11 @@ impl Config {
self self
} }
/// Gets the hosts that have been added to the configuration with `host`.
pub fn get_hosts(&self) -> &[Host] {
self.config.get_hosts()
}
/// Adds a Unix socket host to the configuration. /// Adds a Unix socket host to the configuration.
/// ///
/// Unlike `host`, this method allows non-UTF8 paths. /// Unlike `host`, this method allows non-UTF8 paths.
@ -191,6 +231,11 @@ impl Config {
self self
} }
/// Gets the ports that have been added to the configuration with `port`.
pub fn get_ports(&self) -> &[u16] {
self.config.get_ports()
}
/// Sets the timeout applied to socket-level connection attempts. /// Sets the timeout applied to socket-level connection attempts.
/// ///
/// Note that hostnames can resolve to multiple IP addresses, and this timeout will apply to each address of each /// Note that hostnames can resolve to multiple IP addresses, and this timeout will apply to each address of each
@ -200,6 +245,12 @@ impl Config {
self self
} }
/// Gets the connection timeout, if one has been set with the
/// `connect_timeout` method.
pub fn get_connect_timeout(&self) -> Option<&Duration> {
self.config.get_connect_timeout()
}
/// Controls the use of TCP keepalive. /// Controls the use of TCP keepalive.
/// ///
/// This is ignored for Unix domain socket connections. Defaults to `true`. /// This is ignored for Unix domain socket connections. Defaults to `true`.
@ -208,6 +259,11 @@ impl Config {
self self
} }
/// Reports whether TCP keepalives will be used.
pub fn get_keepalives(&self) -> bool {
self.config.get_keepalives()
}
/// Sets the amount of idle time before a keepalive packet is sent on the connection. /// Sets the amount of idle time before a keepalive packet is sent on the connection.
/// ///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours. /// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours.
@ -216,6 +272,12 @@ impl Config {
self self
} }
/// Gets the configured amount of idle time before a keepalive packet will
/// be sent on the connection.
pub fn get_keepalives_idle(&self) -> Duration {
self.config.get_keepalives_idle()
}
/// Sets the requirements of the session. /// Sets the requirements of the session.
/// ///
/// This can be used to connect to the primary server in a clustered database rather than one of the read-only /// This can be used to connect to the primary server in a clustered database rather than one of the read-only
@ -228,6 +290,11 @@ impl Config {
self self
} }
/// Gets the requirements of the session.
pub fn get_target_session_attrs(&self) -> TargetSessionAttrs {
self.config.get_target_session_attrs()
}
/// Sets the channel binding behavior. /// Sets the channel binding behavior.
/// ///
/// Defaults to `prefer`. /// Defaults to `prefer`.
@ -236,6 +303,11 @@ impl Config {
self self
} }
/// Gets the channel binding behavior.
pub fn get_channel_binding(&self) -> ChannelBinding {
self.config.get_channel_binding()
}
/// Opens a connection to a PostgreSQL database. /// Opens a connection to a PostgreSQL database.
pub fn connect<T>(&self, tls: T) -> Result<Client, Error> pub fn connect<T>(&self, tls: T) -> Result<Client, Error>
where where

View File

@ -8,7 +8,7 @@
//! # fn main() -> Result<(), postgres::Error> { //! # fn main() -> Result<(), postgres::Error> {
//! let mut client = Client::connect("host=localhost user=postgres", NoTls)?; //! let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
//! //!
//! client.simple_query(" //! client.batch_execute("
//! CREATE TABLE person ( //! CREATE TABLE person (
//! id SERIAL PRIMARY KEY, //! id SERIAL PRIMARY KEY,
//! name TEXT NOT NULL, //! name TEXT NOT NULL,
@ -54,6 +54,7 @@ pub use tokio_postgres::{
error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement, error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement,
}; };
pub use crate::cancel_token::CancelToken;
pub use crate::client::*; pub use crate::client::*;
pub use crate::config::Config; pub use crate::config::Config;
pub use crate::copy_in_writer::CopyInWriter; pub use crate::copy_in_writer::CopyInWriter;
@ -69,6 +70,7 @@ pub use crate::tls::NoTls;
pub use crate::transaction::*; pub use crate::transaction::*;
pub mod binary_copy; pub mod binary_copy;
mod cancel_token;
mod client; mod client;
pub mod config; pub mod config;
mod copy_in_writer; mod copy_in_writer;

View File

@ -1,4 +1,7 @@
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::thread;
use std::time::Duration;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::Type; use tokio_postgres::types::Type;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
@ -288,3 +291,21 @@ fn portal() {
assert_eq!(rows.len(), 1); assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get::<_, i32>(0), 3); assert_eq!(rows[0].get::<_, i32>(0), 3);
} }
#[test]
fn cancel_query() {
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
let cancel_token = client.cancel_token();
let cancel_thread = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
cancel_token.cancel_query(NoTls).unwrap();
});
match client.batch_execute("SELECT pg_sleep(100)") {
Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {}
t => panic!("unexpected return: {:?}", t),
}
cancel_thread.join().unwrap();
}

View File

@ -1,5 +1,6 @@
use crate::{ use crate::{
CopyInWriter, CopyOutReader, GenericConnection, Portal, RowIter, Rt, Statement, ToStatement, CancelToken, CopyInWriter, CopyOutReader, GenericConnection, Portal, RowIter, Rt, Statement,
ToStatement,
}; };
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio_postgres::types::{ToSql, Type}; use tokio_postgres::types::{ToSql, Type};
@ -170,6 +171,11 @@ impl<'a> Transaction<'a> {
self.runtime.block_on(self.transaction.batch_execute(query)) self.runtime.block_on(self.transaction.batch_execute(query))
} }
/// Like `Client::cancel_token`.
pub fn cancel_token(&self) -> CancelToken {
CancelToken::new(self.transaction.cancel_token())
}
/// Like `Client::transaction`. /// Like `Client::transaction`.
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> { pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
let transaction = self.runtime.block_on(self.transaction.transaction())?; let transaction = self.runtime.block_on(self.transaction.transaction())?;

View File

@ -1,5 +1,31 @@
# Change Log # Change Log
## v0.5.1 - 2019-12-25
### Fixed
* Removed some stray `println!`s from `copy_out` internals.
## v0.5.0 - 2019-12-23
### Changed
* `Client::copy_in` now returns a `Sink` rather than taking in a `Stream`.
* `CopyStream` has been renamed to `CopyOutStream`.
* `Client::copy_in` and `Client::copy_out` no longer take query parameters as PostgreSQL doesn't support parameters in
COPY queries.
* `TargetSessionAttrs`, `SslMode`, and `ChannelBinding` are now true non-exhaustive enums.
### Added
* Added `Client::query_opt` for queries expected to return zero or one rows.
* Added binary copy format support to the `binary_copy` module.
* Added back query logging.
### Removed
* Removed `uuid` 0.7 support.
## v0.5.0-alpha.2 - 2019-11-27 ## v0.5.0-alpha.2 - 2019-11-27
### Changed ### Changed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "tokio-postgres" name = "tokio-postgres"
version = "0.5.0-alpha.2" version = "0.5.1"
authors = ["Steven Fackler <sfackler@gmail.com>"] authors = ["Steven Fackler <sfackler@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -44,8 +44,8 @@ parking_lot = "0.10"
percent-encoding = "2.0" percent-encoding = "2.0"
pin-project-lite = "0.1" pin-project-lite = "0.1"
phf = "0.8" phf = "0.8"
postgres-protocol = { version = "=0.5.0-alpha.2", path = "../postgres-protocol" } postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-types = { version = "=0.1.0-alpha.2", path = "../postgres-types" } postgres-types = { version = "0.1.0", path = "../postgres-types" }
tokio = { version = "0.2", features = ["io-util"] } tokio = { version = "0.2", features = ["io-util"] }
tokio-util = { version = "0.2", features = ["codec"] } tokio-util = { version = "0.2", features = ["codec"] }
@ -60,6 +60,5 @@ eui48-04 = { version = "0.4", package = "eui48" }
geo-types-04 = { version = "0.4", package = "geo-types" } geo-types-04 = { version = "0.4", package = "geo-types" }
serde-1 = { version = "1.0", package = "serde" } serde-1 = { version = "1.0", package = "serde" }
serde_json-1 = { version = "1.0", package = "serde_json" } serde_json-1 = { version = "1.0", package = "serde_json" }
uuid-07 = { version = "0.7", package = "uuid" }
uuid-08 = { version = "0.8", package = "uuid" } uuid-08 = { version = "0.8", package = "uuid" }

View File

@ -0,0 +1,63 @@
use crate::config::SslMode;
use crate::tls::TlsConnect;
#[cfg(feature = "runtime")]
use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect, Socket};
use crate::{cancel_query_raw, Error};
use tokio::io::{AsyncRead, AsyncWrite};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone)]
pub struct CancelToken {
#[cfg(feature = "runtime")]
pub(crate) socket_config: Option<SocketConfig>,
pub(crate) ssl_mode: SslMode,
pub(crate) process_id: i32,
pub(crate) secret_key: i32,
}
impl CancelToken {
/// Attempts to cancel the in-progress query on the connection associated
/// with this `CancelToken`.
///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
/// only be returned if the client was unable to connect to the database.
///
/// Cancellation is inherently racy. There is no guarantee that the
/// cancellation request will reach the server before the query terminates
/// normally, or that the connection associated with this token is still
/// active.
///
/// Requires the `runtime` Cargo feature (enabled by default).
#[cfg(feature = "runtime")]
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where
T: MakeTlsConnect<Socket>,
{
cancel_query::cancel_query(
self.socket_config.clone(),
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
}
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
/// connection itself.
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
cancel_query_raw::cancel_query_raw(
stream,
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
}
}

View File

@ -1,5 +1,3 @@
#[cfg(feature = "runtime")]
use crate::cancel_query;
use crate::codec::BackendMessages; use crate::codec::BackendMessages;
use crate::config::{Host, SslMode}; use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages}; use crate::connection::{Request, RequestMessages};
@ -14,7 +12,7 @@ use crate::to_statement::ToStatement;
use crate::types::{Oid, ToSql, Type}; use crate::types::{Oid, ToSql, Type};
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
use crate::Socket; use crate::Socket;
use crate::{cancel_query_raw, copy_in, copy_out, query, CopyInSink, Transaction}; use crate::{copy_in, copy_out, query, CancelToken, CopyInSink, Transaction};
use crate::{prepare, SimpleQueryMessage}; use crate::{prepare, SimpleQueryMessage};
use crate::{simple_query, Row}; use crate::{simple_query, Row};
use crate::{Error, Statement}; use crate::{Error, Statement};
@ -451,6 +449,19 @@ impl Client {
Ok(Transaction::new(self)) Ok(Transaction::new(self))
} }
/// Constructs a cancellation token that can later be used to request
/// cancellation of a query running on the connection associated with
/// this client.
pub fn cancel_token(&self) -> CancelToken {
CancelToken {
#[cfg(feature = "runtime")]
socket_config: self.socket_config.clone(),
ssl_mode: self.ssl_mode,
process_id: self.process_id,
secret_key: self.secret_key,
}
}
/// Attempts to cancel an in-progress query. /// Attempts to cancel an in-progress query.
/// ///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will /// The server provides no information about whether a cancellation attempt was successful or not. An error will
@ -458,35 +469,23 @@ impl Client {
/// ///
/// Requires the `runtime` Cargo feature (enabled by default). /// Requires the `runtime` Cargo feature (enabled by default).
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
#[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error> pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where where
T: MakeTlsConnect<Socket>, T: MakeTlsConnect<Socket>,
{ {
cancel_query::cancel_query( self.cancel_token().cancel_query(tls).await
self.socket_config.clone(),
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
} }
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
/// connection itself. /// connection itself.
#[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error> pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
where where
S: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>, T: TlsConnect<S>,
{ {
cancel_query_raw::cancel_query_raw( self.cancel_token().cancel_query_raw(stream, tls).await
stream,
self.ssl_mode,
tls,
self.process_id,
self.secret_key,
)
.await
} }
/// Determines if the connection to the server has already closed. /// Determines if the connection to the server has already closed.

View File

@ -24,17 +24,17 @@ use tokio::io::{AsyncRead, AsyncWrite};
/// Properties required of a session. /// Properties required of a session.
#[derive(Debug, Copy, Clone, PartialEq)] #[derive(Debug, Copy, Clone, PartialEq)]
#[non_exhaustive]
pub enum TargetSessionAttrs { pub enum TargetSessionAttrs {
/// No special properties are required. /// No special properties are required.
Any, Any,
/// The session must allow writes. /// The session must allow writes.
ReadWrite, ReadWrite,
#[doc(hidden)]
__NonExhaustive,
} }
/// TLS configuration. /// TLS configuration.
#[derive(Debug, Copy, Clone, PartialEq)] #[derive(Debug, Copy, Clone, PartialEq)]
#[non_exhaustive]
pub enum SslMode { pub enum SslMode {
/// Do not use TLS. /// Do not use TLS.
Disable, Disable,
@ -42,12 +42,11 @@ pub enum SslMode {
Prefer, Prefer,
/// Require the use of TLS. /// Require the use of TLS.
Require, Require,
#[doc(hidden)]
__NonExhaustive,
} }
/// Channel binding configuration. /// Channel binding configuration.
#[derive(Debug, Copy, Clone, PartialEq)] #[derive(Debug, Copy, Clone, PartialEq)]
#[non_exhaustive]
pub enum ChannelBinding { pub enum ChannelBinding {
/// Do not use channel binding. /// Do not use channel binding.
Disable, Disable,
@ -55,13 +54,16 @@ pub enum ChannelBinding {
Prefer, Prefer,
/// Require the use of channel binding. /// Require the use of channel binding.
Require, Require,
#[doc(hidden)]
__NonExhaustive,
} }
/// A host specification.
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub(crate) enum Host { pub enum Host {
/// A TCP hostname.
Tcp(String), Tcp(String),
/// A path to a directory containing the server's Unix socket.
///
/// This variant is only available on Unix platforms.
#[cfg(unix)] #[cfg(unix)]
Unix(PathBuf), Unix(PathBuf),
} }
@ -121,7 +123,7 @@ pub(crate) enum Host {
/// # Url /// # Url
/// ///
/// This format resembles a URL with a scheme of either `postgres://` or `postgresql://`. All components are optional, /// This format resembles a URL with a scheme of either `postgres://` or `postgresql://`. All components are optional,
/// and the format accept query parameters for all of the key-value pairs described in the section above. Multiple /// and the format accepts query parameters for all of the key-value pairs described in the section above. Multiple
/// host/port pairs can be comma-separated. Unix socket paths in the host section of the URL should be percent-encoded, /// host/port pairs can be comma-separated. Unix socket paths in the host section of the URL should be percent-encoded,
/// as the path component of the URL specifies the database name. /// as the path component of the URL specifies the database name.
/// ///
@ -193,6 +195,12 @@ impl Config {
self self
} }
/// Gets the user to authenticate with, if one has been configured with
/// the `user` method.
pub fn get_user(&self) -> Option<&str> {
self.user.as_deref()
}
/// Sets the password to authenticate with. /// Sets the password to authenticate with.
pub fn password<T>(&mut self, password: T) -> &mut Config pub fn password<T>(&mut self, password: T) -> &mut Config
where where
@ -202,6 +210,12 @@ impl Config {
self self
} }
/// Gets the password to authenticate with, if one has been configured with
/// the `password` method.
pub fn get_password(&self) -> Option<&[u8]> {
self.password.as_deref()
}
/// Sets the name of the database to connect to. /// Sets the name of the database to connect to.
/// ///
/// Defaults to the user. /// Defaults to the user.
@ -210,18 +224,36 @@ impl Config {
self self
} }
/// Gets the name of the database to connect to, if one has been configured
/// with the `dbname` method.
pub fn get_dbname(&self) -> Option<&str> {
self.dbname.as_deref()
}
/// Sets command line options used to configure the server. /// Sets command line options used to configure the server.
pub fn options(&mut self, options: &str) -> &mut Config { pub fn options(&mut self, options: &str) -> &mut Config {
self.options = Some(options.to_string()); self.options = Some(options.to_string());
self self
} }
/// Gets the command line options used to configure the server, if the
/// options have been set with the `options` method.
pub fn get_options(&self) -> Option<&str> {
self.options.as_deref()
}
/// Sets the value of the `application_name` runtime parameter. /// Sets the value of the `application_name` runtime parameter.
pub fn application_name(&mut self, application_name: &str) -> &mut Config { pub fn application_name(&mut self, application_name: &str) -> &mut Config {
self.application_name = Some(application_name.to_string()); self.application_name = Some(application_name.to_string());
self self
} }
/// Gets the value of the `application_name` runtime parameter, if it has
/// been set with the `application_name` method.
pub fn get_application_name(&self) -> Option<&str> {
self.application_name.as_deref()
}
/// Sets the SSL configuration. /// Sets the SSL configuration.
/// ///
/// Defaults to `prefer`. /// Defaults to `prefer`.
@ -230,6 +262,11 @@ impl Config {
self self
} }
/// Gets the SSL configuration.
pub fn get_ssl_mode(&self) -> SslMode {
self.ssl_mode
}
/// Adds a host to the configuration. /// Adds a host to the configuration.
/// ///
/// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix /// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix
@ -246,6 +283,11 @@ impl Config {
self self
} }
/// Gets the hosts that have been added to the configuration with `host`.
pub fn get_hosts(&self) -> &[Host] {
&self.host
}
/// Adds a Unix socket host to the configuration. /// Adds a Unix socket host to the configuration.
/// ///
/// Unlike `host`, this method allows non-UTF8 paths. /// Unlike `host`, this method allows non-UTF8 paths.
@ -268,6 +310,11 @@ impl Config {
self self
} }
/// Gets the ports that have been added to the configuration with `port`.
pub fn get_ports(&self) -> &[u16] {
&self.port
}
/// Sets the timeout applied to socket-level connection attempts. /// Sets the timeout applied to socket-level connection attempts.
/// ///
/// Note that hostnames can resolve to multiple IP addresses, and this timeout will apply to each address of each /// Note that hostnames can resolve to multiple IP addresses, and this timeout will apply to each address of each
@ -277,6 +324,12 @@ impl Config {
self self
} }
/// Gets the connection timeout, if one has been set with the
/// `connect_timeout` method.
pub fn get_connect_timeout(&self) -> Option<&Duration> {
self.connect_timeout.as_ref()
}
/// Controls the use of TCP keepalive. /// Controls the use of TCP keepalive.
/// ///
/// This is ignored for Unix domain socket connections. Defaults to `true`. /// This is ignored for Unix domain socket connections. Defaults to `true`.
@ -285,6 +338,11 @@ impl Config {
self self
} }
/// Reports whether TCP keepalives will be used.
pub fn get_keepalives(&self) -> bool {
self.keepalives
}
/// Sets the amount of idle time before a keepalive packet is sent on the connection. /// Sets the amount of idle time before a keepalive packet is sent on the connection.
/// ///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours. /// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours.
@ -293,6 +351,12 @@ impl Config {
self self
} }
/// Gets the configured amount of idle time before a keepalive packet will
/// be sent on the connection.
pub fn get_keepalives_idle(&self) -> Duration {
self.keepalives_idle
}
/// Sets the requirements of the session. /// Sets the requirements of the session.
/// ///
/// This can be used to connect to the primary server in a clustered database rather than one of the read-only /// This can be used to connect to the primary server in a clustered database rather than one of the read-only
@ -305,6 +369,11 @@ impl Config {
self self
} }
/// Gets the requirements of the session.
pub fn get_target_session_attrs(&self) -> TargetSessionAttrs {
self.target_session_attrs
}
/// Sets the channel binding behavior. /// Sets the channel binding behavior.
/// ///
/// Defaults to `prefer`. /// Defaults to `prefer`.
@ -313,6 +382,11 @@ impl Config {
self self
} }
/// Gets the channel binding behavior.
pub fn get_channel_binding(&self) -> ChannelBinding {
self.channel_binding
}
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> { fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key { match key {
"user" => { "user" => {
@ -425,7 +499,7 @@ impl Config {
/// Connects to a PostgreSQL database over an arbitrary stream. /// Connects to a PostgreSQL database over an arbitrary stream.
/// ///
/// All of the settings other than `user`, `password`, `dbname`, `options`, and `application` name are ignored. /// All of the settings other than `user`, `password`, `dbname`, `options`, and `application_name` name are ignored.
pub async fn connect_raw<S, T>( pub async fn connect_raw<S, T>(
&self, &self,
stream: S, stream: S,

View File

@ -197,7 +197,6 @@ fn can_skip_channel_binding(config: &Config) -> Result<(), Error> {
config::ChannelBinding::Require => Err(Error::authentication( config::ChannelBinding::Require => Err(Error::authentication(
"server did not use channel binding".into(), "server did not use channel binding".into(),
)), )),
config::ChannelBinding::__NonExhaustive => unreachable!(),
} }
} }

View File

@ -22,7 +22,6 @@ where
return Ok(MaybeTlsStream::Raw(stream)) return Ok(MaybeTlsStream::Raw(stream))
} }
SslMode::Prefer | SslMode::Require => {} SslMode::Prefer | SslMode::Require => {}
SslMode::__NonExhaustive => unreachable!(),
} }
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();

View File

@ -25,13 +25,11 @@ pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<Copy
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> { async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?; let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
println!("a");
match responses.next().await? { match responses.next().await? {
Message::BindComplete => {} Message::BindComplete => {}
_ => return Err(Error::unexpected_message()), _ => return Err(Error::unexpected_message()),
} }
println!("b");
match responses.next().await? { match responses.next().await? {
Message::CopyOutResponse(_) => {} Message::CopyOutResponse(_) => {}
_ => return Err(Error::unexpected_message()), _ => return Err(Error::unexpected_message()),
@ -55,7 +53,6 @@ impl Stream for CopyOutStream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project(); let this = self.project();
println!("c");
match ready!(this.responses.poll_next(cx)?) { match ready!(this.responses.poll_next(cx)?) {
Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))), Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
Message::CopyDone => Poll::Ready(None), Message::CopyDone => Poll::Ready(None),

View File

@ -21,15 +21,12 @@
//! } //! }
//! }); //! });
//! //!
//! // Now we can prepare a simple statement that just returns its parameter. //! // Now we can execute a simple statement that just returns its parameter.
//! let stmt = client.prepare("SELECT $1::TEXT").await?;
//!
//! // And then execute it, returning a list of the resulting rows.
//! let rows = client //! let rows = client
//! .query(&stmt, &[&"hello world"]) //! .query("SELECT $1::TEXT", &[&"hello world"])
//! .await?; //! .await?;
//! //!
//! // Now we can check that we got back the same string we sent over. //! // And then check that we got back the same string we sent over.
//! let value: &str = rows[0].get(0); //! let value: &str = rows[0].get(0);
//! assert_eq!(value, "hello world"); //! assert_eq!(value, "hello world");
//! //!
@ -102,6 +99,7 @@
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.5")] #![doc(html_root_url = "https://docs.rs/tokio-postgres/0.5")]
#![warn(rust_2018_idioms, clippy::all, missing_docs)] #![warn(rust_2018_idioms, clippy::all, missing_docs)]
pub use crate::cancel_token::CancelToken;
pub use crate::client::Client; pub use crate::client::Client;
pub use crate::config::Config; pub use crate::config::Config;
pub use crate::connection::Connection; pub use crate::connection::Connection;
@ -128,6 +126,7 @@ mod bind;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
mod cancel_query; mod cancel_query;
mod cancel_query_raw; mod cancel_query_raw;
mod cancel_token;
mod client; mod client;
mod codec; mod codec;
pub mod config; pub mod config;
@ -201,6 +200,7 @@ impl Notification {
/// An asynchronous message from the server. /// An asynchronous message from the server.
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[non_exhaustive]
pub enum AsyncMessage { pub enum AsyncMessage {
/// A notice. /// A notice.
/// ///
@ -210,11 +210,10 @@ pub enum AsyncMessage {
/// ///
/// Connections can subscribe to notifications with the `LISTEN` command. /// Connections can subscribe to notifications with the `LISTEN` command.
Notification(Notification), Notification(Notification),
#[doc(hidden)]
__NonExhaustive,
} }
/// Message returned by the `SimpleQuery` stream. /// Message returned by the `SimpleQuery` stream.
#[non_exhaustive]
pub enum SimpleQueryMessage { pub enum SimpleQueryMessage {
/// A row of data. /// A row of data.
Row(SimpleQueryRow), Row(SimpleQueryRow),
@ -222,8 +221,6 @@ pub enum SimpleQueryMessage {
/// ///
/// The number of rows modified or selected is returned. /// The number of rows modified or selected is returned.
CommandComplete(u64), CommandComplete(u64),
#[doc(hidden)]
__NonExhaustive,
} }
fn slice_iter<'a>( fn slice_iter<'a>(

View File

@ -9,8 +9,8 @@ use crate::types::{ToSql, Type};
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
use crate::Socket; use crate::Socket;
use crate::{ use crate::{
bind, query, slice_iter, Client, CopyInSink, Error, Portal, Row, SimpleQueryMessage, Statement, bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row,
ToStatement, SimpleQueryMessage, Statement, ToStatement,
}; };
use bytes::Buf; use bytes::Buf;
use futures::TryStreamExt; use futures::TryStreamExt;
@ -155,11 +155,7 @@ impl<'a> Transaction<'a> {
} }
/// Like `Client::execute_iter`. /// Like `Client::execute_iter`.
pub async fn execute_raw<'b, I, T>( pub async fn execute_raw<'b, I, T>(&self, statement: &T, params: I) -> Result<u64, Error>
&self,
statement: &Statement,
params: I,
) -> Result<u64, Error>
where where
T: ?Sized + ToStatement, T: ?Sized + ToStatement,
I: IntoIterator<Item = &'b dyn ToSql>, I: IntoIterator<Item = &'b dyn ToSql>,
@ -184,13 +180,13 @@ impl<'a> Transaction<'a> {
where where
T: ?Sized + ToStatement, T: ?Sized + ToStatement,
{ {
self.bind_iter(statement, slice_iter(params)).await self.bind_raw(statement, slice_iter(params)).await
} }
/// Like [`bind`], but takes an iterator of parameters rather than a slice. /// A maximally flexible version of [`bind`].
/// ///
/// [`bind`]: #method.bind /// [`bind`]: #method.bind
pub async fn bind_iter<'b, T, I>(&self, statement: &T, params: I) -> Result<Portal, Error> pub async fn bind_raw<'b, T, I>(&self, statement: &T, params: I) -> Result<Portal, Error>
where where
T: ?Sized + ToStatement, T: ?Sized + ToStatement,
I: IntoIterator<Item = &'b dyn ToSql>, I: IntoIterator<Item = &'b dyn ToSql>,
@ -211,7 +207,9 @@ impl<'a> Transaction<'a> {
.await .await
} }
/// The maximally flexible version of `query_portal`. /// The maximally flexible version of [`query_portal`].
///
/// [`query_portal`]: #method.query_portal
pub async fn query_portal_raw( pub async fn query_portal_raw(
&self, &self,
portal: &Portal, portal: &Portal,
@ -247,21 +245,30 @@ impl<'a> Transaction<'a> {
self.client.batch_execute(query).await self.client.batch_execute(query).await
} }
/// Like `Client::cancel_token`.
pub fn cancel_token(&self) -> CancelToken {
self.client.cancel_token()
}
/// Like `Client::cancel_query`. /// Like `Client::cancel_query`.
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
#[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")]
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error> pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
where where
T: MakeTlsConnect<Socket>, T: MakeTlsConnect<Socket>,
{ {
#[allow(deprecated)]
self.client.cancel_query(tls).await self.client.cancel_query(tls).await
} }
/// Like `Client::cancel_query_raw`. /// Like `Client::cancel_query_raw`.
#[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")]
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error> pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
where where
S: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>, T: TlsConnect<S>,
{ {
#[allow(deprecated)]
self.client.cancel_query_raw(stream, tls).await self.client.cancel_query_raw(stream, tls).await
} }

View File

@ -304,7 +304,8 @@ async fn cancel_query_raw() {
let client = connect("user=postgres").await; let client = connect("user=postgres").await;
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap(); let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
let cancel = client.cancel_query_raw(socket, NoTls); let cancel_token = client.cancel_token();
let cancel = cancel_token.cancel_query_raw(socket, NoTls);
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel); let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)"); let sleep = client.batch_execute("SELECT pg_sleep(100)");

View File

@ -70,7 +70,8 @@ async fn target_session_attrs_err() {
async fn cancel_query() { async fn cancel_query() {
let client = connect("host=localhost port=5433 user=postgres").await; let client = connect("host=localhost port=5433 user=postgres").await;
let cancel = client.cancel_query(NoTls); let cancel_token = client.cancel_token();
let cancel = cancel_token.cancel_query(NoTls);
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel); let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
let sleep = client.batch_execute("SELECT pg_sleep(100)"); let sleep = client.batch_execute("SELECT pg_sleep(100)");