// Source: rust-postgres/tokio-postgres/src/client.rs (190 lines, 5.2 KiB, Rust)

2019-07-24 02:54:22 +00:00
use crate::codec::BackendMessages;
use crate::connection::{Request, RequestMessages};
use crate::prepare::prepare;
2019-07-27 03:11:34 +00:00
use crate::query::{self, Query};
2019-07-25 02:18:15 +00:00
use crate::types::{Oid, ToSql, Type};
2019-07-24 02:54:22 +00:00
use crate::{Error, Statement};
use fallible_iterator::FallibleIterator;
use futures::channel::mpsc;
2019-07-25 02:18:15 +00:00
use futures::future;
use futures::{ready, StreamExt};
2019-07-24 03:16:31 +00:00
use parking_lot::Mutex;
2019-07-24 02:54:22 +00:00
use postgres_protocol::message::backend::Message;
2019-07-24 03:16:31 +00:00
use std::collections::HashMap;
2019-07-24 02:54:22 +00:00
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
2019-07-24 02:54:22 +00:00
/// The stream of backend messages produced in reply to a single request.
///
/// Messages arrive in batches over `receiver`; `cur` is the batch that is
/// currently being drained.
pub struct Responses {
    /// Receiving half of the per-request channel carrying message batches.
    receiver: mpsc::Receiver<BackendMessages>,
    /// The batch currently being iterated; replaced once it is exhausted.
    cur: BackendMessages,
}
impl Responses {
2019-07-25 02:18:15 +00:00
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
2019-07-24 02:54:22 +00:00
loop {
match self.cur.next().map_err(Error::parse)? {
2019-07-25 02:18:15 +00:00
Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
Some(message) => return Poll::Ready(Ok(message)),
2019-07-24 02:54:22 +00:00
None => {}
}
2019-07-25 02:18:15 +00:00
match ready!(self.receiver.poll_next_unpin(cx)) {
2019-07-24 02:54:22 +00:00
Some(messages) => self.cur = messages,
2019-07-25 02:18:15 +00:00
None => return Poll::Ready(Err(Error::closed())),
2019-07-24 02:54:22 +00:00
}
}
}
2019-07-25 02:18:15 +00:00
pub async fn next(&mut self) -> Result<Message, Error> {
future::poll_fn(|cx| self.poll_next(cx)).await
}
2019-07-24 02:54:22 +00:00
}
2019-07-24 03:16:31 +00:00
struct State {
2019-07-27 03:11:34 +00:00
typeinfo: Option<Statement>,
typeinfo_composite: Option<Statement>,
typeinfo_enum: Option<Statement>,
2019-07-24 03:16:31 +00:00
types: HashMap<Oid, Type>,
}
2019-07-24 02:54:22 +00:00
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
2019-07-24 03:16:31 +00:00
state: Mutex<State>,
2019-07-24 02:54:22 +00:00
}
impl InnerClient {
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request { messages, sender };
self.sender
.unbounded_send(request)
.map_err(|_| Error::closed())?;
Ok(Responses {
receiver,
cur: BackendMessages::empty(),
})
}
2019-07-24 03:16:31 +00:00
2019-07-27 03:11:34 +00:00
pub fn typeinfo(&self) -> Option<Statement> {
self.state.lock().typeinfo.clone()
2019-07-24 03:16:31 +00:00
}
2019-07-27 03:11:34 +00:00
pub fn set_typeinfo(&self, statement: &Statement) {
self.state.lock().typeinfo = Some(statement.clone());
2019-07-24 03:16:31 +00:00
}
2019-07-27 03:11:34 +00:00
pub fn typeinfo_composite(&self) -> Option<Statement> {
self.state.lock().typeinfo_composite.clone()
2019-07-24 03:16:31 +00:00
}
2019-07-27 03:11:34 +00:00
pub fn set_typeinfo_composite(&self, statement: &Statement) {
self.state.lock().typeinfo_composite = Some(statement.clone());
2019-07-24 03:16:31 +00:00
}
2019-07-27 03:11:34 +00:00
pub fn typeinfo_enum(&self) -> Option<Statement> {
self.state.lock().typeinfo_enum.clone()
2019-07-24 03:16:31 +00:00
}
2019-07-27 03:11:34 +00:00
pub fn set_typeinfo_enum(&self, statement: &Statement) {
self.state.lock().typeinfo_enum = Some(statement.clone());
2019-07-24 03:16:31 +00:00
}
pub fn type_(&self, oid: Oid) -> Option<Type> {
self.state.lock().types.get(&oid).cloned()
}
2019-07-27 03:11:34 +00:00
pub fn set_type(&self, oid: Oid, type_: &Type) {
self.state.lock().types.insert(oid, type_.clone());
2019-07-24 03:16:31 +00:00
}
2019-07-24 02:54:22 +00:00
}
/// Handle used to prepare and run statements over a connection.
pub struct Client {
    /// Shared request queue and caches.
    inner: Arc<InnerClient>,
    // NOTE(review): the two fields below are stored by `new` but never read in
    // this file; presumably they are the backend's process id and secret key
    // for the connection -- confirm against the caller of `Client::new`.
    process_id: i32,
    secret_key: i32,
}
impl Client {
pub(crate) fn new(
sender: mpsc::UnboundedSender<Request>,
process_id: i32,
secret_key: i32,
) -> Client {
Client {
2019-07-24 03:16:31 +00:00
inner: Arc::new(InnerClient {
sender,
state: Mutex::new(State {
2019-07-27 03:11:34 +00:00
typeinfo: None,
typeinfo_composite: None,
typeinfo_enum: None,
2019-07-24 03:16:31 +00:00
types: HashMap::new(),
}),
}),
process_id,
secret_key,
}
}
2019-07-24 02:54:22 +00:00
pub(crate) fn inner(&self) -> Arc<InnerClient> {
self.inner.clone()
}
2019-07-27 03:11:34 +00:00
pub fn prepare(&mut self, query: &str) -> impl Future<Output = Result<Statement, Error>> {
2019-07-24 02:54:22 +00:00
self.prepare_typed(query, &[])
}
2019-07-27 03:11:34 +00:00
pub fn prepare_typed(
2019-07-24 02:54:22 +00:00
&mut self,
2019-07-27 03:11:34 +00:00
query: &str,
parameter_types: &[Type],
) -> impl Future<Output = Result<Statement, Error>> {
2019-07-24 02:54:22 +00:00
prepare(self.inner(), query, parameter_types)
}
2019-07-25 02:18:15 +00:00
2019-07-27 03:11:34 +00:00
pub fn query(
2019-07-25 02:18:15 +00:00
&mut self,
2019-07-27 03:11:34 +00:00
statement: &Statement,
params: &[&dyn ToSql],
) -> impl Future<Output = Result<Query, Error>> {
let buf = query::encode(statement, params.iter().cloned());
query::query(self.inner(), statement.clone(), buf)
2019-07-25 02:18:15 +00:00
}
pub fn query_iter<'a, I>(
&mut self,
2019-07-27 03:11:34 +00:00
statement: &Statement,
2019-07-25 02:18:15 +00:00
params: I,
2019-07-27 03:11:34 +00:00
) -> impl Future<Output = Result<Query, Error>>
2019-07-25 02:18:15 +00:00
where
2019-07-27 03:11:34 +00:00
I: IntoIterator<Item = &'a dyn ToSql>,
2019-07-25 02:18:15 +00:00
I::IntoIter: ExactSizeIterator,
{
2019-07-27 03:11:34 +00:00
let buf = query::encode(statement, params);
query::query(self.inner(), statement.clone(), buf)
2019-07-25 02:18:15 +00:00
}
2019-07-27 03:11:34 +00:00
pub fn execute(
2019-07-25 02:18:15 +00:00
&mut self,
2019-07-27 03:11:34 +00:00
statement: &Statement,
params: &[&dyn ToSql],
) -> impl Future<Output = Result<u64, Error>> {
let buf = query::encode(statement, params.iter().cloned());
query::execute(self.inner(), buf)
2019-07-25 02:18:15 +00:00
}
pub fn execute_iter<'a, I>(
&mut self,
2019-07-27 03:11:34 +00:00
statement: &Statement,
2019-07-25 02:18:15 +00:00
params: I,
2019-07-27 03:11:34 +00:00
) -> impl Future<Output = Result<u64, Error>>
2019-07-25 02:18:15 +00:00
where
2019-07-27 03:11:34 +00:00
I: IntoIterator<Item = &'a dyn ToSql>,
2019-07-25 02:18:15 +00:00
I::IntoIter: ExactSizeIterator,
{
2019-07-27 03:11:34 +00:00
let buf = query::encode(statement, params);
query::execute(self.inner(), buf)
2019-07-25 02:18:15 +00:00
}
}