rust-postgres/src/lib.rs

322 lines
8.7 KiB
Rust
Raw Normal View History

2013-08-22 05:52:15 +00:00
extern mod extra;
2013-08-04 02:17:32 +00:00
2013-08-22 05:52:15 +00:00
use std::cell::Cell;
use std::hashmap::HashMap;
use std::rt::io::net::ip::SocketAddr;
use std::rt::io::net::tcp::TcpStream;
use extra::url::Url;
2013-08-23 07:13:42 +00:00
use std::str;
2013-08-04 02:17:32 +00:00
2013-08-22 05:52:15 +00:00
use message::*;
2013-07-25 07:10:18 +00:00
2013-08-22 05:52:15 +00:00
mod message;
2013-07-25 07:10:18 +00:00
2013-08-18 03:30:31 +00:00
pub struct PostgresConnection {
2013-08-22 05:52:15 +00:00
priv stream: Cell<TcpStream>,
priv next_stmt_id: Cell<int>
2013-08-04 02:17:32 +00:00
}
2013-08-18 03:30:31 +00:00
impl Drop for PostgresConnection {
2013-08-04 02:17:32 +00:00
fn drop(&self) {
2013-08-22 07:12:35 +00:00
self.write_message(&Terminate);
2013-08-18 03:42:40 +00:00
}
2013-08-04 02:17:32 +00:00
}
2013-07-25 07:10:18 +00:00
2013-08-18 03:30:31 +00:00
impl PostgresConnection {
2013-08-23 02:47:06 +00:00
pub fn connect(url: &str) -> PostgresConnection {
2013-08-22 05:52:15 +00:00
let parsed_url: Url = FromStr::from_str(url).unwrap();
let socket_url = fmt!("%s:%s", parsed_url.host,
2013-08-23 02:47:06 +00:00
parsed_url.port.get_ref().as_slice());
2013-08-22 05:52:15 +00:00
let addr: SocketAddr = FromStr::from_str(socket_url).unwrap();
let conn = PostgresConnection {
stream: Cell::new(TcpStream::connect(addr).unwrap()),
2013-08-18 03:42:40 +00:00
next_stmt_id: Cell::new(0)
};
2013-08-22 06:41:26 +00:00
let mut args = HashMap::new();
2013-08-23 02:47:06 +00:00
args.insert(&"user", parsed_url.user.get_ref().user.as_slice());
2013-08-22 06:41:26 +00:00
conn.write_message(&StartupMessage(args));
2013-08-04 05:21:16 +00:00
2013-08-22 06:41:26 +00:00
match conn.read_message() {
2013-08-22 05:52:15 +00:00
AuthenticationOk => (),
2013-08-23 05:24:14 +00:00
resp => fail!("Bad response: %?", resp.to_str())
2013-08-04 02:17:32 +00:00
}
2013-08-22 05:52:15 +00:00
loop {
2013-08-23 05:24:14 +00:00
match conn.read_message() {
2013-08-22 05:52:15 +00:00
ParameterStatus(param, value) =>
printfln!("Param %s = %s", param, value),
2013-08-22 06:41:26 +00:00
BackendKeyData(*) => (),
ReadyForQuery(*) => break,
2013-08-23 05:24:14 +00:00
resp => fail!("Bad response: %?", resp.to_str())
}
2013-08-05 00:48:48 +00:00
}
2013-08-23 05:24:14 +00:00
conn
2013-08-05 00:48:48 +00:00
}
2013-08-22 06:41:26 +00:00
fn write_message(&self, message: &FrontendMessage) {
2013-08-22 05:52:15 +00:00
do self.stream.with_mut_ref |s| {
2013-08-22 06:41:26 +00:00
s.write_message(message);
2013-07-25 07:10:18 +00:00
}
2013-08-22 06:41:26 +00:00
}
2013-08-04 02:17:32 +00:00
2013-08-22 06:41:26 +00:00
fn read_message(&self) -> BackendMessage {
2013-08-22 05:52:15 +00:00
do self.stream.with_mut_ref |s| {
2013-08-22 06:41:26 +00:00
s.read_message()
2013-08-04 05:21:16 +00:00
}
2013-08-22 06:41:26 +00:00
}
pub fn prepare<'a>(&'a self, query: &str) -> PostgresStatement<'a> {
let id = self.next_stmt_id.take();
let stmt_name = ifmt!("statement_{}", id);
self.next_stmt_id.put_back(id + 1);
2013-08-05 00:48:48 +00:00
2013-08-22 06:41:26 +00:00
let types = [];
self.write_message(&Parse(stmt_name, query, types));
self.write_message(&Sync);
match self.read_message() {
2013-08-22 05:52:15 +00:00
ParseComplete => (),
2013-08-23 05:24:14 +00:00
resp @ ErrorResponse(*) => fail!("Error: %?", resp.to_str()),
resp => fail!("Bad response: %?", resp.to_str())
2013-08-05 00:48:48 +00:00
}
2013-08-22 06:41:26 +00:00
self.wait_for_ready();
self.write_message(&Describe('S' as u8, stmt_name));
self.write_message(&Sync);
let num_params = match self.read_message() {
ParameterDescription(ref types) => types.len(),
2013-08-23 05:24:14 +00:00
resp => fail!("Bad response: %?", resp.to_str())
2013-08-22 06:41:26 +00:00
};
match self.read_message() {
RowDescription(*) | NoData => (),
2013-08-23 05:24:14 +00:00
resp => fail!("Bad response: %?", resp.to_str())
2013-08-05 00:48:48 +00:00
}
2013-08-17 22:09:26 +00:00
2013-08-22 06:41:26 +00:00
self.wait_for_ready();
PostgresStatement {
conn: self,
name: stmt_name,
2013-08-22 07:12:35 +00:00
num_params: num_params,
next_portal_id: Cell::new(0)
2013-08-22 06:41:26 +00:00
}
}
2013-08-23 05:24:14 +00:00
pub fn in_transaction<T, E: ToStr>(&self, blk: &fn(&PostgresConnection)
-> Result<T, E>)
-> Result<T, E> {
2013-08-23 07:13:42 +00:00
self.quick_query("BEGIN");
2013-08-23 05:24:14 +00:00
// If this fails, Postgres will rollback when the connection closes
let ret = blk(self);
if ret.is_ok() {
2013-08-23 07:13:42 +00:00
self.quick_query("COMMIT");
2013-08-23 05:24:14 +00:00
} else {
2013-08-23 07:13:42 +00:00
self.quick_query("ROLLBACK");
2013-08-23 05:24:14 +00:00
}
ret
}
2013-08-23 07:13:42 +00:00
fn quick_query(&self, query: &str) {
self.write_message(&Query(query));
loop {
match self.read_message() {
ReadyForQuery(*) => break,
resp @ ErrorResponse(*) => fail!("Error: %?", resp.to_str()),
_ => ()
}
}
}
2013-08-22 06:41:26 +00:00
fn wait_for_ready(&self) {
2013-08-23 05:24:14 +00:00
loop {
match self.read_message() {
ReadyForQuery(*) => break,
resp => fail!("Bad response: %?", resp.to_str())
}
2013-08-22 06:41:26 +00:00
}
2013-08-17 22:09:26 +00:00
}
}
2013-08-22 05:52:15 +00:00
pub struct PostgresStatement<'self> {
priv conn: &'self PostgresConnection,
2013-08-22 06:41:26 +00:00
priv name: ~str,
2013-08-22 07:12:35 +00:00
priv num_params: uint,
priv next_portal_id: Cell<uint>
}
#[unsafe_destructor]
impl<'self> Drop for PostgresStatement<'self> {
fn drop(&self) {
self.conn.write_message(&Close('S' as u8, self.name.as_slice()));
self.conn.write_message(&Sync);
2013-08-23 05:24:14 +00:00
loop {
match self.conn.read_message() {
ReadyForQuery(*) => break,
_ => ()
}
}
2013-08-22 07:12:35 +00:00
}
}
2013-08-22 05:52:15 +00:00
impl<'self> PostgresStatement<'self> {
2013-08-22 07:12:35 +00:00
pub fn num_params(&self) -> uint {
self.num_params
}
2013-08-23 05:24:14 +00:00
fn execute(&self, portal_name: &str) {
2013-08-22 07:12:35 +00:00
let formats = [];
let values = [];
let result_formats = [];
self.conn.write_message(&Bind(portal_name, self.name.as_slice(),
formats, values, result_formats));
2013-08-23 05:24:14 +00:00
self.conn.write_message(&Execute(portal_name.as_slice(), 0));
2013-08-22 07:12:35 +00:00
self.conn.write_message(&Sync);
match self.conn.read_message() {
BindComplete => (),
2013-08-23 05:24:14 +00:00
resp @ ErrorResponse(*) => fail!("Error: %?", resp.to_str()),
resp => fail!("Bad response: %?", resp.to_str())
2013-08-22 07:12:35 +00:00
}
2013-08-23 05:24:14 +00:00
}
pub fn update(&self) -> uint {
self.execute("");
2013-08-22 07:12:35 +00:00
2013-08-23 05:24:14 +00:00
let mut num = 0;
loop {
match self.conn.read_message() {
CommandComplete(ret) => {
let s = ret.split_iter(' ').last().unwrap();
match FromStr::from_str(s) {
None => (),
Some(n) => num = n
}
break;
}
DataRow(*) => (),
EmptyQueryResponse => break,
NoticeResponse(*) => (),
resp @ ErrorResponse(*) => fail!("Error: %?", resp.to_str()),
resp => fail!("Bad response: %?", resp.to_str())
}
}
2013-08-22 07:12:35 +00:00
self.conn.wait_for_ready();
2013-08-23 05:24:14 +00:00
num
2013-08-22 07:12:35 +00:00
}
2013-08-23 07:13:42 +00:00
pub fn query<'a>(&'a self) -> PostgresResult<'a> {
let id = self.next_portal_id.take();
let portal_name = ifmt!("{:s}_portal_{}", self.name.as_slice(), id);
self.next_portal_id.put_back(id + 1);
self.execute(portal_name);
let mut data = ~[];
loop {
match self.conn.read_message() {
EmptyQueryResponse => break,
DataRow(row) => data.push(row),
CommandComplete(*) => break,
NoticeResponse(*) => (),
resp @ ErrorResponse(*) => fail!("Error: %?", resp.to_str()),
resp => fail!("Bad response: %?", resp.to_str())
}
}
PostgresResult {
stmt: self,
name: portal_name,
data: data
}
}
}
pub struct PostgresResult<'self> {
priv stmt: &'self PostgresStatement<'self>,
priv name: ~str,
priv data: ~[~[Option<~[u8]>]]
}
#[unsafe_destructor]
impl<'self> Drop for PostgresResult<'self> {
fn drop(&self) {
self.stmt.conn.write_message(&Close('P' as u8, self.name.as_slice()));
self.stmt.conn.write_message(&Sync);
loop {
match self.stmt.conn.read_message() {
ReadyForQuery(*) => break,
_ => ()
}
}
}
}
impl<'self> PostgresResult<'self> {
pub fn iter<'a>(&'a self) -> PostgresResultIterator<'a> {
PostgresResultIterator { result: self, next_row: 0 }
}
}
pub struct PostgresResultIterator<'self> {
priv result: &'self PostgresResult<'self>,
priv next_row: uint
}
impl<'self> Iterator<PostgresRow<'self>> for PostgresResultIterator<'self> {
fn next(&mut self) -> Option<PostgresRow<'self>> {
if self.next_row == self.result.data.len() {
return None;
}
let row = self.next_row;
self.next_row += 1;
Some(PostgresRow { result: self.result, row: row })
}
}
pub struct PostgresRow<'self> {
priv result: &'self PostgresResult<'self>,
priv row: uint
}
impl<'self> Container for PostgresRow<'self> {
fn len(&self) -> uint {
self.result.data[self.row].len()
}
}
impl<'self, T: FromSql> Index<uint, T> for PostgresRow<'self> {
fn index(&self, idx: &uint) -> T {
self.get(*idx)
}
}
impl<'self> PostgresRow<'self> {
pub fn get<T: FromSql>(&self, idx: uint) -> T {
FromSql::from_sql(&self.result.data[self.row][idx])
}
}
pub trait FromSql {
fn from_sql(raw: &Option<~[u8]>) -> Self;
}
impl FromSql for int {
fn from_sql(raw: &Option<~[u8]>) -> int {
FromStr::from_str(str::from_bytes_slice(raw.get_ref().as_slice()))
.unwrap()
}
}