tokio-postgres: buffer sockets to avoid excessive syscalls

The current implementation forwards all read requests to the operating
system through the socket causing excessive system calls. The effect is
magnified when the underlying Socket is wrapped around a TLS
implementation.

This commit changes the underlying socket to be read-buffered by default
with a buffer size of 16K, following the implementation of the official
client.

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
This commit is contained in:
Petros Angelatos 2021-05-24 17:54:24 +02:00
parent 52de269367
commit ca6d4b8162
2 changed files with 6 additions and 4 deletions

View File

@ -51,7 +51,7 @@ use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite, BufReader, ReadBuf};
use tokio_postgres::tls; use tokio_postgres::tls;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
use tokio_postgres::tls::MakeTlsConnect; use tokio_postgres::tls::MakeTlsConnect;
@ -115,6 +115,7 @@ where
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, native_tls::Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, native_tls::Error>> + Send>>;
fn connect(self, stream: S) -> Self::Future { fn connect(self, stream: S) -> Self::Future {
let stream = BufReader::with_capacity(8192, stream);
let future = async move { let future = async move {
let stream = self.connector.connect(&self.domain, stream).await?; let stream = self.connector.connect(&self.domain, stream).await?;
@ -126,7 +127,7 @@ where
} }
/// The stream returned by `TlsConnector`. /// The stream returned by `TlsConnector`.
pub struct TlsStream<S>(tokio_native_tls::TlsStream<S>); pub struct TlsStream<S>(tokio_native_tls::TlsStream<BufReader<S>>);
impl<S> AsyncRead for TlsStream<S> impl<S> AsyncRead for TlsStream<S>
where where

View File

@ -57,7 +57,7 @@ use std::pin::Pin;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite, BufReader, ReadBuf};
use tokio_openssl::SslStream; use tokio_openssl::SslStream;
use tokio_postgres::tls; use tokio_postgres::tls;
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
@ -140,6 +140,7 @@ where
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, Self::Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, Self::Error>> + Send>>;
fn connect(self, stream: S) -> Self::Future { fn connect(self, stream: S) -> Self::Future {
let stream = BufReader::with_capacity(8192, stream);
let future = async move { let future = async move {
let ssl = self.ssl.into_ssl(&self.domain)?; let ssl = self.ssl.into_ssl(&self.domain)?;
let mut stream = SslStream::new(ssl, stream)?; let mut stream = SslStream::new(ssl, stream)?;
@ -182,7 +183,7 @@ impl Error for ConnectError {
} }
/// The stream returned by `TlsConnector`. /// The stream returned by `TlsConnector`.
pub struct TlsStream<S>(SslStream<S>); pub struct TlsStream<S>(SslStream<BufReader<S>>);
impl<S> AsyncRead for TlsStream<S> impl<S> AsyncRead for TlsStream<S>
where where