Expose RowIter

This commit is contained in:
Steven Fackler 2019-11-30 12:13:38 -05:00
parent 1390cc57d7
commit ff3ea1c9df
5 changed files with 44 additions and 66 deletions

View File

@ -1,8 +1,6 @@
use crate::iter::Iter;
#[cfg(feature = "runtime")]
use crate::Config;
use crate::{CopyInWriter, CopyOutReader, Statement, ToStatement, Transaction};
use fallible_iterator::FallibleIterator;
use crate::{CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction};
use futures::executor;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::types::{ToSql, Type};
@ -183,18 +181,14 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub fn query_raw<'a, T, I>(
&mut self,
query: &T,
params: I,
) -> Result<impl FallibleIterator<Item = Row, Error = Error>, Error>
pub fn query_raw<'a, T, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'a dyn ToSql>,
I::IntoIter: ExactSizeIterator,
{
let stream = executor::block_on(self.0.query_raw(query, params))?;
Ok(Iter::new(stream))
Ok(RowIter::new(stream))
}
/// Creates a new prepared statement.

View File

@ -1,45 +0,0 @@
use fallible_iterator::FallibleIterator;
use futures::executor::{self, BlockingStream};
use futures::Stream;
use std::marker::PhantomData;
use std::pin::Pin;
pub struct Iter<'a, S>
where
S: Stream,
{
it: BlockingStream<Pin<Box<S>>>,
_p: PhantomData<&'a mut ()>,
}
// no-op impl to extend the borrow until drop
impl<'a, S> Drop for Iter<'a, S>
where
S: Stream,
{
fn drop(&mut self) {}
}
impl<'a, S> Iter<'a, S>
where
S: Stream,
{
pub fn new(stream: S) -> Iter<'a, S> {
Iter {
it: executor::block_on_stream(Box::pin(stream)),
_p: PhantomData,
}
}
}
impl<'a, S, T, E> FallibleIterator for Iter<'a, S>
where
S: Stream<Item = Result<T, E>>,
{
type Item = T;
type Error = E;
fn next(&mut self) -> Result<Option<T>, E> {
self.it.next().transpose()
}
}

View File

@ -75,6 +75,7 @@ pub use crate::copy_out_reader::CopyOutReader;
pub use crate::error::Error;
#[doc(no_inline)]
pub use crate::row::{Row, SimpleQueryRow};
pub use crate::row_iter::RowIter;
#[doc(no_inline)]
pub use crate::tls::NoTls;
pub use crate::transaction::*;
@ -84,7 +85,7 @@ mod client;
pub mod config;
mod copy_in_writer;
mod copy_out_reader;
mod iter;
mod row_iter;
mod transaction;
#[cfg(feature = "runtime")]

34
postgres/src/row_iter.rs Normal file
View File

@ -0,0 +1,34 @@
use fallible_iterator::FallibleIterator;
use futures::executor::{self, BlockingStream};
use std::marker::PhantomData;
use std::pin::Pin;
use tokio_postgres::{Error, Row, RowStream};
/// The iterator returned by `query_raw`.
pub struct RowIter<'a> {
it: BlockingStream<Pin<Box<RowStream>>>,
_p: PhantomData<&'a mut ()>,
}
// no-op impl to extend the borrow until drop
impl Drop for RowIter<'_> {
fn drop(&mut self) {}
}
impl<'a> RowIter<'a> {
pub(crate) fn new(stream: RowStream) -> RowIter<'a> {
RowIter {
it: executor::block_on_stream(Box::pin(stream)),
_p: PhantomData,
}
}
}
impl FallibleIterator for RowIter<'_> {
type Item = Row;
type Error = Error;
fn next(&mut self) -> Result<Option<Row>, Error> {
self.it.next().transpose()
}
}

View File

@ -1,6 +1,4 @@
use crate::iter::Iter;
use crate::{CopyInWriter, CopyOutReader, Portal, Statement, ToStatement};
use fallible_iterator::FallibleIterator;
use crate::{CopyInWriter, CopyOutReader, Portal, RowIter, Statement, ToStatement};
use futures::executor;
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::{Error, Row, SimpleQueryMessage};
@ -63,18 +61,14 @@ impl<'a> Transaction<'a> {
}
/// Like `Client::query_raw`.
pub fn query_raw<'b, T, I>(
&mut self,
query: &T,
params: I,
) -> Result<impl FallibleIterator<Item = Row, Error = Error>, Error>
pub fn query_raw<'b, T, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'b dyn ToSql>,
I::IntoIter: ExactSizeIterator,
{
let stream = executor::block_on(self.0.query_raw(query, params))?;
Ok(Iter::new(stream))
Ok(RowIter::new(stream))
}
/// Binds parameters to a statement, creating a "portal".
@ -107,9 +101,9 @@ impl<'a> Transaction<'a> {
&mut self,
portal: &Portal,
max_rows: i32,
) -> Result<impl FallibleIterator<Item = Row, Error = Error>, Error> {
) -> Result<RowIter<'_>, Error> {
let stream = executor::block_on(self.0.query_portal_raw(portal, max_rows))?;
Ok(Iter::new(stream))
Ok(RowIter::new(stream))
}
/// Like `Client::copy_in`.