From 8340d4b5ab2b686fea68f450c7eb109eb335acd1 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Wed, 21 Dec 2016 12:45:54 -0800 Subject: [PATCH] Statement peparation and execution --- postgres-tokio/src/lib.rs | 44 +++++++++++++++++++++++++++++++++----- postgres-tokio/src/test.rs | 17 +++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/postgres-tokio/src/lib.rs b/postgres-tokio/src/lib.rs index dd6441c7..4b0ac728 100644 --- a/postgres-tokio/src/lib.rs +++ b/postgres-tokio/src/lib.rs @@ -42,6 +42,7 @@ struct InnerConnection { stream: PostgresStream, parameters: HashMap, cancel_data: CancelData, + next_stmt_id: u32, } impl InnerConnection { @@ -127,7 +128,8 @@ impl Connection { cancel_data: CancelData { process_id: 0, secret_key: 0, - } + }, + next_stmt_id: 0, }) }) .and_then(|s| s.startup(params)) @@ -402,8 +404,6 @@ impl Connection { |f, t| Column { name: f.0, type_: t }) .map(|(r, s)| (p, r, s)) }) - .and_then(|(p, r, s)| s.ready((p, r))) - .map(|((p, r), s)| (p, r, s)) .boxed() } @@ -454,8 +454,8 @@ impl Connection { let mut bind = vec![]; let mut execute = vec![]; let mut sync = vec![]; - let r = frontend::bind(stmt, - portal, + let r = frontend::bind(portal, + stmt, Some(1), params.iter().zip(param_types), |(param, ty), buf| { @@ -533,6 +533,22 @@ impl Connection { .boxed() } + pub fn prepare(mut self, query: &str) -> BoxFuture<(Statement, Connection), Error> { + let id = self.0.next_stmt_id; + self.0.next_stmt_id += 1; + let name = format!("s{}", id); + self.raw_prepare(&name, query) + .map(|(params, columns, conn)| { + let stmt = Statement { + name: name, + params: params, + columns: columns, + }; + (stmt, conn) + }) + .boxed() + } + pub fn cancel_data(&self) -> CancelData { self.0.cancel_data } @@ -543,6 +559,24 @@ struct Column { type_: Type, } +pub struct Statement { + name: String, + params: Vec, + columns: Vec, +} + +impl Statement { + pub fn execute(self, + params: &[&ToSql], + conn: Connection) + -> BoxFuture<(u64, Statement, Connection), Error> { + conn.raw_execute(&self.name, "", &self.params, params) + .and_then(|conn| conn.finish_execute()) + .map(|(n, conn)| (n, self, conn)) + .boxed() + } +} + fn connect_err(fields: &mut ErrorFields) -> ConnectError { match DbError::new(fields) { Ok(err) => ConnectError::Db(Box::new(err)), diff --git a/postgres-tokio/src/test.rs b/postgres-tokio/src/test.rs index ef01c94d..4b1f2b77 100644 --- a/postgres-tokio/src/test.rs +++ b/postgres-tokio/src/test.rs @@ -104,3 +104,20 @@ fn batch_execute_err() { }); l.run(done).unwrap(); } + +#[test] +fn prepare_execute() { + let mut l = Core::new().unwrap(); + let done = Connection::connect("postgres://postgres@localhost", &l.handle()) + .then(|c| { + c.unwrap().prepare("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY, name VARCHAR)") + }) + .and_then(|(s, c)| s.execute(&[], c)) + .and_then(|(n, _, c)| { + assert_eq!(0, n); + c.prepare("INSERT INTO foo (name) VALUES ($1), ($2)") + }) + .and_then(|(s, c)| s.execute(&[&"steven", &"bob"], c)) + .map(|(n, _, _)| assert_eq!(n, 2)); + l.run(done).unwrap(); +}