remove Frontend messages
This commit is contained in:
parent
ad6fa4d0b1
commit
f135d22394
15
src/lib.rs
15
src/lib.rs
@ -65,8 +65,7 @@ use postgres_protocol::message::frontend;
|
|||||||
|
|
||||||
use error::{Error, ConnectError, SqlState, DbError};
|
use error::{Error, ConnectError, SqlState, DbError};
|
||||||
use io::{TlsStream, TlsHandshake};
|
use io::{TlsStream, TlsHandshake};
|
||||||
use message::{Frontend, Backend, RowDescriptionEntry};
|
use message::{Backend, RowDescriptionEntry, ReadMessage};
|
||||||
use message::{WriteMessage, ReadMessage};
|
|
||||||
use notification::{Notifications, Notification};
|
use notification::{Notifications, Notification};
|
||||||
use params::{ConnectParams, IntoConnectParams, UserInfo};
|
use params::{ConnectParams, IntoConnectParams, UserInfo};
|
||||||
use rows::{Rows, LazyRows};
|
use rows::{Rows, LazyRows};
|
||||||
@ -306,17 +305,11 @@ impl InnerConnection {
|
|||||||
fn write_message<M>(&mut self, message: &M) -> std_io::Result<()>
|
fn write_message<M>(&mut self, message: &M) -> std_io::Result<()>
|
||||||
where M: frontend::Message
|
where M: frontend::Message
|
||||||
{
|
{
|
||||||
|
debug_assert!(!self.desynchronized);
|
||||||
self.io_buf.clear();
|
self.io_buf.clear();
|
||||||
try!(message.write(&mut self.io_buf));
|
try!(message.write(&mut self.io_buf));
|
||||||
self.stream.write_all(&self.io_buf)
|
try_desync!(self, self.stream.write_all(&self.io_buf));
|
||||||
}
|
Ok(())
|
||||||
|
|
||||||
fn write_messages(&mut self, messages: &[Frontend]) -> std_io::Result<()> {
|
|
||||||
debug_assert!(!self.desynchronized);
|
|
||||||
for message in messages {
|
|
||||||
try_desync!(self, self.stream.write_message(message));
|
|
||||||
}
|
|
||||||
Ok(try_desync!(self, self.stream.flush()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_message_with_notification(&mut self) -> std_io::Result<Backend> {
|
fn read_message_with_notification(&mut self) -> std_io::Result<Backend> {
|
||||||
|
@ -2,7 +2,7 @@ use std::io;
|
|||||||
use std::io::prelude::*;
|
use std::io::prelude::*;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{BigEndian, ReadBytesExt};
|
||||||
|
|
||||||
use types::Oid;
|
use types::Oid;
|
||||||
use priv_io::StreamOptions;
|
use priv_io::StreamOptions;
|
||||||
@ -81,77 +81,6 @@ pub struct RowDescriptionEntry {
|
|||||||
pub format: i16,
|
pub format: i16,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Frontend<'a> {
|
|
||||||
CopyData {
|
|
||||||
data: &'a [u8],
|
|
||||||
},
|
|
||||||
CopyDone,
|
|
||||||
CopyFail {
|
|
||||||
message: &'a str,
|
|
||||||
},
|
|
||||||
Execute {
|
|
||||||
portal: &'a str,
|
|
||||||
max_rows: i32,
|
|
||||||
},
|
|
||||||
Sync,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
trait WriteCStr {
|
|
||||||
fn write_cstr(&mut self, s: &str) -> io::Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<W: Write> WriteCStr for W {
|
|
||||||
fn write_cstr(&mut self, s: &str) -> io::Result<()> {
|
|
||||||
try!(self.write_all(s.as_bytes()));
|
|
||||||
Ok(try!(self.write_u8(0)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
pub trait WriteMessage {
|
|
||||||
fn write_message(&mut self, &Frontend) -> io::Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<W: Write> WriteMessage for W {
|
|
||||||
#[allow(cyclomatic_complexity)]
|
|
||||||
fn write_message(&mut self, message: &Frontend) -> io::Result<()> {
|
|
||||||
let mut buf = vec![];
|
|
||||||
let ident;
|
|
||||||
|
|
||||||
match *message {
|
|
||||||
Frontend::CopyData { data } => {
|
|
||||||
ident = Some(b'd');
|
|
||||||
try!(buf.write_all(data));
|
|
||||||
}
|
|
||||||
Frontend::CopyDone => ident = Some(b'c'),
|
|
||||||
Frontend::CopyFail { message } => {
|
|
||||||
ident = Some(b'f');
|
|
||||||
try!(buf.write_cstr(message));
|
|
||||||
}
|
|
||||||
Frontend::Execute { portal, max_rows } => {
|
|
||||||
ident = Some(b'E');
|
|
||||||
try!(buf.write_cstr(portal));
|
|
||||||
try!(buf.write_i32::<BigEndian>(max_rows));
|
|
||||||
}
|
|
||||||
Frontend::Sync => ident = Some(b'S'),
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ident) = ident {
|
|
||||||
try!(self.write_u8(ident));
|
|
||||||
}
|
|
||||||
|
|
||||||
// add size of length value
|
|
||||||
if buf.len() > u32::max_value() as usize - mem::size_of::<u32>() {
|
|
||||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "value too large to transmit"));
|
|
||||||
}
|
|
||||||
try!(self.write_u32::<BigEndian>((buf.len() + mem::size_of::<u32>()) as u32));
|
|
||||||
try!(self.write_all(&*buf));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
trait ReadCStr {
|
trait ReadCStr {
|
||||||
fn read_cstr(&mut self) -> io::Result<String>;
|
fn read_cstr(&mut self) -> io::Result<String>;
|
||||||
|
10
src/rows.rs
10
src/rows.rs
@ -3,16 +3,17 @@
|
|||||||
use std::ascii::AsciiExt;
|
use std::ascii::AsciiExt;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
use std::io::Write;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::slice;
|
use std::slice;
|
||||||
|
use postgres_protocol::message::frontend;
|
||||||
|
|
||||||
use {Result, SessionInfoNew, RowsNew, LazyRowsNew, StatementInternals, WrongTypeNew};
|
use {Result, SessionInfoNew, RowsNew, LazyRowsNew, StatementInternals, WrongTypeNew};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
use types::{FromSql, SessionInfo, WrongType};
|
use types::{FromSql, SessionInfo, WrongType};
|
||||||
use stmt::{Statement, Column};
|
use stmt::{Statement, Column};
|
||||||
use error::Error;
|
use error::Error;
|
||||||
use message::Frontend;
|
|
||||||
|
|
||||||
enum StatementContainer<'a> {
|
enum StatementContainer<'a> {
|
||||||
Borrowed(&'a Statement<'a>),
|
Borrowed(&'a Statement<'a>),
|
||||||
@ -350,11 +351,12 @@ impl<'trans, 'stmt> LazyRows<'trans, 'stmt> {
|
|||||||
fn execute(&mut self) -> Result<()> {
|
fn execute(&mut self) -> Result<()> {
|
||||||
let mut conn = self.stmt.conn().conn.borrow_mut();
|
let mut conn = self.stmt.conn().conn.borrow_mut();
|
||||||
|
|
||||||
try!(conn.write_messages(&[Frontend::Execute {
|
try!(conn.write_message(&frontend::Execute {
|
||||||
portal: &self.name,
|
portal: &self.name,
|
||||||
max_rows: self.row_limit,
|
max_rows: self.row_limit,
|
||||||
},
|
}));
|
||||||
Frontend::Sync]));
|
try!(conn.write_message(&frontend::Sync));
|
||||||
|
try!(conn.stream.flush());
|
||||||
conn.read_rows(&mut self.data).map(|more_rows| self.more_rows = more_rows)
|
conn.read_rows(&mut self.data).map(|more_rows| self.more_rows = more_rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
34
src/stmt.rs
34
src/stmt.rs
@ -5,10 +5,11 @@ use std::collections::VecDeque;
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use postgres_protocol::message::frontend;
|
||||||
|
|
||||||
use error::{Error, DbError};
|
use error::{Error, DbError};
|
||||||
use types::{SessionInfo, Type, ToSql};
|
use types::{SessionInfo, Type, ToSql};
|
||||||
use message::{WriteMessage, Backend, Frontend};
|
use message::Backend;
|
||||||
use rows::{Rows, LazyRows};
|
use rows::{Rows, LazyRows};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
use {bad_response, Connection, StatementInternals, Result, RowsNew, InnerConnection, SessionInfoNew,
|
use {bad_response, Connection, StatementInternals, Result, RowsNew, InnerConnection, SessionInfoNew,
|
||||||
@ -146,11 +147,11 @@ impl<'conn> Statement<'conn> {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Backend::CopyInResponse { .. } => {
|
Backend::CopyInResponse { .. } => {
|
||||||
try!(conn.write_messages(&[Frontend::CopyFail {
|
try!(conn.write_message(&frontend::CopyFail {
|
||||||
message: "COPY queries cannot be directly \
|
message: "COPY queries cannot be directly executed",
|
||||||
executed",
|
}));
|
||||||
},
|
try!(conn.write_message(&frontend::Sync));
|
||||||
Frontend::Sync]));
|
try!(conn.stream.flush());
|
||||||
}
|
}
|
||||||
Backend::CopyOutResponse { .. } => {
|
Backend::CopyOutResponse { .. } => {
|
||||||
loop {
|
loop {
|
||||||
@ -296,13 +297,13 @@ impl<'conn> Statement<'conn> {
|
|||||||
match fill_copy_buf(&mut buf, r, &info) {
|
match fill_copy_buf(&mut buf, r, &info) {
|
||||||
Ok(0) => break,
|
Ok(0) => break,
|
||||||
Ok(len) => {
|
Ok(len) => {
|
||||||
try_desync!(info.conn,
|
try!(info.conn.write_message(&frontend::CopyData { data: &buf[..len] }));
|
||||||
info.conn.stream.write_message(&Frontend::CopyData { data: &buf[..len] }));
|
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
try!(info.conn.write_messages(&[Frontend::CopyFail { message: "" },
|
try!(info.conn.write_message(&frontend::CopyFail { message: "" }));
|
||||||
Frontend::CopyDone,
|
try!(info.conn.write_message(&frontend::CopyDone));
|
||||||
Frontend::Sync]));
|
try!(info.conn.write_message(&frontend::Sync));
|
||||||
|
try!(info.conn.stream.flush());
|
||||||
match try!(info.conn.read_message()) {
|
match try!(info.conn.read_message()) {
|
||||||
Backend::ErrorResponse { .. } => {
|
Backend::ErrorResponse { .. } => {
|
||||||
// expected from the CopyFail
|
// expected from the CopyFail
|
||||||
@ -318,7 +319,9 @@ impl<'conn> Statement<'conn> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try!(info.conn.write_messages(&[Frontend::CopyDone, Frontend::Sync]));
|
try!(info.conn.write_message(&frontend::CopyDone));
|
||||||
|
try!(info.conn.write_message(&frontend::Sync));
|
||||||
|
try!(info.conn.stream.flush());
|
||||||
|
|
||||||
let num = match try!(info.conn.read_message()) {
|
let num = match try!(info.conn.read_message()) {
|
||||||
Backend::CommandComplete { tag } => parse_update_count(tag),
|
Backend::CommandComplete { tag } => parse_update_count(tag),
|
||||||
@ -365,9 +368,10 @@ impl<'conn> Statement<'conn> {
|
|||||||
let (format, column_formats) = match try!(conn.read_message()) {
|
let (format, column_formats) = match try!(conn.read_message()) {
|
||||||
Backend::CopyOutResponse { format, column_formats } => (format, column_formats),
|
Backend::CopyOutResponse { format, column_formats } => (format, column_formats),
|
||||||
Backend::CopyInResponse { .. } => {
|
Backend::CopyInResponse { .. } => {
|
||||||
try!(conn.write_messages(&[Frontend::CopyFail { message: "" },
|
try!(conn.write_message(&frontend::CopyFail { message: "" }));
|
||||||
Frontend::CopyDone,
|
try!(conn.write_message(&frontend::CopyDone));
|
||||||
Frontend::Sync]));
|
try!(conn.write_message(&frontend::Sync));
|
||||||
|
try!(conn.stream.flush());
|
||||||
match try!(conn.read_message()) {
|
match try!(conn.read_message()) {
|
||||||
Backend::ErrorResponse { .. } => {
|
Backend::ErrorResponse { .. } => {
|
||||||
// expected from the CopyFail
|
// expected from the CopyFail
|
||||||
|
Loading…
Reference in New Issue
Block a user