Remove CopyInStatement
The API's awful to use.
This commit is contained in:
parent
e094ce66d6
commit
b5d9a38a59
154
src/lib.rs
154
src/lib.rs
@ -778,35 +778,6 @@ impl InnerConnection {
|
||||
})
|
||||
}
|
||||
|
||||
fn prepare_copy_in<'a>(&mut self, table: &str, rows: &[&str], conn: &'a Connection)
|
||||
-> Result<CopyInStatement<'a>> {
|
||||
let mut query = vec![];
|
||||
let _ = write!(&mut query, "SELECT ");
|
||||
let _ = util::comma_join_quoted_idents(&mut query, rows.iter().cloned());
|
||||
let _ = write!(&mut query, " FROM ");
|
||||
let _ = util::write_quoted_ident(&mut query, table);
|
||||
let query = String::from_utf8(query).unwrap();
|
||||
let (_, columns) = try!(self.raw_prepare("", &query));
|
||||
let column_types = columns.into_iter().map(|desc| desc.type_).collect();
|
||||
|
||||
let mut query = vec![];
|
||||
let _ = write!(&mut query, "COPY ");
|
||||
let _ = util::write_quoted_ident(&mut query, table);
|
||||
let _ = write!(&mut query, " (");
|
||||
let _ = util::comma_join_quoted_idents(&mut query, rows.iter().cloned());
|
||||
let _ = write!(&mut query, ") FROM STDIN WITH (FORMAT binary)");
|
||||
let query = String::from_utf8(query).unwrap();
|
||||
let stmt_name = self.make_stmt_name();
|
||||
try!(self.raw_prepare(&stmt_name, &query));
|
||||
|
||||
Ok(CopyInStatement {
|
||||
conn: conn,
|
||||
name: stmt_name,
|
||||
column_types: column_types,
|
||||
finished: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn close_statement(&mut self, name: &str, type_: u8) -> Result<()> {
|
||||
try!(self.write_messages(&[
|
||||
Close {
|
||||
@ -1094,15 +1065,6 @@ impl Connection {
|
||||
self.conn.borrow_mut().prepare_cached(query, self)
|
||||
}
|
||||
|
||||
/// Creates a new COPY FROM STDIN prepared statement.
|
||||
///
|
||||
/// These statements provide a method to efficiently bulk-upload data to
|
||||
/// the database.
|
||||
pub fn prepare_copy_in<'a>(&'a self, table: &str, rows: &[&str])
|
||||
-> Result<CopyInStatement<'a>> {
|
||||
self.conn.borrow_mut().prepare_copy_in(table, rows, self)
|
||||
}
|
||||
|
||||
/// Begins a new transaction.
|
||||
///
|
||||
/// Returns a `Transaction` object which should be used instead of
|
||||
@ -1322,11 +1284,6 @@ impl<'conn> Transaction<'conn> {
|
||||
self.conn.prepare_cached(query)
|
||||
}
|
||||
|
||||
/// Like `Connection::prepare_copy_in`.
|
||||
pub fn prepare_copy_in(&self, table: &str, cols: &[&str]) -> Result<CopyInStatement<'conn>> {
|
||||
self.conn.prepare_copy_in(table, cols)
|
||||
}
|
||||
|
||||
/// Like `Connection::execute`.
|
||||
pub fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
|
||||
self.conn.execute(query, params)
|
||||
@ -2030,90 +1987,7 @@ impl<'trans, 'stmt> Iterator for LazyRows<'trans, 'stmt> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A prepared `COPY FROM STDIN` statement.
|
||||
pub struct CopyInStatement<'a> {
|
||||
conn: &'a Connection,
|
||||
name: String,
|
||||
column_types: Vec<Type>,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
impl<'a> fmt::Debug for CopyInStatement<'a> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
DebugStruct::new(fmt, "CopyInStatement")
|
||||
.field("name", &self.name)
|
||||
.field("column_types", &self.column_types)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for CopyInStatement<'a> {
|
||||
fn drop(&mut self) {
|
||||
if !self.finished {
|
||||
let _ = self.finish_inner();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An `Iterator` variant which returns borrowed values.
|
||||
pub trait StreamIterator {
|
||||
/// Returns the next value, or `None` if there is none.
|
||||
fn next<'a>(&'a mut self) -> Option<&'a (ToSql + 'a)>;
|
||||
}
|
||||
|
||||
/// An adapter type implementing `StreamIterator` for a `Vec<Box<ToSql>>`.
|
||||
pub struct VecStreamIterator<'a> {
|
||||
v: Vec<Box<ToSql + 'a>>,
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl<'a> VecStreamIterator<'a> {
|
||||
/// Creates a new `VecStreamIterator`.
|
||||
pub fn new(v: Vec<Box<ToSql + 'a>>) -> VecStreamIterator<'a> {
|
||||
VecStreamIterator {
|
||||
v: v,
|
||||
idx: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the underlying `Vec`.
|
||||
pub fn into_inner(self) -> Vec<Box<ToSql + 'a>> {
|
||||
self.v
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> StreamIterator for VecStreamIterator<'a> {
|
||||
fn next<'b>(&'b mut self) -> Option<&'b (ToSql + 'b)> {
|
||||
match self.v.get_mut(self.idx) {
|
||||
Some(mut e) => {
|
||||
self.idx += 1;
|
||||
Some(&mut **e)
|
||||
},
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> CopyInStatement<'a> {
|
||||
fn finish_inner(&mut self) -> Result<()> {
|
||||
let mut conn = self.conn.conn.borrow_mut();
|
||||
check_desync!(conn);
|
||||
conn.close_statement(&self.name, b'S')
|
||||
}
|
||||
|
||||
/// Returns a slice containing the expected column types.
|
||||
pub fn column_types(&self) -> &[Type] {
|
||||
&self.column_types
|
||||
}
|
||||
|
||||
/// Executes the prepared statement.
|
||||
///
|
||||
/// The `rows` argument is an `Iterator` returning `StreamIterator` values,
|
||||
/// each one of which provides values for a row of input. This setup is
|
||||
/// designed to allow callers to avoid having to maintain the entire row
|
||||
/// set in memory.
|
||||
///
|
||||
/// Returns the number of rows copied.
|
||||
/*
|
||||
pub fn execute<I, J>(&self, rows: I) -> Result<u64>
|
||||
where I: Iterator<Item=J>, J: StreamIterator {
|
||||
let mut conn = self.conn.conn.borrow_mut();
|
||||
@ -2225,17 +2099,7 @@ impl<'a> CopyInStatement<'a> {
|
||||
try!(conn.wait_for_ready());
|
||||
Ok(num)
|
||||
}
|
||||
|
||||
/// Consumes the statement, clearing it from the Postgres session.
|
||||
///
|
||||
/// Functionally identical to the `Drop` implementation of the
|
||||
/// `CopyInStatement` except that it returns any error to the
|
||||
/// caller.
|
||||
pub fn finish(mut self) -> Result<()> {
|
||||
self.finished = true;
|
||||
self.finish_inner()
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/// A trait allowing abstraction over connections and transactions
|
||||
pub trait GenericConnection {
|
||||
@ -2248,10 +2112,6 @@ pub trait GenericConnection {
|
||||
/// Like `Connection::execute`.
|
||||
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64>;
|
||||
|
||||
/// Like `Connection::prepare_copy_in`.
|
||||
fn prepare_copy_in<'a>(&'a self, table: &str, columns: &[&str])
|
||||
-> Result<CopyInStatement<'a>>;
|
||||
|
||||
/// Like `Connection::transaction`.
|
||||
fn transaction<'a>(&'a self) -> Result<Transaction<'a>>;
|
||||
|
||||
@ -2279,11 +2139,6 @@ impl GenericConnection for Connection {
|
||||
self.transaction()
|
||||
}
|
||||
|
||||
fn prepare_copy_in<'a>(&'a self, table: &str, columns: &[&str])
|
||||
-> Result<CopyInStatement<'a>> {
|
||||
self.prepare_copy_in(table, columns)
|
||||
}
|
||||
|
||||
fn batch_execute(&self, query: &str) -> Result<()> {
|
||||
self.batch_execute(query)
|
||||
}
|
||||
@ -2310,11 +2165,6 @@ impl<'a> GenericConnection for Transaction<'a> {
|
||||
self.transaction()
|
||||
}
|
||||
|
||||
fn prepare_copy_in<'b>(&'b self, table: &str, columns: &[&str])
|
||||
-> Result<CopyInStatement<'b>> {
|
||||
self.prepare_copy_in(table, columns)
|
||||
}
|
||||
|
||||
fn batch_execute(&self, query: &str) -> Result<()> {
|
||||
self.batch_execute(query)
|
||||
}
|
||||
|
28
src/util.rs
28
src/util.rs
@ -1,33 +1,5 @@
|
||||
use std::io;
|
||||
use std::io::prelude::*;
|
||||
use std::ascii::AsciiExt;
|
||||
|
||||
pub fn comma_join_quoted_idents<'a, W, I>(writer: &mut W, strs: I) -> io::Result<()>
|
||||
where W: Write, I: Iterator<Item=&'a str> {
|
||||
let mut first = true;
|
||||
for str_ in strs {
|
||||
if !first {
|
||||
try!(write!(writer, ", "));
|
||||
}
|
||||
first = false;
|
||||
try!(write_quoted_ident(writer, str_));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// See http://www.postgresql.org/docs/9.4/static/sql-syntax-lexical.html for ident grammar
|
||||
pub fn write_quoted_ident<W: Write>(w: &mut W, ident: &str) -> io::Result<()> {
|
||||
try!(write!(w, "U&\""));
|
||||
for ch in ident.chars() {
|
||||
match ch {
|
||||
'"' => try!(write!(w, "\"\"")),
|
||||
'\\' => try!(write!(w, "\\\\")),
|
||||
ch if ch.is_ascii() => try!(write!(w, "{}", ch)),
|
||||
ch => try!(write!(w, "\\+{:06X}", ch as u32)),
|
||||
}
|
||||
}
|
||||
write!(w, "\"")
|
||||
}
|
||||
|
||||
pub fn parse_update_count(tag: String) -> u64 {
|
||||
tag.split(' ').last().unwrap().parse().unwrap_or(0)
|
||||
|
@ -19,8 +19,7 @@ use postgres::{HandleNotice,
|
||||
ConnectError,
|
||||
DbError,
|
||||
IntoConnectParams,
|
||||
IsolationLevel,
|
||||
VecStreamIterator};
|
||||
IsolationLevel};
|
||||
use postgres::SqlState::{SyntaxError,
|
||||
QueryCanceled,
|
||||
UndefinedTable,
|
||||
@ -755,94 +754,6 @@ fn test_execute_copy_from_err() {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_copy_in() {
|
||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||
or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[]));
|
||||
|
||||
let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"]));
|
||||
|
||||
let data = (0i32..2).map(|i| {
|
||||
VecStreamIterator::new(vec![Box::new(i),
|
||||
Box::new(format!("{}", i))])
|
||||
});
|
||||
|
||||
assert_eq!(2, stmt.execute(data).unwrap());
|
||||
|
||||
let stmt = or_panic!(conn.prepare("SELECT id, name FROM foo ORDER BY id"));
|
||||
assert_eq!(vec![(0i32, Some("0".to_string())), (1, Some("1".to_string()))],
|
||||
or_panic!(stmt.query(&[])).iter().map(|r| (r.get(0), r.get(1))).collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_copy_in_bad_column_count() {
|
||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||
or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[]));
|
||||
|
||||
let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"]));
|
||||
let data = vec![
|
||||
VecStreamIterator::new(vec![Box::new(1i32),
|
||||
Box::new("Steven".to_string())]),
|
||||
VecStreamIterator::new(vec![Box::new(2i32)]),
|
||||
].into_iter();
|
||||
|
||||
let res = stmt.execute(data);
|
||||
match res {
|
||||
Err(Error::DbError(ref err)) if err.message().contains("Invalid column count") => {}
|
||||
Err(err) => panic!("unexpected error {:?}", err),
|
||||
_ => panic!("Expected error"),
|
||||
}
|
||||
|
||||
let data = vec![
|
||||
VecStreamIterator::new(vec![Box::new(1i32),
|
||||
Box::new("Steven".to_string())]),
|
||||
VecStreamIterator::new(vec![Box::new(2i32),
|
||||
Box::new("Steven".to_string()),
|
||||
Box::new(3i64)]),
|
||||
].into_iter();
|
||||
|
||||
let res = stmt.execute(data);
|
||||
match res {
|
||||
Err(Error::DbError(ref err)) if err.message().contains("Invalid column count") => {}
|
||||
Err(err) => panic!("unexpected error {:?}", err),
|
||||
_ => panic!("Expected error"),
|
||||
}
|
||||
|
||||
or_panic!(conn.execute("SELECT 1", &[]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_copy_in_bad_type() {
|
||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||
or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[]));
|
||||
|
||||
let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"]));
|
||||
|
||||
let data = vec![
|
||||
VecStreamIterator::new(vec![Box::new(1i32),
|
||||
Box::new("Steven".to_string())]),
|
||||
VecStreamIterator::new(vec![Box::new(2i32),
|
||||
Box::new(1i32)]),
|
||||
].into_iter();
|
||||
|
||||
let res = stmt.execute(data);
|
||||
match res {
|
||||
Err(Error::DbError(ref err)) if err.message().contains("saw type Varchar") => {}
|
||||
Err(err) => panic!("unexpected error {:?}", err),
|
||||
_ => panic!("Expected error"),
|
||||
}
|
||||
|
||||
or_panic!(conn.execute("SELECT 1", &[]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_copy_in_weird_names() {
|
||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||
or_panic!(conn.execute(r#"CREATE TEMPORARY TABLE "na""me" (U&" \\\+01F4A9" VARCHAR)"#, &[]));
|
||||
let stmt = or_panic!(conn.prepare_copy_in("na\"me", &[" \\💩"]));
|
||||
assert_eq!(&Type::Varchar, &stmt.column_types()[0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_execute_copy_from_err() {
|
||||
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
|
||||
|
Loading…
Reference in New Issue
Block a user