Fix deadlock when pipelining statements.

When executing statements in parallel there is a race where we prepare
the type info queries multiple times, and so insert into the type info
caches multiple times. This resulted in any existing cached `Statement`
to be dropped, running its destructor which attempts to take out the
state lock that is already being held, resulting in a deadlock.

Fixes #772.
This commit is contained in:
Erik Johnston 2021-05-14 14:59:07 +01:00
parent 3073435009
commit a84a45d88e

View File

@ -55,8 +55,17 @@ impl Responses {
} }
struct State { struct State {
/// A cached prepared statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
/// fallback).
typeinfo: Option<Statement>, typeinfo: Option<Statement>,
/// A cached prepared statement for getting information for a composite type
/// from its OID. Corresponds to
/// [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
typeinfo_composite: Option<Statement>, typeinfo_composite: Option<Statement>,
/// A cached prepared statement for getting information for a composite type
/// from its OID. Corresponds to
/// [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or its fallback).
typeinfo_enum: Option<Statement>, typeinfo_enum: Option<Statement>,
types: HashMap<Oid, Type>, types: HashMap<Oid, Type>,
buf: BytesMut, buf: BytesMut,
@ -86,7 +95,17 @@ impl InnerClient {
} }
pub fn set_typeinfo(&self, statement: &Statement) { pub fn set_typeinfo(&self, statement: &Statement) {
self.state.lock().typeinfo = Some(statement.clone()); // We only insert the statement if there isn't already a cached
// statement (this is safe as they are prepared statements for the same
// query).
//
// Note: We need to be sure that we don't drop a Statement while holding
// the state lock as its drop handling will call `with_buf`, which tries
// to take the lock.
let mut state = self.state.lock();
if state.typeinfo.is_none() {
state.typeinfo = Some(statement.clone());
}
} }
pub fn typeinfo_composite(&self) -> Option<Statement> { pub fn typeinfo_composite(&self) -> Option<Statement> {
@ -94,7 +113,17 @@ impl InnerClient {
} }
pub fn set_typeinfo_composite(&self, statement: &Statement) { pub fn set_typeinfo_composite(&self, statement: &Statement) {
self.state.lock().typeinfo_composite = Some(statement.clone()); // We only insert the statement if there isn't already a cached
// statement (this is safe as they are prepared statements for the same
// query).
//
// Note: We need to be sure that we don't drop a Statement while holding
// the state lock as its drop handling will call `with_buf`, which tries
// to take the lock.
let mut state = self.state.lock();
if state.typeinfo_composite.is_none() {
state.typeinfo_composite = Some(statement.clone());
}
} }
pub fn typeinfo_enum(&self) -> Option<Statement> { pub fn typeinfo_enum(&self) -> Option<Statement> {
@ -102,7 +131,17 @@ impl InnerClient {
} }
pub fn set_typeinfo_enum(&self, statement: &Statement) { pub fn set_typeinfo_enum(&self, statement: &Statement) {
self.state.lock().typeinfo_enum = Some(statement.clone()); // We only insert the statement if there isn't already a cached
// statement (this is safe as they are prepared statements for the same
// query).
//
// Note: We need to be sure that we don't drop a Statement while holding
// the state lock as its drop handling will call `with_buf`, which tries
// to take the lock.
let mut state = self.state.lock();
if state.typeinfo_enum.is_none() {
state.typeinfo_enum = Some(statement.clone());
}
} }
pub fn type_(&self, oid: Oid) -> Option<Type> { pub fn type_(&self, oid: Oid) -> Option<Type> {