Don't take parameters in copy_in and copy_out

Postgres doesn't support them, so we may as well not provide the option!

Closes #523
This commit is contained in:
Steven Fackler 2019-12-04 18:51:59 -08:00
parent ac4c63f4af
commit 5c33bf8b30
9 changed files with 43 additions and 77 deletions

View File

@ -302,6 +302,7 @@ impl Client {
///
/// The `query` argument can either be a `Statement`, or a raw query string. The data in the provided reader is
/// passed along to the server verbatim; it is the caller's responsibility to ensure it uses the proper format.
/// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
///
@ -314,27 +315,24 @@ impl Client {
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let mut writer = client.copy_in("COPY people FROM stdin", &[])?;
/// let mut writer = client.copy_in("COPY people FROM stdin")?;
/// writer.write_all(b"1\tjohn\n2\tjane\n")?;
/// writer.finish()?;
/// # Ok(())
/// # }
/// ```
pub fn copy_in<T>(
&mut self,
query: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyInWriter<'_>, Error>
pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
where
T: ?Sized + ToStatement,
{
let sink = self.runtime.block_on(self.client.copy_in(query, params))?;
let sink = self.runtime.block_on(self.client.copy_in(query))?;
Ok(CopyInWriter::new(&mut self.runtime, sink))
}
/// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
///
/// The `query` argument can either be a `Statement`, or a raw query string.
/// The `query` argument can either be a `Statement`, or a raw query string. PostgreSQL does not support parameters
/// in `COPY` statements, so this method does not take any.
///
/// # Examples
///
@ -345,21 +343,17 @@ impl Client {
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
///
/// let mut reader = client.copy_out("COPY people TO stdout", &[])?;
/// let mut reader = client.copy_out("COPY people TO stdout")?;
/// let mut buf = vec![];
/// reader.read_to_end(&mut buf)?;
/// # Ok(())
/// # }
/// ```
pub fn copy_out<T>(
&mut self,
query: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyOutReader<'_>, Error>
pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
where
T: ?Sized + ToStatement,
{
let stream = self.runtime.block_on(self.client.copy_out(query, params))?;
let stream = self.runtime.block_on(self.client.copy_out(query))?;
CopyOutReader::new(&mut self.runtime, stream)
}

View File

@ -154,7 +154,7 @@ fn copy_in() {
.simple_query("CREATE TEMPORARY TABLE foo (id INT, name TEXT)")
.unwrap();
let mut writer = client.copy_in("COPY foo FROM stdin", &[]).unwrap();
let mut writer = client.copy_in("COPY foo FROM stdin").unwrap();
writer.write_all(b"1\tsteven\n2\ttimothy").unwrap();
writer.finish().unwrap();
@ -177,7 +177,7 @@ fn copy_in_abort() {
.simple_query("CREATE TEMPORARY TABLE foo (id INT, name TEXT)")
.unwrap();
let mut writer = client.copy_in("COPY foo FROM stdin", &[]).unwrap();
let mut writer = client.copy_in("COPY foo FROM stdin").unwrap();
writer.write_all(b"1\tsteven\n2\ttimothy").unwrap();
drop(writer);
@ -199,9 +199,7 @@ fn copy_out() {
)
.unwrap();
let mut reader = client
.copy_out("COPY foo (id, name) TO STDOUT", &[])
.unwrap();
let mut reader = client.copy_out("COPY foo (id, name) TO STDOUT").unwrap();
let mut s = String::new();
reader.read_to_string(&mut s).unwrap();
drop(reader);

View File

@ -137,32 +137,20 @@ impl<'a> Transaction<'a> {
}
/// Like `Client::copy_in`.
pub fn copy_in<T>(
&mut self,
query: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyInWriter<'_>, Error>
pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
where
T: ?Sized + ToStatement,
{
let sink = self
.runtime
.block_on(self.transaction.copy_in(query, params))?;
let sink = self.runtime.block_on(self.transaction.copy_in(query))?;
Ok(CopyInWriter::new(self.runtime, sink))
}
/// Like `Client::copy_out`.
pub fn copy_out<T>(
&mut self,
query: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyOutReader<'_>, Error>
pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
where
T: ?Sized + ToStatement,
{
let stream = self
.runtime
.block_on(self.transaction.copy_out(query, params))?;
let stream = self.runtime.block_on(self.transaction.copy_out(query))?;
CopyOutReader::new(self.runtime, stream)
}

View File

@ -24,7 +24,7 @@ async fn write_basic() {
.unwrap();
let sink = client
.copy_in("COPY foo (id, bar) FROM STDIN BINARY", &[])
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
@ -58,7 +58,7 @@ async fn write_many_rows() {
.unwrap();
let sink = client
.copy_in("COPY foo (id, bar) FROM STDIN BINARY", &[])
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
@ -94,7 +94,7 @@ async fn write_big_rows() {
.unwrap();
let sink = client
.copy_in("COPY foo (id, bar) FROM STDIN BINARY", &[])
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]);
@ -135,7 +135,7 @@ async fn read_basic() {
.unwrap();
let stream = client
.copy_out("COPY foo (id, bar) TO STDIN BINARY", &[])
.copy_out("COPY foo (id, bar) TO STDIN BINARY")
.await
.unwrap();
let rows = BinaryCopyOutStream::new(&[Type::INT4, Type::TEXT], stream)
@ -164,7 +164,7 @@ async fn read_many_rows() {
.unwrap();
let stream = client
.copy_out("COPY foo (id, bar) TO STDIN BINARY", &[])
.copy_out("COPY foo (id, bar) TO STDIN BINARY")
.await
.unwrap();
let rows = BinaryCopyOutStream::new(&[Type::INT4, Type::TEXT], stream)
@ -198,7 +198,7 @@ async fn read_big_rows() {
}
let stream = client
.copy_out("COPY foo (id, bar) TO STDIN BINARY", &[])
.copy_out("COPY foo (id, bar) TO STDIN BINARY")
.await
.unwrap();
let rows = BinaryCopyOutStream::new(&[Type::INT4, Type::BYTEA], stream)

View File

@ -378,42 +378,40 @@ impl Client {
/// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
///
/// The copy *must* be explicitly completed via the `Sink::close` or `finish` methods. If it is
/// not, the copy will be aborted.
/// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
/// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
/// Panics if the statement contains parameters.
pub async fn copy_in<T, U>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyInSink<U>, Error>
where
T: ?Sized + ToStatement,
U: Buf + 'static + Send,
{
let statement = statement.__convert().into_statement(self).await?;
let params = slice_iter(params);
copy_in::copy_in(self.inner(), statement, params).await
copy_in::copy_in(self.inner(), statement).await
}
/// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
///
/// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
/// Panics if the statement contains parameters.
pub async fn copy_out<T>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyOutStream, Error>
where
T: ?Sized + ToStatement,
{
let statement = statement.__convert().into_statement(self).await?;
let params = slice_iter(params);
copy_out::copy_out(self.inner(), statement, params).await
copy_out::copy_out(self.inner(), statement).await
}
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.

View File

@ -1,8 +1,7 @@
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::ToSql;
use crate::{query, Error, Statement};
use crate::{query, Error, Statement, slice_iter};
use bytes::buf::BufExt;
use bytes::{Buf, BufMut, BytesMut};
use futures::channel::mpsc;
@ -196,17 +195,14 @@ where
}
}
pub async fn copy_in<'a, I, T>(
pub async fn copy_in<T>(
client: &InnerClient,
statement: Statement,
params: I,
) -> Result<CopyInSink<T>, Error>
where
I: IntoIterator<Item = &'a dyn ToSql>,
I::IntoIter: ExactSizeIterator,
T: Buf + 'static + Send,
{
let buf = query::encode(client, &statement, params)?;
let buf = query::encode(client, &statement, slice_iter(&[]))?;
let (mut sender, receiver) = mpsc::channel(1);
let receiver = CopyInReceiver::new(receiver);

View File

@ -1,8 +1,7 @@
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::ToSql;
use crate::{query, Error, Statement};
use crate::{query, Error, Statement, slice_iter};
use bytes::Bytes;
use futures::{ready, Stream};
use pin_project_lite::pin_project;
@ -11,16 +10,11 @@ use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
pub async fn copy_out<'a, I>(
pub async fn copy_out(
client: &InnerClient,
statement: Statement,
params: I,
) -> Result<CopyOutStream, Error>
where
I: IntoIterator<Item = &'a dyn ToSql>,
I::IntoIter: ExactSizeIterator,
{
let buf = query::encode(client, &statement, params)?;
) -> Result<CopyOutStream, Error> {
let buf = query::encode(client, &statement, slice_iter(&[]))?;
let responses = start(client, buf).await?;
Ok(CopyOutStream {
responses,

View File

@ -224,25 +224,23 @@ impl<'a> Transaction<'a> {
pub async fn copy_in<T, U>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyInSink<U>, Error>
where
T: ?Sized + ToStatement,
U: Buf + 'static + Send,
{
self.client.copy_in(statement, params).await
self.client.copy_in(statement).await
}
/// Like `Client::copy_out`.
pub async fn copy_out<T>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<CopyOutStream, Error>
where
T: ?Sized + ToStatement,
{
self.client.copy_out(statement, params).await
self.client.copy_out(statement).await
}
/// Like `Client::simple_query`.

View File

@ -418,7 +418,7 @@ async fn copy_in() {
.into_iter()
.map(Ok::<_, Error>),
);
let sink = client.copy_in("COPY foo FROM STDIN", &[]).await.unwrap();
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
@ -465,7 +465,7 @@ async fn copy_in_large() {
.map(Ok::<_, Error>),
);
let sink = client.copy_in("COPY foo FROM STDIN", &[]).await.unwrap();
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
@ -487,7 +487,7 @@ async fn copy_in_error() {
.unwrap();
{
let sink = client.copy_in("COPY foo FROM STDIN", &[]).await.unwrap();
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
sink.send(Bytes::from_static(b"1\tsteven")).await.unwrap();
}
@ -517,7 +517,7 @@ async fn copy_out() {
let stmt = client.prepare("COPY foo TO STDOUT").await.unwrap();
let data = client
.copy_out(&stmt, &[])
.copy_out(&stmt)
.await
.unwrap()
.try_fold(BytesMut::new(), |mut buf, chunk| {