Split State into two.

There is no reason for the buffer and typeinfo caches to share the same
lock. By splitting them it means we a) get slightly better performance,
but more importantly b) it makes it harder to accidentally deadlock.
This commit is contained in:
Erik Johnston 2021-05-14 15:20:36 +01:00
parent a84a45d88e
commit b7215c60d9

View File

@ -54,26 +54,32 @@ impl Responses {
}
}
struct State {
/// A cached prepared statement for basic information for a type from its
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
struct CachedTypeInfo {
/// A statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
/// fallback).
typeinfo: Option<Statement>,
/// A cached prepared statement for getting information for a composite type
/// from its OID. Corresponds to
/// [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
typeinfo_composite: Option<Statement>,
/// A cached prepared statement for getting information for a composite type
/// from its OID. Corresponds to
/// [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or its fallback).
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// its fallback).
typeinfo_enum: Option<Statement>,
/// Cache of types already looked up.
types: HashMap<Oid, Type>,
buf: BytesMut,
}
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
state: Mutex<State>,
cached_typeinfo: Mutex<CachedTypeInfo>,
/// A buffer to use when writing out postgres commands.
buffer: Mutex<BytesMut>,
}
impl InnerClient {
@ -91,7 +97,7 @@ impl InnerClient {
}
pub fn typeinfo(&self) -> Option<Statement> {
self.state.lock().typeinfo.clone()
self.cached_typeinfo.lock().typeinfo.clone()
}
pub fn set_typeinfo(&self, statement: &Statement) {
@ -102,67 +108,61 @@ impl InnerClient {
// Note: We need to be sure that we don't drop a Statement while holding
// the state lock as its drop handling will call `with_buf`, which tries
// to take the lock.
let mut state = self.state.lock();
if state.typeinfo.is_none() {
state.typeinfo = Some(statement.clone());
let mut cache = self.cached_typeinfo.lock();
if cache.typeinfo.is_none() {
cache.typeinfo = Some(statement.clone());
}
}
pub fn typeinfo_composite(&self) -> Option<Statement> {
self.state.lock().typeinfo_composite.clone()
self.cached_typeinfo.lock().typeinfo_composite.clone()
}
pub fn set_typeinfo_composite(&self, statement: &Statement) {
// We only insert the statement if there isn't already a cached
// statement (this is safe as they are prepared statements for the same
// query).
//
// Note: We need to be sure that we don't drop a Statement while holding
// the state lock as its drop handling will call `with_buf`, which tries
// to take the lock.
let mut state = self.state.lock();
if state.typeinfo_composite.is_none() {
state.typeinfo_composite = Some(statement.clone());
let mut cache = self.cached_typeinfo.lock();
if cache.typeinfo_composite.is_none() {
cache.typeinfo_composite = Some(statement.clone());
}
}
pub fn typeinfo_enum(&self) -> Option<Statement> {
self.state.lock().typeinfo_enum.clone()
self.cached_typeinfo.lock().typeinfo_enum.clone()
}
pub fn set_typeinfo_enum(&self, statement: &Statement) {
// We only insert the statement if there isn't already a cached
// statement (this is safe as they are prepared statements for the same
// query).
//
// Note: We need to be sure that we don't drop a Statement while holding
// the state lock as its drop handling will call `with_buf`, which tries
// to take the lock.
let mut state = self.state.lock();
if state.typeinfo_enum.is_none() {
state.typeinfo_enum = Some(statement.clone());
let mut cache = self.cached_typeinfo.lock();
if cache.typeinfo_enum.is_none() {
cache.typeinfo_enum = Some(statement.clone());
}
}
pub fn type_(&self, oid: Oid) -> Option<Type> {
self.state.lock().types.get(&oid).cloned()
self.cached_typeinfo.lock().types.get(&oid).cloned()
}
pub fn set_type(&self, oid: Oid, type_: &Type) {
self.state.lock().types.insert(oid, type_.clone());
self.cached_typeinfo.lock().types.insert(oid, type_.clone());
}
pub fn clear_type_cache(&self) {
self.state.lock().types.clear();
self.cached_typeinfo.lock().types.clear();
}
/// Call the given function with a buffer to be used when writing out
/// postgres commands.
pub fn with_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
let mut state = self.state.lock();
let r = f(&mut state.buf);
state.buf.clear();
let mut buffer = self.buffer.lock();
let r = f(&mut buffer);
buffer.clear();
r
}
}
@ -199,13 +199,8 @@ impl Client {
Client {
inner: Arc::new(InnerClient {
sender,
state: Mutex::new(State {
typeinfo: None,
typeinfo_composite: None,
typeinfo_enum: None,
types: HashMap::new(),
buf: BytesMut::new(),
}),
cached_typeinfo: Default::default(),
buffer: Default::default(),
}),
#[cfg(feature = "runtime")]
socket_config: None,