Support keepalive interval and retries.

This commit is contained in:
Hirotaka Azuma 2022-08-26 05:09:00 +00:00
parent 54331183ea
commit a78ce35d44
9 changed files with 189 additions and 20 deletions

View File

@ -48,6 +48,10 @@ use tokio_postgres::{Error, Socket};
/// This option is ignored when connecting with Unix sockets. Defaults to on. /// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server. /// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
/// This option is ignored when connecting with Unix sockets. Defaults to 2 hours. /// This option is ignored when connecting with Unix sockets. Defaults to 2 hours.
/// * `keepalives_interval` - The time interval between TCP keepalive probes.
/// This option is ignored when connecting with Unix sockets. Available on neither Redox nor Solaris.
/// * `keepalives_retries` - The maximum number of TCP keepalive probes that will be sent before dropping a connection.
/// This option is ignored when connecting with Unix sockets. Available on neither Redox, Solaris nor Windows.
/// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that /// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that
/// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server /// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server
/// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`. /// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`.
@ -279,6 +283,45 @@ impl Config {
self.config.get_keepalives_idle() self.config.get_keepalives_idle()
} }
/// Sets the time interval between TCP keepalive probes.
/// On Windows, this sets the value of the tcp_keepalive structs keepaliveinterval field.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
///
/// Available on neither Redox nor Solaris.
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
pub fn keepalives_interval(&mut self, keepalives_interval: Duration) -> &mut Config {
self.config.keepalives_interval(keepalives_interval);
self
}
/// Gets the time interval between TCP keepalive probes.
///
/// Available on neither Redox nor Solaris.
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
pub fn get_keepalives_interval(&self) -> Option<&Duration> {
self.config.get_keepalives_interval()
}
/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
///
/// Available on neither Redox, Solaris nor Windows.
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
pub fn keepalives_retries(&mut self, keepalives_retries: u32) -> &mut Config {
self.config.keepalives_retries(keepalives_retries);
self
}
/// Gets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// Available on neither Redox, Solaris nor Windows.
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
pub fn get_keepalives_retries(&self) -> Option<&u32> {
self.config.get_keepalives_retries()
}
/// Sets the requirements of the session. /// Sets the requirements of the session.
/// ///
/// This can be used to connect to the primary server in a clustered database rather than one of the read-only /// This can be used to connect to the primary server in a clustered database rather than one of the read-only

View File

@ -38,8 +38,7 @@ where
&config.host, &config.host,
config.port, config.port,
config.connect_timeout, config.connect_timeout,
config.keepalives, config.keepalive.as_ref(),
config.keepalives_idle,
) )
.await?; .await?;

View File

@ -4,6 +4,7 @@ use crate::config::Host;
use crate::config::SslMode; use crate::config::SslMode;
use crate::connection::{Request, RequestMessages}; use crate::connection::{Request, RequestMessages};
use crate::copy_out::CopyOutStream; use crate::copy_out::CopyOutStream;
use crate::keepalive::KeepaliveConfig;
use crate::query::RowStream; use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream; use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
@ -154,8 +155,7 @@ pub(crate) struct SocketConfig {
pub host: Host, pub host: Host,
pub port: u16, pub port: u16,
pub connect_timeout: Option<Duration>, pub connect_timeout: Option<Duration>,
pub keepalives: bool, pub keepalive: Option<KeepaliveConfig>,
pub keepalives_idle: Duration,
} }
/// An asynchronous PostgreSQL client. /// An asynchronous PostgreSQL client.

View File

@ -3,6 +3,7 @@
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
use crate::connect::connect; use crate::connect::connect;
use crate::connect_raw::connect_raw; use crate::connect_raw::connect_raw;
use crate::keepalive::KeepaliveConfig;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect; use crate::tls::MakeTlsConnect;
use crate::tls::TlsConnect; use crate::tls::TlsConnect;
@ -99,6 +100,10 @@ pub enum Host {
/// This option is ignored when connecting with Unix sockets. Defaults to on. /// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server. /// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
/// This option is ignored when connecting with Unix sockets. Defaults to 2 hours. /// This option is ignored when connecting with Unix sockets. Defaults to 2 hours.
/// * `keepalives_interval` - The time interval between TCP keepalive probes.
/// This option is ignored when connecting with Unix sockets. Available on neither Redox nor Solaris.
/// * `keepalives_retries` - The maximum number of TCP keepalive probes that will be sent before dropping a connection.
/// This option is ignored when connecting with Unix sockets. Available on neither Redox, Solaris nor Windows.
/// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that /// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that
/// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server /// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server
/// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`. /// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`.
@ -156,7 +161,7 @@ pub struct Config {
pub(crate) port: Vec<u16>, pub(crate) port: Vec<u16>,
pub(crate) connect_timeout: Option<Duration>, pub(crate) connect_timeout: Option<Duration>,
pub(crate) keepalives: bool, pub(crate) keepalives: bool,
pub(crate) keepalives_idle: Duration, pub(crate) keepalive_config: KeepaliveConfig,
pub(crate) target_session_attrs: TargetSessionAttrs, pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding, pub(crate) channel_binding: ChannelBinding,
} }
@ -170,6 +175,13 @@ impl Default for Config {
impl Config { impl Config {
/// Creates a new configuration. /// Creates a new configuration.
pub fn new() -> Config { pub fn new() -> Config {
let keepalive_config = KeepaliveConfig {
idle: Duration::from_secs(2 * 60 * 60),
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
interval: None,
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
retries: None,
};
Config { Config {
user: None, user: None,
password: None, password: None,
@ -181,7 +193,7 @@ impl Config {
port: vec![], port: vec![],
connect_timeout: None, connect_timeout: None,
keepalives: true, keepalives: true,
keepalives_idle: Duration::from_secs(2 * 60 * 60), keepalive_config,
target_session_attrs: TargetSessionAttrs::Any, target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer, channel_binding: ChannelBinding::Prefer,
} }
@ -347,14 +359,53 @@ impl Config {
/// ///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours. /// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours.
pub fn keepalives_idle(&mut self, keepalives_idle: Duration) -> &mut Config { pub fn keepalives_idle(&mut self, keepalives_idle: Duration) -> &mut Config {
self.keepalives_idle = keepalives_idle; self.keepalive_config.idle = keepalives_idle;
self self
} }
/// Gets the configured amount of idle time before a keepalive packet will /// Gets the configured amount of idle time before a keepalive packet will
/// be sent on the connection. /// be sent on the connection.
pub fn get_keepalives_idle(&self) -> Duration { pub fn get_keepalives_idle(&self) -> Duration {
self.keepalives_idle self.keepalive_config.idle
}
/// Sets the time interval between TCP keepalive probes.
/// On Windows, this sets the value of the tcp_keepalive structs keepaliveinterval field.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
///
/// Available on neither Redox nor Solaris.
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
pub fn keepalives_interval(&mut self, keepalives_interval: Duration) -> &mut Config {
self.keepalive_config.interval = Some(keepalives_interval);
self
}
/// Gets the time interval between TCP keepalive probes.
///
/// Available on neither Redox nor Solaris.
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
pub fn get_keepalives_interval(&self) -> Option<&Duration> {
self.keepalive_config.interval.as_ref()
}
/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
///
/// Available on neither Redox, Solaris nor Windows.
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
pub fn keepalives_retries(&mut self, keepalives_retries: u32) -> &mut Config {
self.keepalive_config.retries = Some(keepalives_retries);
self
}
/// Gets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// Available on neither Redox, Solaris nor Windows.
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
pub fn get_keepalives_retries(&self) -> Option<&u32> {
self.keepalive_config.retries.as_ref()
} }
/// Sets the requirements of the session. /// Sets the requirements of the session.
@ -451,6 +502,22 @@ impl Config {
self.keepalives_idle(Duration::from_secs(keepalives_idle as u64)); self.keepalives_idle(Duration::from_secs(keepalives_idle as u64));
} }
} }
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
"keepalives_interval" => {
let keepalives_interval = value.parse::<i64>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("keepalives_interval")))
})?;
if keepalives_interval > 0 {
self.keepalives_interval(Duration::from_secs(keepalives_interval as u64));
}
}
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
"keepalives_retries" => {
let keepalives_retries = value.parse::<u32>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("keepalives_retries")))
})?;
self.keepalives_retries(keepalives_retries);
}
"target_session_attrs" => { "target_session_attrs" => {
let target_session_attrs = match value { let target_session_attrs = match value {
"any" => TargetSessionAttrs::Any, "any" => TargetSessionAttrs::Any,
@ -534,8 +601,8 @@ impl fmt::Debug for Config {
} }
} }
f.debug_struct("Config") let mut ds = f.debug_struct("Config");
.field("user", &self.user) ds.field("user", &self.user)
.field("password", &self.password.as_ref().map(|_| Redaction {})) .field("password", &self.password.as_ref().map(|_| Redaction {}))
.field("dbname", &self.dbname) .field("dbname", &self.dbname)
.field("options", &self.options) .field("options", &self.options)
@ -545,8 +612,19 @@ impl fmt::Debug for Config {
.field("port", &self.port) .field("port", &self.port)
.field("connect_timeout", &self.connect_timeout) .field("connect_timeout", &self.connect_timeout)
.field("keepalives", &self.keepalives) .field("keepalives", &self.keepalives)
.field("keepalives_idle", &self.keepalives_idle) .field("keepalives_idle", &self.keepalive_config.idle);
.field("target_session_attrs", &self.target_session_attrs)
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
{
ds.field("keepalives_interval", &self.keepalive_config.interval);
}
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
{
ds.field("keepalives_retries", &self.keepalive_config.retries);
}
ds.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding) .field("channel_binding", &self.channel_binding)
.finish() .finish()
} }

View File

@ -65,8 +65,11 @@ where
host, host,
port, port,
config.connect_timeout, config.connect_timeout,
config.keepalives, if config.keepalives {
config.keepalives_idle, Some(&config.keepalive_config)
} else {
None
},
) )
.await?; .await?;
let (mut client, mut connection) = connect_raw(socket, tls, config).await?; let (mut client, mut connection) = connect_raw(socket, tls, config).await?;
@ -115,8 +118,11 @@ where
host: host.clone(), host: host.clone(),
port, port,
connect_timeout: config.connect_timeout, connect_timeout: config.connect_timeout,
keepalives: config.keepalives, keepalive: if config.keepalives {
keepalives_idle: config.keepalives_idle, Some(config.keepalive_config.clone())
} else {
None
},
}); });
Ok((client, connection)) Ok((client, connection))

View File

@ -1,4 +1,5 @@
use crate::config::Host; use crate::config::Host;
use crate::keepalive::KeepaliveConfig;
use crate::{Error, Socket}; use crate::{Error, Socket};
use socket2::{SockRef, TcpKeepalive}; use socket2::{SockRef, TcpKeepalive};
use std::future::Future; use std::future::Future;
@ -13,8 +14,7 @@ pub(crate) async fn connect_socket(
host: &Host, host: &Host,
port: u16, port: u16,
connect_timeout: Option<Duration>, connect_timeout: Option<Duration>,
keepalives: bool, keepalive_config: Option<&KeepaliveConfig>,
keepalives_idle: Duration,
) -> Result<Socket, Error> { ) -> Result<Socket, Error> {
match host { match host {
Host::Tcp(host) => { Host::Tcp(host) => {
@ -35,9 +35,9 @@ pub(crate) async fn connect_socket(
}; };
stream.set_nodelay(true).map_err(Error::connect)?; stream.set_nodelay(true).map_err(Error::connect)?;
if keepalives { if let Some(keepalive_config) = keepalive_config {
SockRef::from(&stream) SockRef::from(&stream)
.set_tcp_keepalive(&TcpKeepalive::new().with_time(keepalives_idle)) .set_tcp_keepalive(&TcpKeepalive::from(keepalive_config))
.map_err(Error::connect)?; .map_err(Error::connect)?;
} }

View File

@ -0,0 +1,29 @@
use socket2::TcpKeepalive;
use std::time::Duration;
#[derive(Clone, PartialEq, Eq)]
pub(crate) struct KeepaliveConfig {
pub idle: Duration,
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
pub interval: Option<Duration>,
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
pub retries: Option<u32>,
}
impl From<&KeepaliveConfig> for TcpKeepalive {
fn from(keepalive_config: &KeepaliveConfig) -> Self {
let mut tcp_keepalive = Self::new().with_time(keepalive_config.idle);
#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
if let Some(interval) = keepalive_config.interval {
tcp_keepalive = tcp_keepalive.with_interval(interval);
}
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
if let Some(retries) = keepalive_config.retries {
tcp_keepalive = tcp_keepalive.with_retries(retries);
}
tcp_keepalive
}
}

View File

@ -163,6 +163,7 @@ mod copy_in;
mod copy_out; mod copy_out;
pub mod error; pub mod error;
mod generic_client; mod generic_client;
mod keepalive;
mod maybe_tls_stream; mod maybe_tls_stream;
mod portal; mod portal;
mod prepare; mod prepare;

View File

@ -36,6 +36,19 @@ fn settings() {
); );
} }
#[test]
#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
fn keepalive_settings() {
check(
"keepalives=1 keepalives_idle=15 keepalives_interval=5 keepalives_retries=9",
Config::new()
.keepalives(true)
.keepalives_idle(Duration::from_secs(15))
.keepalives_interval(Duration::from_secs(5))
.keepalives_retries(9),
);
}
#[test] #[test]
fn url() { fn url() {
check("postgresql://", &Config::new()); check("postgresql://", &Config::new());