From a84a45d88ed157c8756d5e54b2b13467721e5d12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 May 2021 14:59:07 +0100 Subject: [PATCH] Fix deadlock when pipelining statements. When executing statements in parallel there is a race where we prepare the type info queries multiple times, and so insert into the type info caches multiple times. This resulted in any existing cached `Statement` to be dropped, running its destructor which attempts to take out the state lock that is already being held, resulting in a deadlock. Fixes #772. --- tokio-postgres/src/client.rs | 45 +++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index f19005e5..b417c139 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -55,8 +55,17 @@ impl Responses { } struct State { + /// A cached prepared statement for basic information for a type from its + /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its + /// fallback). typeinfo: Option, + /// A cached prepared statement for getting information for a composite type + /// from its OID. Corresponds to + /// [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY). typeinfo_composite: Option, + /// A cached prepared statement for getting information for a composite type + /// from its OID. Corresponds to + /// [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or its fallback). typeinfo_enum: Option, types: HashMap, buf: BytesMut, @@ -86,7 +95,17 @@ impl InnerClient { } pub fn set_typeinfo(&self, statement: &Statement) { - self.state.lock().typeinfo = Some(statement.clone()); + // We only insert the statement if there isn't already a cached + // statement (this is safe as they are prepared statements for the same + // query). + // + // Note: We need to be sure that we don't drop a Statement while holding + // the state lock as its drop handling will call `with_buf`, which tries + // to take the lock. + let mut state = self.state.lock(); + if state.typeinfo.is_none() { + state.typeinfo = Some(statement.clone()); + } } pub fn typeinfo_composite(&self) -> Option { @@ -94,7 +113,17 @@ impl InnerClient { } pub fn set_typeinfo_composite(&self, statement: &Statement) { - self.state.lock().typeinfo_composite = Some(statement.clone()); + // We only insert the statement if there isn't already a cached + // statement (this is safe as they are prepared statements for the same + // query). + // + // Note: We need to be sure that we don't drop a Statement while holding + // the state lock as its drop handling will call `with_buf`, which tries + // to take the lock. + let mut state = self.state.lock(); + if state.typeinfo_composite.is_none() { + state.typeinfo_composite = Some(statement.clone()); + } } pub fn typeinfo_enum(&self) -> Option { @@ -102,7 +131,17 @@ impl InnerClient { } pub fn set_typeinfo_enum(&self, statement: &Statement) { - self.state.lock().typeinfo_enum = Some(statement.clone()); + // We only insert the statement if there isn't already a cached + // statement (this is safe as they are prepared statements for the same + // query). + // + // Note: We need to be sure that we don't drop a Statement while holding + // the state lock as its drop handling will call `with_buf`, which tries + // to take the lock. + let mut state = self.state.lock(); + if state.typeinfo_enum.is_none() { + state.typeinfo_enum = Some(statement.clone()); + } } pub fn type_(&self, oid: Oid) -> Option {