Provide more information for copy in readers

This commit is contained in:
Steven Fackler 2015-08-22 13:50:20 -07:00
parent 6c0a7c3b79
commit c21fc2966b
2 changed files with 75 additions and 55 deletions

View File

@ -7,7 +7,7 @@ use std::fmt;
use std::io::{self, Cursor, BufRead, Read}; use std::io::{self, Cursor, BufRead, Read};
use error::{Error, DbError}; use error::{Error, DbError};
use types::{ReadWithInfo, SessionInfo, Type, ToSql, IsNull}; use types::{SessionInfo, Type, ToSql, IsNull};
use message::FrontendMessage::*; use message::FrontendMessage::*;
use message::BackendMessage::*; use message::BackendMessage::*;
use message::WriteMessage; use message::WriteMessage;
@ -309,8 +309,8 @@ impl<'conn> Statement<'conn> {
try!(self.inner_execute("", 0, params)); try!(self.inner_execute("", 0, params));
let mut conn = self.conn.conn.borrow_mut(); let mut conn = self.conn.conn.borrow_mut();
match try!(conn.read_message()) { let (format, column_formats) = match try!(conn.read_message()) {
CopyInResponse { .. } => {} CopyInResponse { format, column_formats } => (format, column_formats),
_ => { _ => {
loop { loop {
match try!(conn.read_message()) { match try!(conn.read_message()) {
@ -323,53 +323,59 @@ impl<'conn> Statement<'conn> {
} }
} }
} }
} };
let mut info = CopyInfo {
conn: conn,
format: Format::from_u16(format as u16),
column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(),
};
let mut buf = [0; 16 * 1024]; let mut buf = [0; 16 * 1024];
loop { loop {
match fill_copy_buf(&mut buf, r, &SessionInfo::new(&conn)) { match fill_copy_buf(&mut buf, r, &info) {
Ok(0) => break, Ok(0) => break,
Ok(len) => { Ok(len) => {
try_desync!(conn, conn.stream.write_message( try_desync!(info.conn, info.conn.stream.write_message(
&CopyData { &CopyData {
data: &buf[..len], data: &buf[..len],
})); }));
} }
Err(err) => { Err(err) => {
try!(conn.write_messages(&[ try!(info.conn.write_messages(&[
CopyFail { CopyFail {
message: "", message: "",
}, },
CopyDone, CopyDone,
Sync])); Sync]));
match try!(conn.read_message()) { match try!(info.conn.read_message()) {
ErrorResponse { .. } => { /* expected from the CopyFail */ } ErrorResponse { .. } => { /* expected from the CopyFail */ }
_ => { _ => {
conn.desynchronized = true; info.conn.desynchronized = true;
return Err(Error::IoError(bad_response())); return Err(Error::IoError(bad_response()));
} }
} }
try!(conn.wait_for_ready()); try!(info.conn.wait_for_ready());
return Err(Error::IoError(err)); return Err(Error::IoError(err));
} }
} }
} }
try!(conn.write_messages(&[CopyDone, Sync])); try!(info.conn.write_messages(&[CopyDone, Sync]));
let num = match try!(conn.read_message()) { let num = match try!(info.conn.read_message()) {
CommandComplete { tag } => util::parse_update_count(tag), CommandComplete { tag } => util::parse_update_count(tag),
ErrorResponse { fields } => { ErrorResponse { fields } => {
try!(conn.wait_for_ready()); try!(info.conn.wait_for_ready());
return DbError::new(fields); return DbError::new(fields);
} }
_ => { _ => {
conn.desynchronized = true; info.conn.desynchronized = true;
return Err(Error::IoError(bad_response())); return Err(Error::IoError(bad_response()));
} }
}; };
try!(conn.wait_for_ready()); try!(info.conn.wait_for_ready());
Ok(num) Ok(num)
} }
@ -443,9 +449,11 @@ impl<'conn> Statement<'conn> {
}; };
Ok(CopyOutReader { Ok(CopyOutReader {
info: CopyInfo {
conn: conn, conn: conn,
format: Format::from_u16(format as u16), format: Format::from_u16(format as u16),
column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(), column_formats: column_formats.iter().map(|&f| Format::from_u16(f)).collect(),
},
buf: Cursor::new(vec![]), buf: Cursor::new(vec![]),
finished: false, finished: false,
}) })
@ -463,7 +471,7 @@ impl<'conn> Statement<'conn> {
} }
} }
fn fill_copy_buf<R: ReadWithInfo>(buf: &mut [u8], r: &mut R, info: &SessionInfo) fn fill_copy_buf<R: ReadWithInfo>(buf: &mut [u8], r: &mut R, info: &CopyInfo)
-> io::Result<usize> { -> io::Result<usize> {
let mut nread = 0; let mut nread = 0;
while nread < buf.len() { while nread < buf.len() {
@ -493,6 +501,44 @@ impl ColumnNew for Column {
} }
} }
/// A struct containing information relevant for a `COPY` operation.
pub struct CopyInfo<'a> {
conn: RefMut<'a, InnerConnection>,
format: Format,
column_formats: Vec<Format>,
}
impl<'a> CopyInfo<'a> {
/// Returns the format of the overall data.
pub fn format(&self) -> Format {
self.format
}
/// Returns the format of the individual columns.
pub fn column_formats(&self) -> &[Format] {
&self.column_formats
}
/// Returns session info for the associated connection.
pub fn session_info<'b>(&'b self) -> SessionInfo<'b> {
SessionInfo::new(&*self.conn)
}
}
/// Like `Read` except that a `CopyInfo` object is provided as well.
///
/// All types that implement `Read` also implement this trait.
pub trait ReadWithInfo {
/// Like `Read::read`.
fn read_with_info(&mut self, buf: &mut [u8], info: &CopyInfo) -> io::Result<usize>;
}
impl<R: Read> ReadWithInfo for R {
fn read_with_info(&mut self, buf: &mut [u8], _: &CopyInfo) -> io::Result<usize> {
self.read(buf)
}
}
impl Column { impl Column {
/// The name of the column. /// The name of the column.
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
@ -530,9 +576,7 @@ impl Format {
/// The underlying connection may not be used while a `CopyOutReader` exists. /// The underlying connection may not be used while a `CopyOutReader` exists.
/// Any attempt to do so will panic. /// Any attempt to do so will panic.
pub struct CopyOutReader<'a> { pub struct CopyOutReader<'a> {
conn: RefMut<'a, InnerConnection>, info: CopyInfo<'a>,
format: Format,
column_formats: Vec<Format>,
buf: Cursor<Vec<u8>>, buf: Cursor<Vec<u8>>,
finished: bool, finished: bool,
} }
@ -544,19 +588,9 @@ impl<'a> Drop for CopyOutReader<'a> {
} }
impl<'a> CopyOutReader<'a> { impl<'a> CopyOutReader<'a> {
/// Returns the format of the overall data. /// Returns the `CopyInfo` for the current operation.
pub fn format(&self) -> Format { pub fn info(&self) -> &CopyInfo {
self.format &self.info
}
/// Returns the format of the individual columns.
pub fn column_formats(&self) -> &[Format] {
&self.column_formats
}
/// Returns session info for the associated connection.
pub fn session_info<'b>(&'b self) -> SessionInfo<'b> {
SessionInfo::new(&*self.conn)
} }
/// Consumes the `CopyOutReader`, throwing away any unread data. /// Consumes the `CopyOutReader`, throwing away any unread data.
@ -581,26 +615,26 @@ impl<'a> CopyOutReader<'a> {
return Ok(()); return Ok(());
} }
match try!(self.conn.read_message()) { match try!(self.info.conn.read_message()) {
BCopyData { data } => self.buf = Cursor::new(data), BCopyData { data } => self.buf = Cursor::new(data),
BCopyDone => { BCopyDone => {
self.finished = true; self.finished = true;
match try!(self.conn.read_message()) { match try!(self.info.conn.read_message()) {
CommandComplete { .. } => {} CommandComplete { .. } => {}
_ => { _ => {
self.conn.desynchronized = true; self.info.conn.desynchronized = true;
return Err(Error::IoError(bad_response())); return Err(Error::IoError(bad_response()));
} }
} }
try!(self.conn.wait_for_ready()); try!(self.info.conn.wait_for_ready());
} }
ErrorResponse { fields } => { ErrorResponse { fields } => {
self.finished = true; self.finished = true;
try!(self.conn.wait_for_ready()); try!(self.info.conn.wait_for_ready());
return DbError::new(fields); return DbError::new(fields);
} }
_ => { _ => {
self.conn.desynchronized = true; self.info.conn.desynchronized = true;
return Err(Error::IoError(bad_response())); return Err(Error::IoError(bad_response()));
} }
} }

View File

@ -76,20 +76,6 @@ impl<'a> SessionInfo<'a> {
} }
} }
/// Like `Read` except that a `SessionInfo` object is provided as well.
///
/// All types that implement `Read` also implement this trait.
pub trait ReadWithInfo {
/// Like `Read::read`.
fn read_with_info(&mut self, buf: &mut [u8], info: &SessionInfo) -> io::Result<usize>;
}
impl<R: Read> ReadWithInfo for R {
fn read_with_info(&mut self, buf: &mut [u8], _: &SessionInfo) -> io::Result<usize> {
self.read(buf)
}
}
/// A Postgres OID. /// A Postgres OID.
pub type Oid = u32; pub type Oid = u32;