Some cleanup

This commit is contained in:
Steven Fackler 2014-03-23 17:34:50 -07:00
parent 613ceec630
commit a773f19c20
2 changed files with 36 additions and 42 deletions

View File

@ -150,7 +150,7 @@ pub use stmt::{NormalPostgresStatement,
RowIndex, RowIndex,
TransactionalPostgresStatement}; TransactionalPostgresStatement};
macro_rules! if_ok_pg_conn( macro_rules! try_pg_conn(
($e:expr) => ( ($e:expr) => (
match $e { match $e {
Ok(ok) => ok, Ok(ok) => ok,
@ -159,7 +159,7 @@ macro_rules! if_ok_pg_conn(
) )
) )
macro_rules! if_ok_pg( macro_rules! try_pg(
($e:expr) => ( ($e:expr) => (
match $e { match $e {
Ok(ok) => ok, Ok(ok) => ok,
@ -168,7 +168,7 @@ macro_rules! if_ok_pg(
) )
) )
macro_rules! if_ok_desync( macro_rules! try_desync(
($e:expr) => ( ($e:expr) => (
match $e { match $e {
Ok(ok) => ok, Ok(ok) => ok,
@ -297,12 +297,12 @@ pub fn cancel_query(url: &str, ssl: &SslMode, data: PostgresCancelData)
Err(err) => return Err(err) Err(err) => return Err(err)
}; };
if_ok_pg_conn!(socket.write_message(&CancelRequest { try_pg_conn!(socket.write_message(&CancelRequest {
code: message::CANCEL_CODE, code: message::CANCEL_CODE,
process_id: data.process_id, process_id: data.process_id,
secret_key: data.secret_key secret_key: data.secret_key
})); }));
if_ok_pg_conn!(socket.flush()); try_pg_conn!(socket.flush());
Ok(()) Ok(())
} }
@ -337,10 +337,10 @@ fn initialize_stream(host: &str, port: Port, ssl: &SslMode)
&RequireSsl(ref ctx) => (true, ctx) &RequireSsl(ref ctx) => (true, ctx)
}; };
if_ok_pg_conn!(socket.write_message(&SslRequest { code: message::SSL_CODE })); try_pg_conn!(socket.write_message(&SslRequest { code: message::SSL_CODE }));
if_ok_pg_conn!(socket.flush()); try_pg_conn!(socket.flush());
if if_ok_pg_conn!(socket.read_u8()) == 'N' as u8 { if try_pg_conn!(socket.read_u8()) == 'N' as u8 {
if ssl_required { if ssl_required {
return Err(NoSslSupport); return Err(NoSslSupport);
} else { } else {
@ -432,10 +432,7 @@ impl InnerPostgresConnection {
None => DEFAULT_PORT None => DEFAULT_PORT
}; };
let stream = match initialize_stream(host, port, ssl) { let stream = try!(initialize_stream(host, port, ssl));
Ok(stream) => stream,
Err(err) => return Err(err)
};
let mut conn = InnerPostgresConnection { let mut conn = InnerPostgresConnection {
stream: BufferedStream::new(stream), stream: BufferedStream::new(stream),
@ -459,18 +456,15 @@ impl InnerPostgresConnection {
path.shift_char(); path.shift_char();
args.push((~"database", path)); args.push((~"database", path));
} }
if_ok_pg_conn!(conn.write_messages([StartupMessage { try_pg_conn!(conn.write_messages([StartupMessage {
version: message::PROTOCOL_VERSION, version: message::PROTOCOL_VERSION,
parameters: args.as_slice() parameters: args.as_slice()
}])); }]));
match conn.handle_auth(user) { try!(conn.handle_auth(user));
Err(err) => return Err(err),
Ok(()) => {}
}
loop { loop {
match if_ok_pg_conn!(conn.read_message()) { match try_pg_conn!(conn.read_message()) {
BackendKeyData { process_id, secret_key } => { BackendKeyData { process_id, secret_key } => {
conn.cancel_data.process_id = process_id; conn.cancel_data.process_id = process_id;
conn.cancel_data.secret_key = secret_key; conn.cancel_data.secret_key = secret_key;
@ -488,15 +482,15 @@ impl InnerPostgresConnection {
fn write_messages(&mut self, messages: &[FrontendMessage]) -> IoResult<()> { fn write_messages(&mut self, messages: &[FrontendMessage]) -> IoResult<()> {
assert!(!self.desynchronized); assert!(!self.desynchronized);
for message in messages.iter() { for message in messages.iter() {
if_ok_desync!(self.stream.write_message(message)); try_desync!(self.stream.write_message(message));
} }
Ok(if_ok_desync!(self.stream.flush())) Ok(try_desync!(self.stream.flush()))
} }
fn read_message(&mut self) -> IoResult<BackendMessage> { fn read_message(&mut self) -> IoResult<BackendMessage> {
assert!(!self.desynchronized); assert!(!self.desynchronized);
loop { loop {
match if_ok_desync!(self.stream.read_message()) { match try_desync!(self.stream.read_message()) {
NoticeResponse { fields } => NoticeResponse { fields } =>
self.notice_handler.handle(PostgresDbError::new(fields)), self.notice_handler.handle(PostgresDbError::new(fields)),
NotificationResponse { pid, channel, payload } => NotificationResponse { pid, channel, payload } =>
@ -514,14 +508,14 @@ impl InnerPostgresConnection {
fn handle_auth(&mut self, user: UserInfo) -> fn handle_auth(&mut self, user: UserInfo) ->
Result<(), PostgresConnectError> { Result<(), PostgresConnectError> {
match if_ok_pg_conn!(self.read_message()) { match try_pg_conn!(self.read_message()) {
AuthenticationOk => return Ok(()), AuthenticationOk => return Ok(()),
AuthenticationCleartextPassword => { AuthenticationCleartextPassword => {
let pass = match user.pass { let pass = match user.pass {
Some(pass) => pass, Some(pass) => pass,
None => return Err(MissingPassword) None => return Err(MissingPassword)
}; };
if_ok_pg_conn!(self.write_messages([PasswordMessage { password: pass }])); try_pg_conn!(self.write_messages([PasswordMessage { password: pass }]));
} }
AuthenticationMD5Password { salt } => { AuthenticationMD5Password { salt } => {
let UserInfo { user, pass } = user; let UserInfo { user, pass } = user;
@ -537,7 +531,7 @@ impl InnerPostgresConnection {
hasher.update(output.as_bytes()); hasher.update(output.as_bytes());
hasher.update(salt); hasher.update(salt);
let output = "md5" + hasher.final().to_hex(); let output = "md5" + hasher.final().to_hex();
if_ok_pg_conn!(self.write_messages([PasswordMessage { try_pg_conn!(self.write_messages([PasswordMessage {
password: output.as_slice() password: output.as_slice()
}])); }]));
} }
@ -550,7 +544,7 @@ impl InnerPostgresConnection {
_ => unreachable!() _ => unreachable!()
} }
match if_ok_pg_conn!(self.read_message()) { match try_pg_conn!(self.read_message()) {
AuthenticationOk => Ok(()), AuthenticationOk => Ok(()),
ErrorResponse { fields } => ErrorResponse { fields } =>
Err(PgConnectDbError(PostgresDbError::new(fields))), Err(PgConnectDbError(PostgresDbError::new(fields))),
@ -569,7 +563,7 @@ impl InnerPostgresConnection {
self.next_stmt_id += 1; self.next_stmt_id += 1;
let types = []; let types = [];
if_ok_pg!(self.write_messages([ try_pg!(self.write_messages([
Parse { Parse {
name: stmt_name, name: stmt_name,
query: query, query: query,
@ -581,7 +575,7 @@ impl InnerPostgresConnection {
}, },
Sync])); Sync]));
match if_ok_pg!(self.read_message()) { match try_pg!(self.read_message()) {
ParseComplete => {} ParseComplete => {}
ErrorResponse { fields } => { ErrorResponse { fields } => {
try!(self.wait_for_ready()); try!(self.wait_for_ready());
@ -590,13 +584,13 @@ impl InnerPostgresConnection {
_ => unreachable!() _ => unreachable!()
} }
let mut param_types: Vec<PostgresType> = match if_ok_pg!(self.read_message()) { let mut param_types: Vec<PostgresType> = match try_pg!(self.read_message()) {
ParameterDescription { types } => ParameterDescription { types } =>
types.iter().map(|ty| PostgresType::from_oid(*ty)).collect(), types.iter().map(|ty| PostgresType::from_oid(*ty)).collect(),
_ => unreachable!() _ => unreachable!()
}; };
let mut result_desc: Vec<ResultDescription> = match if_ok_pg!(self.read_message()) { let mut result_desc: Vec<ResultDescription> = match try_pg!(self.read_message()) {
RowDescription { descriptions } => RowDescription { descriptions } =>
descriptions.move_iter().map(|desc| { descriptions.move_iter().map(|desc| {
stmt::make_ResultDescription(desc) stmt::make_ResultDescription(desc)
@ -653,7 +647,7 @@ impl InnerPostgresConnection {
} }
fn wait_for_ready(&mut self) -> Result<(), PostgresError> { fn wait_for_ready(&mut self) -> Result<(), PostgresError> {
match if_ok_pg!(self.read_message()) { match try_pg!(self.read_message()) {
ReadyForQuery { .. } => Ok(()), ReadyForQuery { .. } => Ok(()),
_ => unreachable!() _ => unreachable!()
} }
@ -662,11 +656,11 @@ impl InnerPostgresConnection {
fn quick_query(&mut self, query: &str) fn quick_query(&mut self, query: &str)
-> Result<Vec<Vec<Option<~str>>>, PostgresError> { -> Result<Vec<Vec<Option<~str>>>, PostgresError> {
check_desync!(self); check_desync!(self);
if_ok_pg!(self.write_messages([Query { query: query }])); try_pg!(self.write_messages([Query { query: query }]));
let mut result = Vec::new(); let mut result = Vec::new();
loop { loop {
match if_ok_pg!(self.read_message()) { match try_pg!(self.read_message()) {
ReadyForQuery { .. } => break, ReadyForQuery { .. } => break,
DataRow { row } => DataRow { row } =>
result.push(row.move_iter().map(|opt| result.push(row.move_iter().map(|opt|
@ -684,7 +678,7 @@ impl InnerPostgresConnection {
fn finish_inner(&mut self) -> Result<(), PostgresError> { fn finish_inner(&mut self) -> Result<(), PostgresError> {
check_desync!(self); check_desync!(self);
Ok(if_ok_pg!(self.write_messages([Terminate]))) Ok(try_pg!(self.write_messages([Terminate])))
} }
} }

View File

@ -153,14 +153,14 @@ impl<'conn> Drop for NormalPostgresStatement<'conn> {
impl<'conn> NormalPostgresStatement<'conn> { impl<'conn> NormalPostgresStatement<'conn> {
fn finish_inner(&mut self) -> Result<(), PostgresError> { fn finish_inner(&mut self) -> Result<(), PostgresError> {
check_desync!(self.conn); check_desync!(self.conn);
if_ok_pg!(self.conn.write_messages([ try_pg!(self.conn.write_messages([
Close { Close {
variant: 'S' as u8, variant: 'S' as u8,
name: self.name.as_slice() name: self.name.as_slice()
}, },
Sync])); Sync]));
loop { loop {
match if_ok_pg!(self.conn.read_message()) { match try_pg!(self.conn.read_message()) {
ReadyForQuery { .. } => break, ReadyForQuery { .. } => break,
ErrorResponse { fields } => { ErrorResponse { fields } => {
try!(self.conn.wait_for_ready()); try!(self.conn.wait_for_ready());
@ -189,7 +189,7 @@ impl<'conn> NormalPostgresStatement<'conn> {
desc.ty.result_format() as i16 desc.ty.result_format() as i16
}).collect(); }).collect();
if_ok_pg!(self.conn.write_messages([ try_pg!(self.conn.write_messages([
Bind { Bind {
portal: portal_name, portal: portal_name,
statement: self.name.as_slice(), statement: self.name.as_slice(),
@ -203,7 +203,7 @@ impl<'conn> NormalPostgresStatement<'conn> {
}, },
Sync])); Sync]));
match if_ok_pg!(self.conn.read_message()) { match try_pg!(self.conn.read_message()) {
BindComplete => Ok(()), BindComplete => Ok(()),
ErrorResponse { fields } => { ErrorResponse { fields } => {
try!(self.conn.wait_for_ready()); try!(self.conn.wait_for_ready());
@ -251,7 +251,7 @@ impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> {
let num; let num;
loop { loop {
match if_ok_pg!(self.conn.read_message()) { match try_pg!(self.conn.read_message()) {
DataRow { .. } => {} DataRow { .. } => {}
ErrorResponse { fields } => { ErrorResponse { fields } => {
try!(self.conn.wait_for_ready()); try!(self.conn.wait_for_ready());
@ -402,14 +402,14 @@ impl<'stmt> Drop for PostgresResult<'stmt> {
impl<'stmt> PostgresResult<'stmt> { impl<'stmt> PostgresResult<'stmt> {
fn finish_inner(&mut self) -> Result<(), PostgresError> { fn finish_inner(&mut self) -> Result<(), PostgresError> {
check_desync!(self.stmt.conn); check_desync!(self.stmt.conn);
if_ok_pg!(self.stmt.conn.write_messages([ try_pg!(self.stmt.conn.write_messages([
Close { Close {
variant: 'P' as u8, variant: 'P' as u8,
name: self.name.as_slice() name: self.name.as_slice()
}, },
Sync])); Sync]));
loop { loop {
match if_ok_pg!(self.stmt.conn.read_message()) { match try_pg!(self.stmt.conn.read_message()) {
ReadyForQuery { .. } => break, ReadyForQuery { .. } => break,
ErrorResponse { fields } => { ErrorResponse { fields } => {
try!(self.stmt.conn.wait_for_ready()); try!(self.stmt.conn.wait_for_ready());
@ -423,7 +423,7 @@ impl<'stmt> PostgresResult<'stmt> {
fn read_rows(&mut self) -> Result<(), PostgresError> { fn read_rows(&mut self) -> Result<(), PostgresError> {
loop { loop {
match if_ok_pg!(self.stmt.conn.read_message()) { match try_pg!(self.stmt.conn.read_message()) {
EmptyQueryResponse | EmptyQueryResponse |
CommandComplete { .. } => { CommandComplete { .. } => {
self.more_rows = false; self.more_rows = false;
@ -441,7 +441,7 @@ impl<'stmt> PostgresResult<'stmt> {
} }
fn execute(&mut self) -> Result<(), PostgresError> { fn execute(&mut self) -> Result<(), PostgresError> {
if_ok_pg!(self.stmt.conn.write_messages([ try_pg!(self.stmt.conn.write_messages([
Execute { Execute {
portal: self.name, portal: self.name,
max_rows: self.row_limit as i32 max_rows: self.row_limit as i32