Make read_rows a method on InnerConnection

This commit is contained in:
Steven Fackler 2016-02-15 22:16:26 -08:00
parent 47b0c69d35
commit 259fe11a1c
3 changed files with 48 additions and 48 deletions

View File

@ -665,6 +665,50 @@ impl InnerConnection {
Ok((param_types, columns)) Ok((param_types, columns))
} }
fn read_rows(&mut self, buf: &mut VecDeque<Vec<Option<Vec<u8>>>>) -> Result<bool> {
let more_rows;
loop {
match try!(self.read_message()) {
EmptyQueryResponse | CommandComplete { .. } => {
more_rows = false;
break;
}
PortalSuspended => {
more_rows = true;
break;
}
DataRow { row } => buf.push_back(row),
ErrorResponse { fields } => {
try!(self.wait_for_ready());
return DbError::new(fields);
}
CopyInResponse { .. } => {
try!(self.write_messages(&[CopyFail {
message: "COPY queries cannot be directly executed",
},
Sync]));
}
CopyOutResponse { .. } => {
loop {
match try!(self.read_message()) {
ReadyForQuery { .. } => break,
_ => {}
}
}
return Err(Error::Io(std_io::Error::new(std_io::ErrorKind::InvalidInput,
"COPY queries cannot be directly \
executed")));
}
_ => {
self.desynchronized = true;
return Err(Error::Io(bad_response()));
}
}
}
try!(self.wait_for_ready());
Ok(more_rows)
}
fn make_stmt_name(&mut self) -> String { fn make_stmt_name(&mut self) -> String {
let stmt_name = format!("s{}", self.next_stmt_id); let stmt_name = format!("s{}", self.next_stmt_id);
self.next_stmt_id += 1; self.next_stmt_id += 1;
@ -1358,50 +1402,6 @@ impl<'conn> Transaction<'conn> {
} }
} }
fn read_rows(conn: &mut InnerConnection, buf: &mut VecDeque<Vec<Option<Vec<u8>>>>) -> Result<bool> {
let more_rows;
loop {
match try!(conn.read_message()) {
EmptyQueryResponse | CommandComplete { .. } => {
more_rows = false;
break;
}
PortalSuspended => {
more_rows = true;
break;
}
DataRow { row } => buf.push_back(row),
ErrorResponse { fields } => {
try!(conn.wait_for_ready());
return DbError::new(fields);
}
CopyInResponse { .. } => {
try!(conn.write_messages(&[CopyFail {
message: "COPY queries cannot be directly executed",
},
Sync]));
}
CopyOutResponse { .. } => {
loop {
match try!(conn.read_message()) {
ReadyForQuery { .. } => break,
_ => {}
}
}
return Err(Error::Io(std_io::Error::new(std_io::ErrorKind::InvalidInput,
"COPY queries cannot be directly \
executed")));
}
_ => {
conn.desynchronized = true;
return Err(Error::Io(bad_response()));
}
}
}
try!(conn.wait_for_ready());
Ok(more_rows)
}
/// A trait allowing abstraction over connections and transactions /// A trait allowing abstraction over connections and transactions
pub trait GenericConnection { pub trait GenericConnection {
/// Like `Connection::execute`. /// Like `Connection::execute`.

View File

@ -7,7 +7,7 @@ use std::fmt;
use std::ops::Deref; use std::ops::Deref;
use std::slice; use std::slice;
use {Result, Transaction, read_rows, DbErrorNew, SessionInfoNew, RowsNew, LazyRowsNew, use {Result, Transaction, DbErrorNew, SessionInfoNew, RowsNew, LazyRowsNew,
StatementInternals, WrongTypeNew}; StatementInternals, WrongTypeNew};
use types::{FromSql, SessionInfo, WrongType}; use types::{FromSql, SessionInfo, WrongType};
use stmt::{Statement, Column}; use stmt::{Statement, Column};
@ -345,7 +345,7 @@ impl<'trans, 'stmt> LazyRows<'trans, 'stmt> {
max_rows: self.row_limit, max_rows: self.row_limit,
}, },
Sync])); Sync]));
read_rows(&mut conn, &mut self.data).map(|more_rows| self.more_rows = more_rows) conn.read_rows(&mut self.data).map(|more_rows| self.more_rows = more_rows)
} }
/// Returns a slice describing the columns of the `LazyRows`. /// Returns a slice describing the columns of the `LazyRows`.

View File

@ -13,7 +13,7 @@ use message::BackendMessage::*;
use message::WriteMessage; use message::WriteMessage;
use util; use util;
use rows::{Rows, LazyRows}; use rows::{Rows, LazyRows};
use {read_rows, bad_response, Connection, Transaction, StatementInternals, Result, RowsNew}; use {bad_response, Connection, Transaction, StatementInternals, Result, RowsNew};
use {InnerConnection, SessionInfoNew, LazyRowsNew, DbErrorNew, ColumnNew, StatementInfo}; use {InnerConnection, SessionInfoNew, LazyRowsNew, DbErrorNew, ColumnNew, StatementInfo};
/// A prepared statement. /// A prepared statement.
@ -129,7 +129,7 @@ impl<'conn> Statement<'conn> {
try!(self.inner_execute(portal_name, row_limit, params)); try!(self.inner_execute(portal_name, row_limit, params));
let mut buf = VecDeque::new(); let mut buf = VecDeque::new();
let more_rows = try!(read_rows(&mut self.conn.conn.borrow_mut(), &mut buf)); let more_rows = try!(self.conn.conn.borrow_mut().read_rows(&mut buf));
Ok((buf, more_rows)) Ok((buf, more_rows))
} }