Allocate one vec per row, not cell

This commit is contained in:
Steven Fackler 2016-12-18 19:41:23 -08:00
parent ad69145b56
commit 310a4888e9
3 changed files with 93 additions and 46 deletions

View File

@ -85,6 +85,7 @@ use std::collections::{VecDeque, HashMap};
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::mem; use std::mem;
use std::ops::Range;
use std::result; use std::result;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -488,7 +489,9 @@ impl InnerConnection {
Ok((param_types, columns)) Ok((param_types, columns))
} }
fn read_rows(&mut self, buf: &mut VecDeque<Vec<Option<Vec<u8>>>>) -> Result<bool> { fn read_rows<F>(&mut self, mut consumer: F) -> Result<bool>
where F: FnMut(RowData)
{
let more_rows; let more_rows;
loop { loop {
match try!(self.read_message()) { match try!(self.read_message()) {
@ -502,8 +505,12 @@ impl InnerConnection {
break; break;
} }
backend::Message::DataRow(body) => { backend::Message::DataRow(body) => {
let row = try!(body.values().map(|v| v.map(ToOwned::to_owned)).collect()); let mut row = RowData::new();
buf.push_back(row); let mut it = body.values();
while let Some(value) = try!(it.next()) {
row.push(value);
}
consumer(row);
} }
backend::Message::ErrorResponse(body) => { backend::Message::ErrorResponse(body) => {
try!(self.wait_for_ready()); try!(self.wait_for_ready());
@ -694,12 +701,11 @@ impl InnerConnection {
fn read_type(&mut self, oid: Oid) -> Result<Other> { fn read_type(&mut self, oid: Oid) -> Result<Other> {
try!(self.setup_typeinfo_query()); try!(self.setup_typeinfo_query());
try!(self.raw_execute(TYPEINFO_QUERY, "", 0, &[Type::Oid], &[&oid])); try!(self.raw_execute(TYPEINFO_QUERY, "", 0, &[Type::Oid], &[&oid]));
let mut rows = VecDeque::new(); let mut row = None;
try!(self.read_rows(&mut rows)); try!(self.read_rows(|r| row = Some(r)));
let row = rows.pop_front();
let get_raw = |i: usize| { let get_raw = |i: usize| {
row.as_ref().and_then(|r| r.get(i)).and_then(|r| r.as_ref().map(|r| &**r)) row.as_ref().and_then(|r| r.get(i))
}; };
let (name, type_, elem_oid, rngsubtype, basetype, schema, relid) = { let (name, type_, elem_oid, rngsubtype, basetype, schema, relid) = {
@ -770,14 +776,13 @@ impl InnerConnection {
fn read_enum_variants(&mut self, oid: Oid) -> Result<Vec<String>> { fn read_enum_variants(&mut self, oid: Oid) -> Result<Vec<String>> {
try!(self.setup_typeinfo_enum_query()); try!(self.setup_typeinfo_enum_query());
try!(self.raw_execute(TYPEINFO_ENUM_QUERY, "", 0, &[Type::Oid], &[&oid])); try!(self.raw_execute(TYPEINFO_ENUM_QUERY, "", 0, &[Type::Oid], &[&oid]));
let mut rows = VecDeque::new(); let mut rows = vec![];
try!(self.read_rows(&mut rows)); try!(self.read_rows(|row| rows.push(row)));
let ctx = SessionInfo::new(&self.parameters); let ctx = SessionInfo::new(&self.parameters);
let mut variants = vec![]; let mut variants = vec![];
for row in rows { for row in rows {
let raw = row.get(0).and_then(|r| r.as_ref().map(|r| &**r)); variants.push(try!(String::from_sql_nullable(&Type::Name, row.get(0), &ctx)
variants.push(try!(String::from_sql_nullable(&Type::Name, raw, &ctx)
.map_err(Error::Conversion))); .map_err(Error::Conversion)));
} }
@ -804,17 +809,16 @@ impl InnerConnection {
fn read_composite_fields(&mut self, relid: Oid) -> Result<Vec<Field>> { fn read_composite_fields(&mut self, relid: Oid) -> Result<Vec<Field>> {
try!(self.setup_typeinfo_composite_query()); try!(self.setup_typeinfo_composite_query());
try!(self.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", 0, &[Type::Oid], &[&relid])); try!(self.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", 0, &[Type::Oid], &[&relid]));
let mut rows = VecDeque::new(); let mut rows = vec![];
try!(self.read_rows(&mut rows)); try!(self.read_rows(|row| rows.push(row)));
let mut fields = vec![]; let mut fields = vec![];
for row in rows { for row in rows {
let (name, type_) = { let (name, type_) = {
let get_raw = |i: usize| row.get(i).and_then(|r| r.as_ref().map(|r| &**r));
let ctx = SessionInfo::new(&self.parameters); let ctx = SessionInfo::new(&self.parameters);
let name = try!(String::from_sql_nullable(&Type::Name, get_raw(0), &ctx) let name = try!(String::from_sql_nullable(&Type::Name, row.get(0), &ctx)
.map_err(Error::Conversion)); .map_err(Error::Conversion));
let type_ = try!(Oid::from_sql_nullable(&Type::Oid, get_raw(1), &ctx) let type_ = try!(Oid::from_sql_nullable(&Type::Oid, row.get(1), &ctx)
.map_err(Error::Conversion)); .map_err(Error::Conversion));
(name, type_) (name, type_)
}; };
@ -1329,6 +1333,43 @@ impl<'a> GenericConnection for Transaction<'a> {
} }
} }
struct RowData {
buf: Vec<u8>,
indices: Vec<Option<Range<usize>>>,
}
impl RowData {
fn new() -> RowData {
RowData {
buf: vec![],
indices: vec![],
}
}
fn push(&mut self, cell: Option<&[u8]>) {
let index = match cell {
Some(cell) => {
let base = self.buf.len();
self.buf.extend_from_slice(cell);
Some(base..self.buf.len())
}
None => None,
};
self.indices.push(index);
}
fn len(&self) -> usize {
self.indices.len()
}
fn get(&self, index: usize) -> Option<&[u8]> {
match &self.indices[index] {
&Some(ref range) => Some(&self.buf[range.clone()]),
&None => None,
}
}
}
trait OtherNew { trait OtherNew {
fn new(name: String, oid: Oid, kind: Kind, schema: String) -> Other; fn new(name: String, oid: Oid, kind: Kind, schema: String) -> Other;
} }
@ -1340,13 +1381,13 @@ trait DbErrorNew {
} }
trait RowsNew<'a> { trait RowsNew<'a> {
fn new(stmt: &'a Statement<'a>, data: Vec<Vec<Option<Vec<u8>>>>) -> Rows<'a>; fn new(stmt: &'a Statement<'a>, data: Vec<RowData>) -> Rows<'a>;
fn new_owned(stmt: Statement<'a>, data: Vec<Vec<Option<Vec<u8>>>>) -> Rows<'a>; fn new_owned(stmt: Statement<'a>, data: Vec<RowData>) -> Rows<'a>;
} }
trait LazyRowsNew<'trans, 'stmt> { trait LazyRowsNew<'trans, 'stmt> {
fn new(stmt: &'stmt Statement<'stmt>, fn new(stmt: &'stmt Statement<'stmt>,
data: VecDeque<Vec<Option<Vec<u8>>>>, data: VecDeque<RowData>,
name: String, name: String,
row_limit: i32, row_limit: i32,
more_rows: bool, more_rows: bool,

View File

@ -3,14 +3,13 @@
use fallible_iterator::FallibleIterator; use fallible_iterator::FallibleIterator;
use postgres_protocol::message::frontend; use postgres_protocol::message::frontend;
use std::ascii::AsciiExt; use std::ascii::AsciiExt;
use std::borrow::Cow;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::ops::Deref; use std::ops::Deref;
use std::slice; use std::slice;
use {Result, SessionInfoNew, RowsNew, LazyRowsNew, StatementInternals, WrongTypeNew}; use {Result, SessionInfoNew, RowsNew, LazyRowsNew, StatementInternals, WrongTypeNew, RowData};
use transaction::Transaction; use transaction::Transaction;
use types::{FromSql, SessionInfo, WrongType}; use types::{FromSql, SessionInfo, WrongType};
use stmt::{Statement, Column}; use stmt::{Statement, Column};
@ -35,18 +34,18 @@ impl<'a, T> Deref for MaybeOwned<'a, T> {
/// The resulting rows of a query. /// The resulting rows of a query.
pub struct Rows<'stmt> { pub struct Rows<'stmt> {
stmt: MaybeOwned<'stmt, Statement<'stmt>>, stmt: MaybeOwned<'stmt, Statement<'stmt>>,
data: Vec<Vec<Option<Vec<u8>>>>, data: Vec<RowData>,
} }
impl<'a> RowsNew<'a> for Rows<'a> { impl<'a> RowsNew<'a> for Rows<'a> {
fn new(stmt: &'a Statement<'a>, data: Vec<Vec<Option<Vec<u8>>>>) -> Rows<'a> { fn new(stmt: &'a Statement<'a>, data: Vec<RowData>) -> Rows<'a> {
Rows { Rows {
stmt: MaybeOwned::Borrowed(stmt), stmt: MaybeOwned::Borrowed(stmt),
data: data, data: data,
} }
} }
fn new_owned(stmt: Statement<'a>, data: Vec<Vec<Option<Vec<u8>>>>) -> Rows<'a> { fn new_owned(stmt: Statement<'a>, data: Vec<RowData>) -> Rows<'a> {
Rows { Rows {
stmt: MaybeOwned::Owned(stmt), stmt: MaybeOwned::Owned(stmt),
data: data, data: data,
@ -112,7 +111,7 @@ impl<'a> IntoIterator for &'a Rows<'a> {
/// An iterator over `Row`s. /// An iterator over `Row`s.
pub struct Iter<'a> { pub struct Iter<'a> {
stmt: &'a Statement<'a>, stmt: &'a Statement<'a>,
iter: slice::Iter<'a, Vec<Option<Vec<u8>>>>, iter: slice::Iter<'a, RowData>,
} }
impl<'a> Iterator for Iter<'a> { impl<'a> Iterator for Iter<'a> {
@ -148,7 +147,7 @@ impl<'a> ExactSizeIterator for Iter<'a> {}
/// A single result row of a query. /// A single result row of a query.
pub struct Row<'a> { pub struct Row<'a> {
stmt: &'a Statement<'a>, stmt: &'a Statement<'a>,
data: MaybeOwned<'a, Vec<Option<Vec<u8>>>>, data: MaybeOwned<'a, RowData>,
} }
impl<'a> fmt::Debug for Row<'a> { impl<'a> fmt::Debug for Row<'a> {
@ -238,7 +237,7 @@ impl<'a> Row<'a> {
} }
let conn = self.stmt.conn().0.borrow(); let conn = self.stmt.conn().0.borrow();
let value = FromSql::from_sql_nullable(ty, let value = FromSql::from_sql_nullable(ty,
self.data[idx].as_ref().map(|r| &**r), self.data.get(idx),
&SessionInfo::new(&conn.parameters)); &SessionInfo::new(&conn.parameters));
Some(value.map_err(Error::Conversion)) Some(value.map_err(Error::Conversion))
} }
@ -252,7 +251,7 @@ impl<'a> Row<'a> {
where I: RowIndex + fmt::Debug where I: RowIndex + fmt::Debug
{ {
match idx.idx(self.stmt) { match idx.idx(self.stmt) {
Some(idx) => self.data[idx].as_ref().map(|e| &**e), Some(idx) => self.data.get(idx),
None => panic!("invalid index {:?}", idx), None => panic!("invalid index {:?}", idx),
} }
} }
@ -293,7 +292,7 @@ impl<'a> RowIndex for &'a str {
/// A lazily-loaded iterator over the resulting rows of a query. /// A lazily-loaded iterator over the resulting rows of a query.
pub struct LazyRows<'trans, 'stmt> { pub struct LazyRows<'trans, 'stmt> {
stmt: &'stmt Statement<'stmt>, stmt: &'stmt Statement<'stmt>,
data: VecDeque<Vec<Option<Vec<u8>>>>, data: VecDeque<RowData>,
name: String, name: String,
row_limit: i32, row_limit: i32,
more_rows: bool, more_rows: bool,
@ -303,7 +302,7 @@ pub struct LazyRows<'trans, 'stmt> {
impl<'trans, 'stmt> LazyRowsNew<'trans, 'stmt> for LazyRows<'trans, 'stmt> { impl<'trans, 'stmt> LazyRowsNew<'trans, 'stmt> for LazyRows<'trans, 'stmt> {
fn new(stmt: &'stmt Statement<'stmt>, fn new(stmt: &'stmt Statement<'stmt>,
data: VecDeque<Vec<Option<Vec<u8>>>>, data: VecDeque<RowData>,
name: String, name: String,
row_limit: i32, row_limit: i32,
more_rows: bool, more_rows: bool,
@ -354,7 +353,7 @@ impl<'trans, 'stmt> LazyRows<'trans, 'stmt> {
try!(conn.stream.write_message(|buf| frontend::execute(&self.name, self.row_limit, buf))); try!(conn.stream.write_message(|buf| frontend::execute(&self.name, self.row_limit, buf)));
try!(conn.stream.write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf)))); try!(conn.stream.write_message(|buf| Ok::<(), io::Error>(frontend::sync(buf))));
try!(conn.stream.flush()); try!(conn.stream.flush());
conn.read_rows(&mut self.data).map(|more_rows| self.more_rows = more_rows) conn.read_rows(|row| self.data.push_back(row)).map(|more_rows| self.more_rows = more_rows)
} }
/// Returns a slice describing the columns of the `LazyRows`. /// Returns a slice describing the columns of the `LazyRows`.

View File

@ -12,7 +12,7 @@ use error::{Error, DbError};
use types::{SessionInfo, Type, ToSql}; use types::{SessionInfo, Type, ToSql};
use rows::{Rows, LazyRows}; use rows::{Rows, LazyRows};
use transaction::Transaction; use transaction::Transaction;
use {bad_response, Connection, StatementInternals, Result, RowsNew, InnerConnection, use {bad_response, Connection, StatementInternals, Result, RowsNew, InnerConnection, RowData,
SessionInfoNew, LazyRowsNew, DbErrorNew, ColumnNew, StatementInfo, TransactionInternals}; SessionInfoNew, LazyRowsNew, DbErrorNew, ColumnNew, StatementInfo, TransactionInternals};
/// A prepared statement. /// A prepared statement.
@ -59,8 +59,9 @@ impl<'conn> StatementInternals<'conn> for Statement<'conn> {
fn into_query(self, params: &[&ToSql]) -> Result<Rows<'conn>> { fn into_query(self, params: &[&ToSql]) -> Result<Rows<'conn>> {
check_desync!(self.conn); check_desync!(self.conn);
self.inner_query("", 0, params) let mut rows = vec![];
.map(|(buf, _)| Rows::new_owned(self, buf.into_iter().collect())) try!(self.inner_query("", 0, params, |row| rows.push(row)));
Ok(Rows::new_owned(self, rows))
} }
} }
@ -77,11 +78,14 @@ impl<'conn> Statement<'conn> {
} }
#[allow(type_complexity)] #[allow(type_complexity)]
fn inner_query<'a>(&'a self, fn inner_query<F>(&self,
portal_name: &str, portal_name: &str,
row_limit: i32, row_limit: i32,
params: &[&ToSql]) params: &[&ToSql],
-> Result<(VecDeque<Vec<Option<Vec<u8>>>>, bool)> { acceptor: F)
-> Result<bool>
where F: FnMut(RowData)
{
let mut conn = self.conn.0.borrow_mut(); let mut conn = self.conn.0.borrow_mut();
try!(conn.raw_execute(&self.info.name, try!(conn.raw_execute(&self.info.name,
@ -90,9 +94,7 @@ impl<'conn> Statement<'conn> {
self.param_types(), self.param_types(),
params)); params));
let mut buf = VecDeque::new(); conn.read_rows(acceptor)
let more_rows = try!(conn.read_rows(&mut buf));
Ok((buf, more_rows))
} }
/// Returns a slice containing the expected parameter types. /// Returns a slice containing the expected parameter types.
@ -200,7 +202,9 @@ impl<'conn> Statement<'conn> {
/// ``` /// ```
pub fn query<'a>(&'a self, params: &[&ToSql]) -> Result<Rows<'a>> { pub fn query<'a>(&'a self, params: &[&ToSql]) -> Result<Rows<'a>> {
check_desync!(self.conn); check_desync!(self.conn);
self.inner_query("", 0, params).map(|(buf, _)| Rows::new(self, buf.into_iter().collect())) let mut rows = vec![];
try!(self.inner_query("", 0, params, |row| rows.push(row)));
Ok(Rows::new(self, rows))
} }
/// Executes the prepared statement, returning a lazily loaded iterator /// Executes the prepared statement, returning a lazily loaded iterator
@ -239,9 +243,12 @@ impl<'conn> Statement<'conn> {
self.next_portal_id.set(id + 1); self.next_portal_id.set(id + 1);
let portal_name = format!("{}p{}", self.info.name, id); let portal_name = format!("{}p{}", self.info.name, id);
self.inner_query(&portal_name, row_limit, params).map(move |(data, more_rows)| { let mut rows = VecDeque::new();
LazyRows::new(self, data, portal_name, row_limit, more_rows, false, trans) let more_rows = try!(self.inner_query(&portal_name,
}) row_limit,
params,
|row| rows.push_back(row)));
Ok(LazyRows::new(self, rows, portal_name, row_limit, more_rows, false, trans))
} }
/// Executes a `COPY FROM STDIN` statement, returning the number of rows /// Executes a `COPY FROM STDIN` statement, returning the number of rows