Merge pull request #773 from erikjohnston/avoid_deadlock

Fix deadlock when pipelining statements.
This commit is contained in:
Steven Fackler 2021-05-18 20:45:58 -04:00 committed by GitHub
commit 1986cb1775
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -54,17 +54,32 @@ impl Responses {
}
}
struct State {
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
struct CachedTypeInfo {
/// A statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
/// fallback).
typeinfo: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
typeinfo_composite: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// its fallback).
typeinfo_enum: Option<Statement>,
/// Cache of types already looked up.
types: HashMap<Oid, Type>,
buf: BytesMut,
}
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
state: Mutex<State>,
cached_typeinfo: Mutex<CachedTypeInfo>,
/// A buffer to use when writing out postgres commands.
buffer: Mutex<BytesMut>,
}
impl InnerClient {
@ -82,48 +97,50 @@ impl InnerClient {
}
pub fn typeinfo(&self) -> Option<Statement> {
self.state.lock().typeinfo.clone()
self.cached_typeinfo.lock().typeinfo.clone()
}
pub fn set_typeinfo(&self, statement: &Statement) {
self.state.lock().typeinfo = Some(statement.clone());
self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
}
pub fn typeinfo_composite(&self) -> Option<Statement> {
self.state.lock().typeinfo_composite.clone()
self.cached_typeinfo.lock().typeinfo_composite.clone()
}
pub fn set_typeinfo_composite(&self, statement: &Statement) {
self.state.lock().typeinfo_composite = Some(statement.clone());
self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
}
pub fn typeinfo_enum(&self) -> Option<Statement> {
self.state.lock().typeinfo_enum.clone()
self.cached_typeinfo.lock().typeinfo_enum.clone()
}
pub fn set_typeinfo_enum(&self, statement: &Statement) {
self.state.lock().typeinfo_enum = Some(statement.clone());
self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
}
pub fn type_(&self, oid: Oid) -> Option<Type> {
self.state.lock().types.get(&oid).cloned()
self.cached_typeinfo.lock().types.get(&oid).cloned()
}
pub fn set_type(&self, oid: Oid, type_: &Type) {
self.state.lock().types.insert(oid, type_.clone());
self.cached_typeinfo.lock().types.insert(oid, type_.clone());
}
pub fn clear_type_cache(&self) {
self.state.lock().types.clear();
self.cached_typeinfo.lock().types.clear();
}
/// Call the given function with a buffer to be used when writing out
/// postgres commands.
pub fn with_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
let mut state = self.state.lock();
let r = f(&mut state.buf);
state.buf.clear();
let mut buffer = self.buffer.lock();
let r = f(&mut buffer);
buffer.clear();
r
}
}
@ -160,13 +177,8 @@ impl Client {
Client {
inner: Arc::new(InnerClient {
sender,
state: Mutex::new(State {
typeinfo: None,
typeinfo_composite: None,
typeinfo_enum: None,
types: HashMap::new(),
buf: BytesMut::new(),
}),
cached_typeinfo: Default::default(),
buffer: Default::default(),
}),
#[cfg(feature = "runtime")]
socket_config: None,