Split out stmt

This commit is contained in:
Steven Fackler 2016-12-26 15:57:43 -05:00
parent a03e770141
commit ba1d9d1f01
3 changed files with 79 additions and 35 deletions

View File

@ -23,7 +23,6 @@ use postgres_shared::RowData;
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::mem;
use std::sync::Arc;
use std::sync::mpsc::{self, Sender, Receiver};
use tokio_core::reactor::Handle;
@ -33,6 +32,7 @@ pub use postgres_shared::{params, Column, RowIndex};
use error::{ConnectError, Error, DbError, SqlState};
use params::{ConnectParams, IntoConnectParams};
use stmt::Statement;
use stream::PostgresStream;
use tls::Handshake;
use transaction::Transaction;
@ -41,6 +41,7 @@ use rows::Row;
pub mod error;
pub mod rows;
pub mod stmt;
mod stream;
pub mod tls;
pub mod transaction;
@ -878,19 +879,17 @@ impl Connection {
let name = format!("s{}", id);
self.raw_prepare(&name, query)
.map(|(params, columns, conn)| {
let stmt = Statement {
close_sender: conn.0.close_sender.clone(),
name: name,
params: params,
columns: Arc::new(columns),
};
let stmt = Statement::new(conn.0.close_sender.clone(),
name,
params,
Arc::new(columns));
(stmt, conn)
})
.boxed()
}
pub fn execute(self, statement: &Statement, params: &[&ToSql]) -> BoxFuture<(u64, Connection), Error> {
self.raw_execute(&statement.name, "", &statement.params, params)
self.raw_execute(statement.name(), "", statement.parameters(), params)
.and_then(|conn| conn.finish_execute())
.boxed()
}
@ -899,8 +898,8 @@ impl Connection {
statement: &Statement,
params: &[&ToSql])
-> BoxStateStream<Row, Connection, Error> {
let columns = statement.columns.clone();
self.raw_execute(&statement.name, "", &statement.params, params)
let columns = statement.columns_arc().clone();
self.raw_execute(statement.name(), "", statement.parameters(), params)
.map(|c| c.read_rows().map(move |r| Row::new(columns.clone(), r)))
.flatten_state_stream()
.boxed()
@ -926,30 +925,6 @@ impl Connection {
}
}
pub struct Statement {
close_sender: Sender<(u8, String)>,
name: String,
params: Vec<Type>,
columns: Arc<Vec<Column>>,
}
impl Drop for Statement {
fn drop(&mut self) {
let name = mem::replace(&mut self.name, String::new());
let _ = self.close_sender.send((b'S', name));
}
}
impl Statement {
pub fn parameters(&self) -> &[Type] {
&self.params
}
pub fn columns(&self) -> &[Column] {
&self.columns
}
}
fn connect_err(fields: &mut ErrorFields) -> ConnectError {
match DbError::new(fields) {
Ok(err) => ConnectError::Db(Box::new(err)),
@ -967,6 +942,18 @@ trait RowNew {
fn new(columns: Arc<Vec<Column>>, data: RowData) -> Row;
}
trait StatementNew {
fn new(close_sender: Sender<(u8, String)>,
name: String,
params: Vec<Type>,
columns: Arc<Vec<Column>>)
-> Statement;
fn columns_arc(&self) -> &Arc<Vec<Column>>;
fn name(&self) -> &str;
}
trait TransactionNew {
fn new(c: Connection) -> Transaction;
}

View File

@ -0,0 +1,56 @@
use std::mem;
use std::sync::Arc;
use std::sync::mpsc::Sender;
#[doc(inline)]
pub use postgres_shared::Column;
use StatementNew;
use types::Type;
pub struct Statement {
close_sender: Sender<(u8, String)>,
name: String,
params: Vec<Type>,
columns: Arc<Vec<Column>>,
}
impl StatementNew for Statement {
fn new(close_sender: Sender<(u8, String)>,
name: String,
params: Vec<Type>,
columns: Arc<Vec<Column>>)
-> Statement {
Statement {
close_sender: close_sender,
name: name,
params: params,
columns: columns,
}
}
fn columns_arc(&self) -> &Arc<Vec<Column>> {
&self.columns
}
fn name(&self) -> &str {
&self.name
}
}
impl Drop for Statement {
fn drop(&mut self) {
let name = mem::replace(&mut self.name, String::new());
let _ = self.close_sender.send((b'S', name));
}
}
impl Statement {
pub fn parameters(&self) -> &[Type] {
&self.params
}
pub fn columns(&self) -> &[Column] {
&self.columns
}
}

View File

@ -1,8 +1,9 @@
use futures::{Future, BoxFuture};
use futures_state_stream::{StateStream, BoxStateStream};
use {Connection, Statement, TransactionNew};
use {Connection, TransactionNew};
use error::Error;
use stmt::Statement;
use types::ToSql;
use rows::Row;