2019-07-24 02:54:22 +00:00
|
|
|
use crate::codec::BackendMessages;
|
|
|
|
use crate::connection::{Request, RequestMessages};
|
|
|
|
use crate::prepare::prepare;
|
2019-07-24 03:16:31 +00:00
|
|
|
use crate::types::{Oid, Type};
|
2019-07-24 02:54:22 +00:00
|
|
|
use crate::{Error, Statement};
|
|
|
|
use fallible_iterator::FallibleIterator;
|
2019-07-22 04:42:42 +00:00
|
|
|
use futures::channel::mpsc;
|
2019-07-24 02:54:22 +00:00
|
|
|
use futures::{Stream, StreamExt};
|
2019-07-24 03:16:31 +00:00
|
|
|
use parking_lot::Mutex;
|
2019-07-24 02:54:22 +00:00
|
|
|
use postgres_protocol::message::backend::Message;
|
2019-07-24 03:16:31 +00:00
|
|
|
use std::collections::HashMap;
|
2019-07-24 02:54:22 +00:00
|
|
|
use std::future::Future;
|
|
|
|
use std::pin::Pin;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::task::{Context, Poll};
|
2019-07-22 04:42:42 +00:00
|
|
|
|
2019-07-24 02:54:22 +00:00
|
|
|
pub struct Responses {
|
|
|
|
receiver: mpsc::Receiver<BackendMessages>,
|
|
|
|
cur: BackendMessages,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Responses {
|
|
|
|
pub async fn next(&mut self) -> Result<Message, Error> {
|
|
|
|
loop {
|
|
|
|
match self.cur.next().map_err(Error::parse)? {
|
|
|
|
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
|
|
|
|
Some(message) => return Ok(message),
|
|
|
|
None => {}
|
|
|
|
}
|
|
|
|
|
|
|
|
match self.receiver.next().await {
|
|
|
|
Some(messages) => self.cur = messages,
|
|
|
|
None => return Err(Error::closed()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-24 03:16:31 +00:00
|
|
|
struct State {
|
|
|
|
has_typeinfo: bool,
|
|
|
|
has_typeinfo_composite: bool,
|
|
|
|
has_typeinfo_enum: bool,
|
|
|
|
types: HashMap<Oid, Type>,
|
|
|
|
}
|
|
|
|
|
2019-07-24 02:54:22 +00:00
|
|
|
pub struct InnerClient {
|
2019-07-22 04:42:42 +00:00
|
|
|
sender: mpsc::UnboundedSender<Request>,
|
2019-07-24 03:16:31 +00:00
|
|
|
state: Mutex<State>,
|
2019-07-24 02:54:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl InnerClient {
|
|
|
|
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
|
|
|
|
let (sender, receiver) = mpsc::channel(1);
|
|
|
|
let request = Request { messages, sender };
|
|
|
|
self.sender
|
|
|
|
.unbounded_send(request)
|
|
|
|
.map_err(|_| Error::closed())?;
|
|
|
|
|
|
|
|
Ok(Responses {
|
|
|
|
receiver,
|
|
|
|
cur: BackendMessages::empty(),
|
|
|
|
})
|
|
|
|
}
|
2019-07-24 03:16:31 +00:00
|
|
|
|
|
|
|
pub fn has_typeinfo(&self) -> bool {
|
|
|
|
self.state.lock().has_typeinfo
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_has_typeinfo(&self) {
|
|
|
|
self.state.lock().has_typeinfo = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn has_typeinfo_composite(&self) -> bool {
|
|
|
|
self.state.lock().has_typeinfo_composite
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_has_typeinfo_composite(&self) {
|
|
|
|
self.state.lock().has_typeinfo_composite = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn has_typeinfo_enum(&self) -> bool {
|
|
|
|
self.state.lock().has_typeinfo_enum
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_has_typeinfo_enum(&self) {
|
|
|
|
self.state.lock().has_typeinfo_enum = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn type_(&self, oid: Oid) -> Option<Type> {
|
|
|
|
self.state.lock().types.get(&oid).cloned()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_type(&self, oid: Oid, type_: Type) {
|
|
|
|
self.state.lock().types.insert(oid, type_);
|
|
|
|
}
|
2019-07-24 02:54:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Client {
|
|
|
|
inner: Arc<InnerClient>,
|
2019-07-22 04:42:42 +00:00
|
|
|
process_id: i32,
|
|
|
|
secret_key: i32,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Client {
|
|
|
|
pub(crate) fn new(
|
|
|
|
sender: mpsc::UnboundedSender<Request>,
|
|
|
|
process_id: i32,
|
|
|
|
secret_key: i32,
|
|
|
|
) -> Client {
|
|
|
|
Client {
|
2019-07-24 03:16:31 +00:00
|
|
|
inner: Arc::new(InnerClient {
|
|
|
|
sender,
|
|
|
|
state: Mutex::new(State {
|
|
|
|
has_typeinfo: false,
|
|
|
|
has_typeinfo_composite: false,
|
|
|
|
has_typeinfo_enum: false,
|
|
|
|
types: HashMap::new(),
|
|
|
|
}),
|
|
|
|
}),
|
2019-07-22 04:42:42 +00:00
|
|
|
process_id,
|
|
|
|
secret_key,
|
|
|
|
}
|
|
|
|
}
|
2019-07-24 02:54:22 +00:00
|
|
|
|
|
|
|
pub(crate) fn inner(&self) -> Arc<InnerClient> {
|
|
|
|
self.inner.clone()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn prepare<'a>(
|
|
|
|
&mut self,
|
|
|
|
query: &'a str,
|
|
|
|
) -> impl Future<Output = Result<Statement, Error>> + 'a {
|
|
|
|
self.prepare_typed(query, &[])
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn prepare_typed<'a>(
|
|
|
|
&mut self,
|
|
|
|
query: &'a str,
|
|
|
|
parameter_types: &'a [Type],
|
|
|
|
) -> impl Future<Output = Result<Statement, Error>> + 'a {
|
|
|
|
prepare(self.inner(), query, parameter_types)
|
|
|
|
}
|
2019-07-22 04:42:42 +00:00
|
|
|
}
|