Merge pull request #1007 from pH14/add-tcp-user-timeout

wire through knob for TCP user timeout
This commit is contained in:
Steven Fackler 2023-03-27 16:56:04 -04:00 committed by GitHub
commit c85b2c1dbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 46 additions and 2 deletions

View File

@ -55,7 +55,7 @@ pin-project-lite = "0.2"
phf = "0.11" phf = "0.11"
postgres-protocol = { version = "0.6.4", path = "../postgres-protocol" } postgres-protocol = { version = "0.6.4", path = "../postgres-protocol" }
postgres-types = { version = "0.2.4", path = "../postgres-types" } postgres-types = { version = "0.2.4", path = "../postgres-types" }
socket2 = "0.4" socket2 = { version = "0.4", features = ["all"] }
tokio = { version = "1.0", features = ["io-util"] } tokio = { version = "1.0", features = ["io-util"] }
tokio-util = { version = "0.7", features = ["codec"] } tokio-util = { version = "0.7", features = ["codec"] }

View File

@ -38,6 +38,7 @@ where
&config.host, &config.host,
config.port, config.port,
config.connect_timeout, config.connect_timeout,
config.tcp_user_timeout,
config.keepalive.as_ref(), config.keepalive.as_ref(),
) )
.await?; .await?;

View File

@ -156,6 +156,7 @@ pub(crate) struct SocketConfig {
pub host: Host, pub host: Host,
pub port: u16, pub port: u16,
pub connect_timeout: Option<Duration>, pub connect_timeout: Option<Duration>,
pub tcp_user_timeout: Option<Duration>,
pub keepalive: Option<KeepaliveConfig>, pub keepalive: Option<KeepaliveConfig>,
} }

View File

@ -96,6 +96,9 @@ pub enum Host {
/// omitted or the empty string. /// omitted or the empty string.
/// * `connect_timeout` - The time limit in seconds applied to each socket-level connection attempt. Note that hostnames /// * `connect_timeout` - The time limit in seconds applied to each socket-level connection attempt. Note that hostnames
/// can resolve to multiple IP addresses, and this limit is applied to each address. Defaults to no timeout. /// can resolve to multiple IP addresses, and this limit is applied to each address. Defaults to no timeout.
/// * `tcp_user_timeout` - The time limit that transmitted data may remain unacknowledged before a connection is forcibly closed.
/// This is ignored for Unix domain socket connections. It is only supported on systems where TCP_USER_TIMEOUT is available
/// and will default to the system default if omitted or set to 0; on other systems, it has no effect.
/// * `keepalives` - Controls the use of TCP keepalive. A value of 0 disables keepalive and nonzero integers enable it. /// * `keepalives` - Controls the use of TCP keepalive. A value of 0 disables keepalive and nonzero integers enable it.
/// This option is ignored when connecting with Unix sockets. Defaults to on. /// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server. /// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
@ -160,6 +163,7 @@ pub struct Config {
pub(crate) host: Vec<Host>, pub(crate) host: Vec<Host>,
pub(crate) port: Vec<u16>, pub(crate) port: Vec<u16>,
pub(crate) connect_timeout: Option<Duration>, pub(crate) connect_timeout: Option<Duration>,
pub(crate) tcp_user_timeout: Option<Duration>,
pub(crate) keepalives: bool, pub(crate) keepalives: bool,
pub(crate) keepalive_config: KeepaliveConfig, pub(crate) keepalive_config: KeepaliveConfig,
pub(crate) target_session_attrs: TargetSessionAttrs, pub(crate) target_session_attrs: TargetSessionAttrs,
@ -190,6 +194,7 @@ impl Config {
host: vec![], host: vec![],
port: vec![], port: vec![],
connect_timeout: None, connect_timeout: None,
tcp_user_timeout: None,
keepalives: true, keepalives: true,
keepalive_config, keepalive_config,
target_session_attrs: TargetSessionAttrs::Any, target_session_attrs: TargetSessionAttrs::Any,
@ -340,6 +345,22 @@ impl Config {
self.connect_timeout.as_ref() self.connect_timeout.as_ref()
} }
/// Sets the TCP user timeout.
///
/// This is ignored for Unix domain socket connections. It is only supported on systems where
/// TCP_USER_TIMEOUT is available and will default to the system default if omitted or set to 0;
/// on other systems, it has no effect.
pub fn tcp_user_timeout(&mut self, tcp_user_timeout: Duration) -> &mut Config {
self.tcp_user_timeout = Some(tcp_user_timeout);
self
}
/// Gets the TCP user timeout, if one has been set with the
/// `user_timeout` method.
pub fn get_tcp_user_timeout(&self) -> Option<&Duration> {
self.tcp_user_timeout.as_ref()
}
/// Controls the use of TCP keepalive. /// Controls the use of TCP keepalive.
/// ///
/// This is ignored for Unix domain socket connections. Defaults to `true`. /// This is ignored for Unix domain socket connections. Defaults to `true`.
@ -474,6 +495,14 @@ impl Config {
self.connect_timeout(Duration::from_secs(timeout as u64)); self.connect_timeout(Duration::from_secs(timeout as u64));
} }
} }
"tcp_user_timeout" => {
let timeout = value
.parse::<i64>()
.map_err(|_| Error::config_parse(Box::new(InvalidValue("tcp_user_timeout"))))?;
if timeout > 0 {
self.tcp_user_timeout(Duration::from_secs(timeout as u64));
}
}
"keepalives" => { "keepalives" => {
let keepalives = value let keepalives = value
.parse::<u64>() .parse::<u64>()
@ -595,6 +624,7 @@ impl fmt::Debug for Config {
.field("host", &self.host) .field("host", &self.host)
.field("port", &self.port) .field("port", &self.port)
.field("connect_timeout", &self.connect_timeout) .field("connect_timeout", &self.connect_timeout)
.field("tcp_user_timeout", &self.tcp_user_timeout)
.field("keepalives", &self.keepalives) .field("keepalives", &self.keepalives)
.field("keepalives_idle", &self.keepalive_config.idle) .field("keepalives_idle", &self.keepalive_config.idle)
.field("keepalives_interval", &self.keepalive_config.interval) .field("keepalives_interval", &self.keepalive_config.interval)

View File

@ -65,6 +65,7 @@ where
host, host,
port, port,
config.connect_timeout, config.connect_timeout,
config.tcp_user_timeout,
if config.keepalives { if config.keepalives {
Some(&config.keepalive_config) Some(&config.keepalive_config)
} else { } else {
@ -118,6 +119,7 @@ where
host: host.clone(), host: host.clone(),
port, port,
connect_timeout: config.connect_timeout, connect_timeout: config.connect_timeout,
tcp_user_timeout: config.tcp_user_timeout,
keepalive: if config.keepalives { keepalive: if config.keepalives {
Some(config.keepalive_config.clone()) Some(config.keepalive_config.clone())
} else { } else {

View File

@ -14,6 +14,7 @@ pub(crate) async fn connect_socket(
host: &Host, host: &Host,
port: u16, port: u16,
connect_timeout: Option<Duration>, connect_timeout: Option<Duration>,
tcp_user_timeout: Option<Duration>,
keepalive_config: Option<&KeepaliveConfig>, keepalive_config: Option<&KeepaliveConfig>,
) -> Result<Socket, Error> { ) -> Result<Socket, Error> {
match host { match host {
@ -35,8 +36,17 @@ pub(crate) async fn connect_socket(
}; };
stream.set_nodelay(true).map_err(Error::connect)?; stream.set_nodelay(true).map_err(Error::connect)?;
let sock_ref = SockRef::from(&stream);
#[cfg(target_os = "linux")]
{
sock_ref
.set_tcp_user_timeout(tcp_user_timeout)
.map_err(Error::connect)?;
}
if let Some(keepalive_config) = keepalive_config { if let Some(keepalive_config) = keepalive_config {
SockRef::from(&stream) sock_ref
.set_tcp_keepalive(&TcpKeepalive::from(keepalive_config)) .set_tcp_keepalive(&TcpKeepalive::from(keepalive_config))
.map_err(Error::connect)?; .map_err(Error::connect)?;
} }