diff --git a/src/lib.rs b/src/lib.rs index 300be5b4..50909693 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -906,25 +906,6 @@ impl Connection { /// /// These statements provide a method to efficiently bulk-upload data to /// the database. - /// - /// ## Example - /// - /// ```rust,no_run - /// # use postgres::{Connection, SslMode, Error}; - /// # use postgres::types::ToSql; - /// # fn f() -> Result<(), Error> { - /// # let conn = Connection::connect("", &SslMode::None).unwrap(); - /// try!(conn.execute("CREATE TABLE foo ( - /// bar INT PRIMARY KEY, - /// baz VARCHAR - /// )", &[])); - /// - /// let stmt = try!(conn.prepare_copy_in("foo", &["bar", "baz"])); - /// let data: &[&[&ToSql]] = &[&[&0i32, &"blah"], - /// &[&1i32, &None::]]; - /// try!(stmt.execute(data.iter().map(|r| r.iter().map(|&e| e)))); - /// # Ok(()) }; - /// ``` pub fn prepare_copy_in<'a>(&'a self, table: &str, rows: &[&str]) -> Result> { let mut conn = self.conn.borrow_mut(); @@ -1700,6 +1681,45 @@ impl<'a> Drop for CopyInStatement<'a> { } } +/// An `Iterator` variant which returns borrowed values. +pub trait StreamIterator { + /// Returns the next value, or `None` if there is none. + fn next<'a>(&'a mut self) -> Option<&'a (ToSql + 'a)>; +} + +/// An adapter type implementing `StreamIterator` for a `Vec>`. +pub struct VecStreamIterator<'a> { + v: Vec>, + idx: usize, +} + +impl<'a> VecStreamIterator<'a> { + /// Creates a new `VecStreamIterator`. + pub fn new(v: Vec>) -> VecStreamIterator<'a> { + VecStreamIterator { + v: v, + idx: 0, + } + } + + /// Returns the underlying `Vec`. + pub fn into_inner(self) -> Vec> { + self.v + } +} + +impl<'a> StreamIterator for VecStreamIterator<'a> { + fn next<'b>(&'b mut self) -> Option<&'b (ToSql + 'b)> { + match self.v.get_mut(self.idx) { + Some(mut e) => { + self.idx += 1; + Some(&mut **e) + }, + None => None, + } + } +} + impl<'a> CopyInStatement<'a> { fn finish_inner(&mut self) -> Result<()> { let mut conn = self.conn.conn.borrow_mut(); @@ -1714,12 +1734,14 @@ impl<'a> CopyInStatement<'a> { /// Executes the prepared statement. /// - /// Each iterator returned by the `rows` iterator will be interpreted as - /// providing a single result row. + /// The `rows` argument is an `Iterator` returning `StreamIterator` values, + /// each one of which provides values for a row of input. This setup is + /// designed to allow callers to avoid having to maintain the entire row + /// set in memory. /// /// Returns the number of rows copied. pub fn execute<'b, I, J>(&self, mut rows: I) -> Result - where I: Iterator, J: Iterator { + where I: Iterator, J: StreamIterator { let mut conn = self.conn.conn.borrow_mut(); try!(conn.write_messages(&[ diff --git a/tests/test.rs b/tests/test.rs index 29f561c3..4a026ec4 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -23,7 +23,8 @@ use postgres::{NoticeHandler, ToSql, Error, ConnectError, - DbError}; + DbError, + VecStreamIterator}; use postgres::SqlState::{SyntaxError, QueryCanceled, UndefinedTable, @@ -786,12 +787,16 @@ fn test_copy_in() { or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[])); let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"])); - let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32, &None::]]; - assert_eq!(Ok(2), stmt.execute(data.iter().map(|r| r.iter().map(|&e| e)))); + let data = (0i32..2).map(|i| { + VecStreamIterator::new(vec![Box::new(i) as Box, + Box::new(format!("{}", i)) as Box]) + }); + + assert_eq!(Ok(2), stmt.execute(data)); let stmt = or_panic!(conn.prepare("SELECT id, name FROM foo ORDER BY id")); - assert_eq!(vec![(0i32, Some("Steven".to_string())), (1, None)], + assert_eq!(vec![(0i32, Some("0".to_string())), (1, Some("1".to_string()))], or_panic!(stmt.query(&[])).map(|r| (r.get(0), r.get(1))).collect::>()); } @@ -801,18 +806,28 @@ fn test_copy_in_bad_column_count() { or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[])); let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"])); - let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32]]; + let data = vec![ + VecStreamIterator::new(vec![Box::new(1i32) as Box, + Box::new("Steven".to_string()) as Box]), + VecStreamIterator::new(vec![Box::new(2i32) as Box]), + ].into_iter(); - let res = stmt.execute(data.iter().map(|r| r.iter().map(|&e| e))); + let res = stmt.execute(data); match res { Err(Error::DbError(ref err)) if err.message[].contains("Invalid column count") => {} Err(err) => panic!("unexpected error {:?}", err), _ => panic!("Expected error"), } - let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32, &"Steven".to_string(), &1i32]]; + let data = vec![ + VecStreamIterator::new(vec![Box::new(1i32) as Box, + Box::new("Steven".to_string()) as Box]), + VecStreamIterator::new(vec![Box::new(2i32) as Box, + Box::new("Steven".to_string()) as Box, + Box::new(3i64) as Box]), + ].into_iter(); - let res = stmt.execute(data.iter().map(|r| r.iter().map(|&e| e))); + let res = stmt.execute(data); match res { Err(Error::DbError(ref err)) if err.message[].contains("Invalid column count") => {} Err(err) => panic!("unexpected error {:?}", err), @@ -828,9 +843,15 @@ fn test_copy_in_bad_type() { or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[])); let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"])); - let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32, &2i32]]; - let res = stmt.execute(data.iter().map(|r| r.iter().map(|&e| e))); + let data = vec![ + VecStreamIterator::new(vec![Box::new(1i32) as Box, + Box::new("Steven".to_string()) as Box]), + VecStreamIterator::new(vec![Box::new(2i32) as Box, + Box::new(1i32) as Box]), + ].into_iter(); + + let res = stmt.execute(data); match res { Err(Error::DbError(ref err)) if err.message[].contains("Unexpected type Varchar") => {} Err(err) => panic!("unexpected error {:?}", err),