Merge pull request #944 from PG-kura/support-keepalive-interval-and-retries

Support keepalive interval and retries.
This commit is contained in:
Steven Fackler 2022-08-27 12:10:55 -04:00 committed by GitHub
commit 14e11bda5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 147 additions and 17 deletions

View File

@ -48,6 +48,10 @@ use tokio_postgres::{Error, Socket};
/// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
/// This option is ignored when connecting with Unix sockets. Defaults to 2 hours.
/// * `keepalives_interval` - The time interval between TCP keepalive probes.
/// This option is ignored when connecting with Unix sockets.
/// * `keepalives_retries` - The maximum number of TCP keepalive probes that will be sent before dropping a connection.
/// This option is ignored when connecting with Unix sockets.
/// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that
/// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server
/// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`.
@ -279,6 +283,33 @@ impl Config {
self.config.get_keepalives_idle()
}
/// Sets the time interval between TCP keepalive probes.
/// On Windows, this sets the value of the tcp_keepalive structs keepaliveinterval field.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_interval(&mut self, keepalives_interval: Duration) -> &mut Config {
self.config.keepalives_interval(keepalives_interval);
self
}
/// Gets the time interval between TCP keepalive probes.
pub fn get_keepalives_interval(&self) -> Option<Duration> {
self.config.get_keepalives_interval()
}
/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_retries(&mut self, keepalives_retries: u32) -> &mut Config {
self.config.keepalives_retries(keepalives_retries);
self
}
/// Gets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
pub fn get_keepalives_retries(&self) -> Option<u32> {
self.config.get_keepalives_retries()
}
/// Sets the requirements of the session.
///
/// This can be used to connect to the primary server in a clustered database rather than one of the read-only

View File

@ -38,8 +38,7 @@ where
&config.host,
config.port,
config.connect_timeout,
config.keepalives,
config.keepalives_idle,
config.keepalive.as_ref(),
)
.await?;

View File

@ -4,6 +4,8 @@ use crate::config::Host;
use crate::config::SslMode;
use crate::connection::{Request, RequestMessages};
use crate::copy_out::CopyOutStream;
#[cfg(feature = "runtime")]
use crate::keepalive::KeepaliveConfig;
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")]
@ -154,8 +156,7 @@ pub(crate) struct SocketConfig {
pub host: Host,
pub port: u16,
pub connect_timeout: Option<Duration>,
pub keepalives: bool,
pub keepalives_idle: Duration,
pub keepalive: Option<KeepaliveConfig>,
}
/// An asynchronous PostgreSQL client.

View File

@ -3,6 +3,7 @@
#[cfg(feature = "runtime")]
use crate::connect::connect;
use crate::connect_raw::connect_raw;
use crate::keepalive::KeepaliveConfig;
#[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect;
use crate::tls::TlsConnect;
@ -99,6 +100,10 @@ pub enum Host {
/// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
/// This option is ignored when connecting with Unix sockets. Defaults to 2 hours.
/// * `keepalives_interval` - The time interval between TCP keepalive probes.
/// This option is ignored when connecting with Unix sockets.
/// * `keepalives_retries` - The maximum number of TCP keepalive probes that will be sent before dropping a connection.
/// This option is ignored when connecting with Unix sockets.
/// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that
/// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server
/// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`.
@ -156,7 +161,7 @@ pub struct Config {
pub(crate) port: Vec<u16>,
pub(crate) connect_timeout: Option<Duration>,
pub(crate) keepalives: bool,
pub(crate) keepalives_idle: Duration,
pub(crate) keepalive_config: KeepaliveConfig,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
}
@ -170,6 +175,11 @@ impl Default for Config {
impl Config {
/// Creates a new configuration.
pub fn new() -> Config {
let keepalive_config = KeepaliveConfig {
idle: Duration::from_secs(2 * 60 * 60),
interval: None,
retries: None,
};
Config {
user: None,
password: None,
@ -181,7 +191,7 @@ impl Config {
port: vec![],
connect_timeout: None,
keepalives: true,
keepalives_idle: Duration::from_secs(2 * 60 * 60),
keepalive_config,
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
}
@ -347,14 +357,41 @@ impl Config {
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours.
pub fn keepalives_idle(&mut self, keepalives_idle: Duration) -> &mut Config {
self.keepalives_idle = keepalives_idle;
self.keepalive_config.idle = keepalives_idle;
self
}
/// Gets the configured amount of idle time before a keepalive packet will
/// be sent on the connection.
pub fn get_keepalives_idle(&self) -> Duration {
self.keepalives_idle
self.keepalive_config.idle
}
/// Sets the time interval between TCP keepalive probes.
/// On Windows, this sets the value of the tcp_keepalive structs keepaliveinterval field.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_interval(&mut self, keepalives_interval: Duration) -> &mut Config {
self.keepalive_config.interval = Some(keepalives_interval);
self
}
/// Gets the time interval between TCP keepalive probes.
pub fn get_keepalives_interval(&self) -> Option<Duration> {
self.keepalive_config.interval
}
/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_retries(&mut self, keepalives_retries: u32) -> &mut Config {
self.keepalive_config.retries = Some(keepalives_retries);
self
}
/// Gets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
pub fn get_keepalives_retries(&self) -> Option<u32> {
self.keepalive_config.retries
}
/// Sets the requirements of the session.
@ -451,6 +488,20 @@ impl Config {
self.keepalives_idle(Duration::from_secs(keepalives_idle as u64));
}
}
"keepalives_interval" => {
let keepalives_interval = value.parse::<i64>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("keepalives_interval")))
})?;
if keepalives_interval > 0 {
self.keepalives_interval(Duration::from_secs(keepalives_interval as u64));
}
}
"keepalives_retries" => {
let keepalives_retries = value.parse::<u32>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("keepalives_retries")))
})?;
self.keepalives_retries(keepalives_retries);
}
"target_session_attrs" => {
let target_session_attrs = match value {
"any" => TargetSessionAttrs::Any,
@ -545,7 +596,9 @@ impl fmt::Debug for Config {
.field("port", &self.port)
.field("connect_timeout", &self.connect_timeout)
.field("keepalives", &self.keepalives)
.field("keepalives_idle", &self.keepalives_idle)
.field("keepalives_idle", &self.keepalive_config.idle)
.field("keepalives_interval", &self.keepalive_config.interval)
.field("keepalives_retries", &self.keepalive_config.retries)
.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding)
.finish()

View File

@ -65,8 +65,11 @@ where
host,
port,
config.connect_timeout,
config.keepalives,
config.keepalives_idle,
if config.keepalives {
Some(&config.keepalive_config)
} else {
None
},
)
.await?;
let (mut client, mut connection) = connect_raw(socket, tls, config).await?;
@ -115,8 +118,11 @@ where
host: host.clone(),
port,
connect_timeout: config.connect_timeout,
keepalives: config.keepalives,
keepalives_idle: config.keepalives_idle,
keepalive: if config.keepalives {
Some(config.keepalive_config.clone())
} else {
None
},
});
Ok((client, connection))

View File

@ -1,4 +1,5 @@
use crate::config::Host;
use crate::keepalive::KeepaliveConfig;
use crate::{Error, Socket};
use socket2::{SockRef, TcpKeepalive};
use std::future::Future;
@ -13,8 +14,7 @@ pub(crate) async fn connect_socket(
host: &Host,
port: u16,
connect_timeout: Option<Duration>,
keepalives: bool,
keepalives_idle: Duration,
keepalive_config: Option<&KeepaliveConfig>,
) -> Result<Socket, Error> {
match host {
Host::Tcp(host) => {
@ -35,9 +35,9 @@ pub(crate) async fn connect_socket(
};
stream.set_nodelay(true).map_err(Error::connect)?;
if keepalives {
if let Some(keepalive_config) = keepalive_config {
SockRef::from(&stream)
.set_tcp_keepalive(&TcpKeepalive::new().with_time(keepalives_idle))
.set_tcp_keepalive(&TcpKeepalive::from(keepalive_config))
.map_err(Error::connect)?;
}

View File

@ -0,0 +1,27 @@
use socket2::TcpKeepalive;
use std::time::Duration;
#[derive(Clone, PartialEq, Eq)]
pub(crate) struct KeepaliveConfig {
pub idle: Duration,
pub interval: Option<Duration>,
pub retries: Option<u32>,
}
impl From<&KeepaliveConfig> for TcpKeepalive {
fn from(keepalive_config: &KeepaliveConfig) -> Self {
let mut tcp_keepalive = Self::new().with_time(keepalive_config.idle);
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
if let Some(interval) = keepalive_config.interval {
tcp_keepalive = tcp_keepalive.with_interval(interval);
}
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
if let Some(retries) = keepalive_config.retries {
tcp_keepalive = tcp_keepalive.with_retries(retries);
}
tcp_keepalive
}
}

View File

@ -163,6 +163,7 @@ mod copy_in;
mod copy_out;
pub mod error;
mod generic_client;
mod keepalive;
mod maybe_tls_stream;
mod portal;
mod prepare;

View File

@ -36,6 +36,18 @@ fn settings() {
);
}
#[test]
fn keepalive_settings() {
check(
"keepalives=1 keepalives_idle=15 keepalives_interval=5 keepalives_retries=9",
Config::new()
.keepalives(true)
.keepalives_idle(Duration::from_secs(15))
.keepalives_interval(Duration::from_secs(5))
.keepalives_retries(9),
);
}
#[test]
fn url() {
check("postgresql://", &Config::new());