Address pull request comments
This commit is contained in:
parent
07d9fb2ed6
commit
f655c3b74c
@ -4,6 +4,7 @@ use crate::{
|
|||||||
ToStatement, Transaction, TransactionBuilder,
|
ToStatement, Transaction, TransactionBuilder,
|
||||||
};
|
};
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
|
use std::time::Duration;
|
||||||
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
|
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
|
||||||
use tokio_postgres::types::{BorrowToSql, ToSql, Type};
|
use tokio_postgres::types::{BorrowToSql, ToSql, Type};
|
||||||
use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
|
use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
|
||||||
@ -414,8 +415,14 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Validates connection, timing out after specified duration.
|
/// Validates connection, timing out after specified duration.
|
||||||
pub fn is_valid(&mut self, timeout: std::time::Duration) -> Result<(), Error> {
|
pub fn is_valid(&mut self, timeout: Duration) -> Result<(), Error> {
|
||||||
self.connection.block_on(self.client.is_valid(timeout))
|
let is_valid = Client::is_valid_inner(&self.client, timeout);
|
||||||
|
self.connection.block_on(is_valid)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn is_valid_inner(client: &tokio_postgres::Client, timeout: Duration) -> Result<(), Error> {
|
||||||
|
let trivial_query = client.simple_query("");
|
||||||
|
tokio::time::timeout(timeout, trivial_query).await?.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes a sequence of SQL statements using the simple query protocol.
|
/// Executes a sequence of SQL statements using the simple query protocol.
|
||||||
|
@ -20,8 +20,9 @@ where
|
|||||||
I: IntoIterator<Item = P>,
|
I: IntoIterator<Item = P>,
|
||||||
I::IntoIter: ExactSizeIterator,
|
I::IntoIter: ExactSizeIterator,
|
||||||
{
|
{
|
||||||
|
type BytesResult = Result<bytes::Bytes, Error>;
|
||||||
let name = format!("p{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
|
let name = format!("p{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
|
||||||
let buf = client.with_buf(|buf| {
|
let buf = client.with_buf::<_, BytesResult>(|buf| {
|
||||||
query::encode_bind(&statement, params, &name, buf)?;
|
query::encode_bind(&statement, params, &name, buf)?;
|
||||||
frontend::sync(buf);
|
frontend::sync(buf);
|
||||||
Ok(buf.split().freeze())
|
Ok(buf.split().freeze())
|
||||||
|
@ -450,15 +450,6 @@ impl Client {
|
|||||||
self.simple_query_raw(query).await?.try_collect().await
|
self.simple_query_raw(query).await?.try_collect().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Validates connection, timing out after specified duration.
|
|
||||||
pub async fn is_valid(&self, timeout: Duration) -> Result<(), Error> {
|
|
||||||
type SqmResult = Result<Vec<SimpleQueryMessage>, Error>;
|
|
||||||
type SqmTimeout = Result<SqmResult, tokio::time::error::Elapsed>;
|
|
||||||
let sqm_future = self.simple_query_raw("").await?.try_collect();
|
|
||||||
let sqm_timeout: SqmTimeout = tokio::time::timeout(timeout, sqm_future).await;
|
|
||||||
sqm_timeout.map_err(|_| Error::timeout())?.map(|_| ())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
|
pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
|
||||||
simple_query::simple_query(self.inner(), query).await
|
simple_query::simple_query(self.inner(), query).await
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ use postgres_protocol::message::backend::{ErrorFields, ErrorResponseBody};
|
|||||||
use std::error::{self, Error as _Error};
|
use std::error::{self, Error as _Error};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use tokio::time::error::Elapsed;
|
||||||
|
|
||||||
pub use self::sqlstate::*;
|
pub use self::sqlstate::*;
|
||||||
|
|
||||||
@ -493,8 +494,10 @@ impl Error {
|
|||||||
pub(crate) fn connect(e: io::Error) -> Error {
|
pub(crate) fn connect(e: io::Error) -> Error {
|
||||||
Error::new(Kind::Connect, Some(Box::new(e)))
|
Error::new(Kind::Connect, Some(Box::new(e)))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn timeout() -> Error {
|
impl From<Elapsed> for Error {
|
||||||
|
fn from(_e: Elapsed) -> Error {
|
||||||
Error::new(Kind::Timeout, None)
|
Error::new(Kind::Timeout, None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,8 @@ pub async fn query_portal(
|
|||||||
portal: &Portal,
|
portal: &Portal,
|
||||||
max_rows: i32,
|
max_rows: i32,
|
||||||
) -> Result<RowStream, Error> {
|
) -> Result<RowStream, Error> {
|
||||||
let buf = client.with_buf(|buf| {
|
type BytesResult = Result<bytes::Bytes, Error>;
|
||||||
|
let buf = client.with_buf::<_, BytesResult>(|buf| {
|
||||||
frontend::execute(portal.name(), max_rows, buf).map_err(Error::encode)?;
|
frontend::execute(portal.name(), max_rows, buf).map_err(Error::encode)?;
|
||||||
frontend::sync(buf);
|
frontend::sync(buf);
|
||||||
Ok(buf.split().freeze())
|
Ok(buf.split().freeze())
|
||||||
|
Loading…
Reference in New Issue
Block a user