From adcbfb67dd75b8f99cc3f1e6dcc582b6bd53fee3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 31 Dec 2023 19:22:20 +0000 Subject: [PATCH 01/14] feat: implement `KeyRwLock` --- src/common/procedure/src/local.rs | 3 + src/common/procedure/src/local/rwlock.rs | 183 +++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 src/common/procedure/src/local/rwlock.rs diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index ae01022c9cc4..eff538d7b6a9 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -14,6 +14,9 @@ mod lock; mod runner; +//TODO(weny): Remove it. +#[allow(unused)] +mod rwlock; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs new file mode 100644 index 000000000000..b7e56730a405 --- /dev/null +++ b/src/common/procedure/src/local/rwlock.rs @@ -0,0 +1,183 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::hash::Hash; +use std::result::Result; +use std::sync::{Arc, Mutex}; + +use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, TryLockError}; + +pub struct OwnedKeyRwLockReadGuard { + key: K, + inner: Arc>>>>, + guard: Option>, +} + +pub struct OwnedKeyRwLockWriteGuard { + key: K, + inner: Arc>>>>, + guard: Option>, +} + +impl Drop for OwnedKeyRwLockWriteGuard { + fn drop(&mut self) { + // Always releases inner lock first. + { + self.guard.take().unwrap(); + } + let mut locks = self.inner.lock().unwrap(); + KeyRwLock::remove_key(&mut locks, &self.key); + } +} + +impl Drop for OwnedKeyRwLockReadGuard { + fn drop(&mut self) { + // Always releases inner lock first. + { + self.guard.take().unwrap(); + } + let mut locks = self.inner.lock().unwrap(); + KeyRwLock::remove_key(&mut locks, &self.key); + } +} + +/// Locks based on a key, allowing other keys to lock independently. +#[derive(Debug)] +pub struct KeyRwLock { + /// The inner map of locks for specific keys. + inner: Arc>>>>, +} + +impl KeyRwLock +where + K: Eq + Hash, +{ + /// Removes a key lock if it's exists and no one in use. + pub(crate) fn remove_key(locks: &mut HashMap>>, key: &K) { + if let Some(lock) = locks.get(key) { + if lock.try_write().is_ok() { + locks.remove(key); + } + } + } +} + +impl KeyRwLock +where + K: Eq + Hash + Send + Clone, +{ + pub fn new() -> Self { + KeyRwLock { + inner: Default::default(), + } + } + + /// Locks the key with shared read access, returning a guard. + pub async fn read(&self, key: K) -> OwnedKeyRwLockReadGuard { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key.clone()).or_default().clone() + }; + + OwnedKeyRwLockReadGuard { + key, + inner: self.inner.clone(), + guard: Some(lock.read_owned().await), + } + } + + /// Locks the key with exclusive write access, returning a guard. + pub async fn write(&self, key: K) -> OwnedKeyRwLockWriteGuard { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key.clone()).or_default().clone() + }; + + OwnedKeyRwLockWriteGuard { + key, + inner: self.inner.clone(), + guard: Some(lock.write_owned().await), + } + } + + /// Tries to lock the key with shared read access, returning immediately. + pub fn try_read(&self, key: K) -> Result, TryLockError> { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key.clone()).or_default().clone() + }; + + let guard = lock.try_read_owned()?; + + Ok(OwnedKeyRwLockReadGuard { + key, + inner: self.inner.clone(), + guard: Some(guard), + }) + } + + /// Tries lock this key with exclusive write access, returning immediately. + pub fn try_write(&self, key: K) -> Result, TryLockError> { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key.clone()).or_default().clone() + }; + + let guard = lock.try_write_owned()?; + + Ok(OwnedKeyRwLockWriteGuard { + key, + inner: self.inner.clone(), + guard: Some(guard), + }) + } + + /// Returns number of keys. + pub fn len(&self) -> usize { + self.inner.lock().unwrap().len() + } + + /// Returns true the inner map is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_naive() { + let lock_key = KeyRwLock::new(); + + { + let _guard = lock_key.read("test1").await; + assert_eq!(lock_key.len(), 1); + assert!(lock_key.try_read("test1").is_ok()); + assert!(lock_key.try_write("test1").is_err()); + } + + { + let _guard0 = lock_key.write("test2").await; + let _guard = lock_key.write("test1").await; + assert_eq!(lock_key.len(), 2); + assert!(lock_key.try_read("test1").is_err()); + assert!(lock_key.try_write("test1").is_err()); + } + + assert!(lock_key.is_empty()); + } +} From be707a5a600383215d3fa0fca97abe529c95c57b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 31 Dec 2023 19:32:05 +0000 Subject: [PATCH 02/14] refactor: use KeyRwLock instead of LockMap --- src/common/procedure/src/local.rs | 8 +++++--- src/common/procedure/src/local/runner.rs | 17 +++++++---------- src/common/procedure/src/local/rwlock.rs | 17 +++++++++++++++++ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index eff538d7b6a9..2bfa553b0b43 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//TODO(weny): Remove it. +#[allow(unused)] mod lock; mod runner; //TODO(weny): Remove it. @@ -32,11 +34,11 @@ use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; +use self::rwlock::KeyRwLock; use crate::error::{ DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; -use crate::local::lock::LockMap; use crate::local::runner::Runner; use crate::procedure::BoxedProcedureLoader; use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef}; @@ -134,7 +136,7 @@ struct LoadedProcedure { pub(crate) struct ManagerContext { /// Procedure loaders. The key is the type name of the procedure which the loader returns. loaders: Mutex>, - lock_map: LockMap, + key_lock: KeyRwLock, procedures: RwLock>, /// Messages loaded from the procedure store. messages: Mutex>, @@ -155,8 +157,8 @@ impl ManagerContext { /// Returns a new [ManagerContext]. fn new() -> ManagerContext { ManagerContext { + key_lock: KeyRwLock::new(), loaders: Mutex::new(HashMap::new()), - lock_map: LockMap::new(), procedures: RwLock::new(HashMap::new()), messages: Mutex::new(HashMap::new()), finished_procedures: Mutex::new(VecDeque::new()), diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 0b50f4497f03..d307b108c819 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -19,6 +19,7 @@ use backon::{BackoffBuilder, ExponentialBuilder}; use common_telemetry::logging; use tokio::time; +use super::rwlock::OwnedKeyRwLockGuard; use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::store::ProcedureStore; @@ -56,6 +57,7 @@ impl ExecResult { struct ProcedureGuard { meta: ProcedureMetaRef, manager_ctx: Arc, + key_guards: Vec>, finish: bool, } @@ -65,6 +67,7 @@ impl ProcedureGuard { ProcedureGuard { meta, manager_ctx, + key_guards: vec![], finish: false, } } @@ -94,11 +97,6 @@ impl Drop for ProcedureGuard { if let Some(parent_id) = self.meta.parent_id { self.manager_ctx.notify_by_subprocedure(parent_id); } - - // Release lock in reverse order. - for key in self.meta.lock_key.keys_to_unlock() { - self.manager_ctx.lock_map.release_lock(key, self.meta.id); - } } } @@ -121,7 +119,7 @@ impl Runner { /// Run the procedure. pub(crate) async fn run(mut self) { // Ensure we can update the procedure state. - let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); + let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); logging::info!( "Runner {}-{} starts", @@ -133,10 +131,9 @@ impl Runner { // recursive locking by adding a root procedure id to the meta. for key in self.meta.lock_key.keys_to_lock() { // Acquire lock for each key. - self.manager_ctx - .lock_map - .acquire_lock(key, self.meta.clone()) - .await; + guard + .key_guards + .push(self.manager_ctx.key_lock.write(key.clone()).await.into()); } // Execute the procedure. We need to release the lock whenever the the execution diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index b7e56730a405..647590c19c93 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -19,6 +19,23 @@ use std::sync::{Arc, Mutex}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, TryLockError}; +pub enum OwnedKeyRwLockGuard { + Read(OwnedKeyRwLockReadGuard), + Write(OwnedKeyRwLockWriteGuard), +} + +impl From> for OwnedKeyRwLockGuard { + fn from(guard: OwnedKeyRwLockReadGuard) -> Self { + OwnedKeyRwLockGuard::Read(guard) + } +} + +impl From> for OwnedKeyRwLockGuard { + fn from(guard: OwnedKeyRwLockWriteGuard) -> Self { + OwnedKeyRwLockGuard::Write(guard) + } +} + pub struct OwnedKeyRwLockReadGuard { key: K, inner: Arc>>>>, From fb229010936472b6dcb0539c2f516f23cbaef35f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 31 Dec 2023 19:55:16 +0000 Subject: [PATCH 03/14] refactor: use StringKey instead of String --- src/common/meta/src/ddl/alter_table.rs | 2 +- src/common/meta/src/ddl/create_table.rs | 2 +- src/common/meta/src/ddl/drop_table.rs | 2 +- src/common/meta/src/ddl/truncate_table.rs | 2 +- src/common/procedure/src/local.rs | 14 ++--- src/common/procedure/src/local/runner.rs | 34 +++++----- src/common/procedure/src/procedure.rs | 62 +++++++++---------- src/common/procedure/src/watcher.rs | 2 +- src/meta-srv/src/procedure/region_failover.rs | 2 +- .../src/procedure/region_migration.rs | 8 ++- 10 files changed, 69 insertions(+), 61 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index c3b1f7c31121..e196ed70c6d6 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -394,7 +394,7 @@ impl Procedure for AlterTableProcedure { fn lock_key(&self) -> LockKey { let key = self.lock_key_inner(); - LockKey::new(key) + LockKey::new_exclusive(key) } } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index c6e09006b470..41fef6d5d6bf 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -349,7 +349,7 @@ impl Procedure for CreateTableProcedure { table_ref.table, ); - LockKey::single(key) + LockKey::single_exclusive(key) } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 7fac47e62cb1..dfd674d13938 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -273,7 +273,7 @@ impl Procedure for DropTableProcedure { table_ref.table, ); - LockKey::single(key) + LockKey::single_exclusive(key) } } diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index ec5a7897cd63..90f746104c99 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -81,7 +81,7 @@ impl Procedure for TruncateTableProcedure { table_ref.table, ); - LockKey::single(key) + LockKey::single_exclusive(key) } } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 2bfa553b0b43..7012286a800d 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -855,7 +855,7 @@ mod tests { assert!(manager.procedure_watcher(procedure_id).is_none()); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); assert!(manager .submit(ProcedureWithId { id: procedure_id, @@ -923,7 +923,7 @@ mod tests { } fn lock_key(&self) -> LockKey { - LockKey::single("test.submit") + LockKey::single_exclusive("test.submit") } } @@ -960,7 +960,7 @@ mod tests { let manager = LocalManager::new(config, state_store); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert_matches!( manager @@ -991,7 +991,7 @@ mod tests { manager.start().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert!(manager .submit(ProcedureWithId { @@ -1023,7 +1023,7 @@ mod tests { manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert!(manager .submit(ProcedureWithId { @@ -1046,7 +1046,7 @@ mod tests { // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed. manager.stop().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); manager.manager_ctx.set_running(); @@ -1068,7 +1068,7 @@ mod tests { // After restart let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert!(manager .submit(ProcedureWithId { diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index d307b108c819..def942dec540 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -22,6 +22,7 @@ use tokio::time; use super::rwlock::OwnedKeyRwLockGuard; use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; +use crate::procedure::StringKey; use crate::store::ProcedureStore; use crate::ProcedureState::Retrying; use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status}; @@ -131,9 +132,14 @@ impl Runner { // recursive locking by adding a root procedure id to the meta. for key in self.meta.lock_key.keys_to_lock() { // Acquire lock for each key. - guard - .key_guards - .push(self.manager_ctx.key_lock.write(key.clone()).await.into()); + let key_guard = match key { + StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(), + StringKey::Exclusive(key) => { + self.manager_ctx.key_lock.write(key.clone()).await.into() + } + }; + + guard.key_guards.push(key_guard); } // Execute the procedure. We need to release the lock whenever the the execution @@ -601,7 +607,7 @@ mod tests { }; let normal = ProcedureAdapter { data: "normal".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -662,7 +668,7 @@ mod tests { }; let suspend = ProcedureAdapter { data: "suspend".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -694,7 +700,7 @@ mod tests { }; let child = ProcedureAdapter { data: "child".to_string(), - lock_key: LockKey::new(keys.iter().map(|k| k.to_string())), + lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())), exec_fn, }; @@ -762,7 +768,7 @@ mod tests { }; let parent = ProcedureAdapter { data: "parent".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -807,7 +813,7 @@ mod tests { let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed(); let normal = ProcedureAdapter { data: "normal".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -848,7 +854,7 @@ mod tests { |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); let normal = ProcedureAdapter { data: "fail".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -872,7 +878,7 @@ mod tests { |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); let fail = ProcedureAdapter { data: "fail".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -914,7 +920,7 @@ mod tests { let retry_later = ProcedureAdapter { data: "retry_later".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -949,7 +955,7 @@ mod tests { let exceed_max_retry_later = ProcedureAdapter { data: "exceed_max_retry_later".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -990,7 +996,7 @@ mod tests { }; let fail = ProcedureAdapter { data: "fail".to_string(), - lock_key: LockKey::single("catalog.schema.table.region-0"), + lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"), exec_fn, }; @@ -1024,7 +1030,7 @@ mod tests { }; let parent = ProcedureAdapter { data: "parent".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 54f34b7d7ccf..1b6c5c13fc20 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -116,37 +116,55 @@ impl Procedure for Box { } } +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum StringKey { + Share(String), + Exclusive(String), +} + /// Keys to identify required locks. /// -/// [LockKey] always sorts keys lexicographically so that they can be acquired -/// in the same order. +/// [LockKey] respect the input order. // Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys. #[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct LockKey(SmallVec<[String; 2]>); +pub struct LockKey(SmallVec<[StringKey; 2]>); + +impl StringKey { + pub fn into_string(self) -> String { + match self { + StringKey::Share(s) => s, + StringKey::Exclusive(s) => s, + } + } +} impl LockKey { /// Returns a new [LockKey] with only one key. - pub fn single(key: impl Into) -> LockKey { + pub fn single(key: impl Into) -> LockKey { LockKey(smallvec![key.into()]) } + /// Returns a new [LockKey] with only one key. + pub fn single_exclusive(key: impl Into) -> LockKey { + LockKey(smallvec![StringKey::Exclusive(key.into())]) + } + /// Returns a new [LockKey] with keys from specific `iter`. - pub fn new(iter: impl IntoIterator) -> LockKey { + pub fn new(iter: impl IntoIterator) -> LockKey { let mut vec: SmallVec<_> = iter.into_iter().collect(); - vec.sort(); // Dedup keys to avoid acquiring the same key multiple times. vec.dedup(); LockKey(vec) } - /// Returns the keys to lock. - pub fn keys_to_lock(&self) -> impl Iterator { - self.0.iter() + /// Returns a new [LockKey] with keys from specific `iter`. + pub fn new_exclusive(iter: impl IntoIterator) -> LockKey { + Self::new(iter.into_iter().map(StringKey::Exclusive)) } - /// Returns the keys to unlock. - pub fn keys_to_unlock(&self) -> impl Iterator { - self.0.iter().rev() + /// Returns the keys to lock. + pub fn keys_to_lock(&self) -> impl Iterator { + self.0.iter() } } @@ -337,26 +355,6 @@ mod tests { assert!(!status.need_persist()); } - #[test] - fn test_lock_key() { - let entity = "catalog.schema.my_table"; - let key = LockKey::single(entity); - assert_eq!(vec![entity], key.keys_to_lock().collect::>()); - assert_eq!(vec![entity], key.keys_to_unlock().collect::>()); - - let key = LockKey::new([ - "b".to_string(), - "c".to_string(), - "a".to_string(), - "c".to_string(), - ]); - assert_eq!(vec!["a", "b", "c"], key.keys_to_lock().collect::>()); - assert_eq!( - vec!["c", "b", "a"], - key.keys_to_unlock().collect::>() - ); - } - #[test] fn test_procedure_id() { let id = ProcedureId::random(); diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 75cf777beece..584aae520df7 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -98,7 +98,7 @@ mod tests { } fn lock_key(&self) -> LockKey { - LockKey::single("test.submit") + LockKey::single_exclusive("test.submit") } } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 50ab0e742307..37468437b2aa 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -373,7 +373,7 @@ impl Procedure for RegionFailoverProcedure { fn lock_key(&self) -> LockKey { let region_ident = &self.node.failed_region; let region_key = region_lock_key(region_ident.table_id, region_ident.region_number); - LockKey::single(region_key) + LockKey::single_exclusive(region_key) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index b187a026723a..667c0b2baa2c 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -379,7 +379,7 @@ impl Procedure for RegionMigrationProcedure { fn lock_key(&self) -> LockKey { let key = self.context.persistent_ctx.lock_key(); - LockKey::single(key) + LockKey::single_exclusive(key) } } @@ -415,7 +415,11 @@ mod tests { let procedure = RegionMigrationProcedure::new(persistent_context, context); let key = procedure.lock_key(); - let keys = key.keys_to_lock().cloned().collect::>(); + let keys = key + .keys_to_lock() + .cloned() + .map(|s| s.into_string()) + .collect::>(); assert!(keys.contains(&expected_key)); } From 44eed75f4744f157062d76e35e53f89d4b6e13b1 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 31 Dec 2023 20:00:39 +0000 Subject: [PATCH 04/14] chore: remove redundant code --- src/common/procedure/src/local.rs | 10 +- src/common/procedure/src/local/lock.rs | 214 ------------------------- 2 files changed, 2 insertions(+), 222 deletions(-) delete mode 100644 src/common/procedure/src/local/lock.rs diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 7012286a800d..4fc02ed011b7 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//TODO(weny): Remove it. -#[allow(unused)] -mod lock; mod runner; -//TODO(weny): Remove it. -#[allow(unused)] +// TODO(weny): Remove it. +#[allow(dead_code)] mod rwlock; use std::collections::{HashMap, VecDeque}; @@ -62,8 +59,6 @@ const META_TTL: Duration = Duration::from_secs(60 * 10); pub(crate) struct ProcedureMeta { /// Id of this procedure. id: ProcedureId, - /// Notify to wait for a lock. - lock_notify: Notify, /// Parent procedure id. parent_id: Option, /// Notify to wait for subprocedures. @@ -83,7 +78,6 @@ impl ProcedureMeta { let (state_sender, state_receiver) = watch::channel(ProcedureState::Running); ProcedureMeta { id, - lock_notify: Notify::new(), parent_id, child_notify: Notify::new(), lock_key, diff --git a/src/common/procedure/src/local/lock.rs b/src/common/procedure/src/local/lock.rs deleted file mode 100644 index 59e197d951bb..000000000000 --- a/src/common/procedure/src/local/lock.rs +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, VecDeque}; -use std::sync::RwLock; - -use crate::local::ProcedureMetaRef; -use crate::ProcedureId; - -/// A lock entry. -#[derive(Debug)] -struct Lock { - /// Current lock owner. - owner: ProcedureMetaRef, - /// Waiter procedures. - waiters: VecDeque, -} - -impl Lock { - /// Returns a [Lock] with specific `owner` procedure. - fn from_owner(owner: ProcedureMetaRef) -> Lock { - Lock { - owner, - waiters: VecDeque::new(), - } - } - - /// Try to pop a waiter from the waiter list, set it as owner - /// and wake up the new owner. - /// - /// Returns false if there is no waiter in the waiter list. - fn switch_owner(&mut self) -> bool { - if let Some(waiter) = self.waiters.pop_front() { - // Update owner. - self.owner = waiter.clone(); - // We need to use notify_one() since the waiter may have not called `notified()` yet. - waiter.lock_notify.notify_one(); - true - } else { - false - } - } -} - -/// Manages lock entries for procedures. -pub(crate) struct LockMap { - locks: RwLock>, -} - -impl LockMap { - /// Returns a new [LockMap]. - pub(crate) fn new() -> LockMap { - LockMap { - locks: RwLock::new(HashMap::new()), - } - } - - /// Acquire lock by `key` for procedure with specific `meta`. - /// - /// Though `meta` is cloneable, callers must ensure that only one `meta` - /// is acquiring and holding the lock at the same time. - /// - /// # Panics - /// Panics if the procedure acquires the lock recursively. - pub(crate) async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) { - assert!(!self.hold_lock(key, meta.id)); - - { - let mut locks = self.locks.write().unwrap(); - if let Some(lock) = locks.get_mut(key) { - // Lock already exists, but we don't expect that a procedure acquires - // the same lock again. - assert_ne!(lock.owner.id, meta.id); - - // Add this procedure to the waiter list. Here we don't check - // whether the procedure is already in the waiter list as we - // expect that a procedure should not wait for two lock simultaneously. - lock.waiters.push_back(meta.clone()); - } else { - let _ = locks.insert(key.to_string(), Lock::from_owner(meta)); - - return; - } - } - - // Wait for notify. - meta.lock_notify.notified().await; - - assert!(self.hold_lock(key, meta.id)); - } - - /// Release lock by `key`. - pub(crate) fn release_lock(&self, key: &str, procedure_id: ProcedureId) { - let mut locks = self.locks.write().unwrap(); - if let Some(lock) = locks.get_mut(key) { - if lock.owner.id != procedure_id { - // This is not the lock owner. - return; - } - - if !lock.switch_owner() { - // No body waits for this lock, we can remove the lock entry. - let _ = locks.remove(key); - } - } - } - - /// Returns true if the procedure with specific `procedure_id` holds the - /// lock of `key`. - fn hold_lock(&self, key: &str, procedure_id: ProcedureId) -> bool { - let locks = self.locks.read().unwrap(); - locks - .get(key) - .map(|lock| lock.owner.id == procedure_id) - .unwrap_or(false) - } - - /// Returns true if the procedure is waiting for the lock `key`. - #[cfg(test)] - fn waiting_lock(&self, key: &str, procedure_id: ProcedureId) -> bool { - let locks = self.locks.read().unwrap(); - locks - .get(key) - .map(|lock| lock.waiters.iter().any(|meta| meta.id == procedure_id)) - .unwrap_or(false) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::*; - use crate::local::test_util; - - #[test] - fn test_lock_no_waiter() { - let meta = Arc::new(test_util::procedure_meta_for_test()); - let mut lock = Lock::from_owner(meta); - - assert!(!lock.switch_owner()); - } - - #[tokio::test] - async fn test_lock_with_waiter() { - let owner = Arc::new(test_util::procedure_meta_for_test()); - let mut lock = Lock::from_owner(owner); - - let waiter = Arc::new(test_util::procedure_meta_for_test()); - lock.waiters.push_back(waiter.clone()); - - assert!(lock.switch_owner()); - assert!(lock.waiters.is_empty()); - - waiter.lock_notify.notified().await; - assert_eq!(lock.owner.id, waiter.id); - } - - #[tokio::test] - async fn test_lock_map() { - let key = "hello"; - - let owner = Arc::new(test_util::procedure_meta_for_test()); - let lock_map = Arc::new(LockMap::new()); - lock_map.acquire_lock(key, owner.clone()).await; - - let waiter = Arc::new(test_util::procedure_meta_for_test()); - let waiter_id = waiter.id; - - // Waiter release the lock, this should not take effect. - lock_map.release_lock(key, waiter_id); - - let lock_map2 = lock_map.clone(); - let owner_id = owner.id; - let handle = tokio::spawn(async move { - assert!(lock_map2.hold_lock(key, owner_id)); - assert!(!lock_map2.hold_lock(key, waiter_id)); - - // Waiter wait for lock. - lock_map2.acquire_lock(key, waiter.clone()).await; - - assert!(lock_map2.hold_lock(key, waiter_id)); - }); - - // Owner still holds the lock. - assert!(lock_map.hold_lock(key, owner_id)); - - // Wait until the waiter acquired the lock - while !lock_map.waiting_lock(key, waiter_id) { - tokio::time::sleep(std::time::Duration::from_millis(5)).await; - } - // Release lock - lock_map.release_lock(key, owner_id); - assert!(!lock_map.hold_lock(key, owner_id)); - - // Wait for task. - handle.await.unwrap(); - // The waiter should hold the lock now. - assert!(lock_map.hold_lock(key, waiter_id)); - - lock_map.release_lock(key, waiter_id); - } -} From 8410e7ce95973bd3a5e378783982abed165285c8 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jan 2024 04:54:20 +0000 Subject: [PATCH 05/14] refactor: cleanup KeyRwLock staled locks before granting new lock --- src/common/procedure/src/local/runner.rs | 2 +- src/common/procedure/src/local/rwlock.rs | 113 ++++++++--------------- 2 files changed, 37 insertions(+), 78 deletions(-) diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index def942dec540..c4470d888b40 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -58,7 +58,7 @@ impl ExecResult { struct ProcedureGuard { meta: ProcedureMetaRef, manager_ctx: Arc, - key_guards: Vec>, + key_guards: Vec, finish: bool, } diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index 647590c19c93..b1c6d474572c 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -19,57 +19,23 @@ use std::sync::{Arc, Mutex}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, TryLockError}; -pub enum OwnedKeyRwLockGuard { - Read(OwnedKeyRwLockReadGuard), - Write(OwnedKeyRwLockWriteGuard), +pub enum OwnedKeyRwLockGuard { + Read(OwnedRwLockReadGuard<()>), + Write(OwnedRwLockWriteGuard<()>), } -impl From> for OwnedKeyRwLockGuard { - fn from(guard: OwnedKeyRwLockReadGuard) -> Self { +impl From> for OwnedKeyRwLockGuard { + fn from(guard: OwnedRwLockReadGuard<()>) -> Self { OwnedKeyRwLockGuard::Read(guard) } } -impl From> for OwnedKeyRwLockGuard { - fn from(guard: OwnedKeyRwLockWriteGuard) -> Self { +impl From> for OwnedKeyRwLockGuard { + fn from(guard: OwnedRwLockWriteGuard<()>) -> Self { OwnedKeyRwLockGuard::Write(guard) } } -pub struct OwnedKeyRwLockReadGuard { - key: K, - inner: Arc>>>>, - guard: Option>, -} - -pub struct OwnedKeyRwLockWriteGuard { - key: K, - inner: Arc>>>>, - guard: Option>, -} - -impl Drop for OwnedKeyRwLockWriteGuard { - fn drop(&mut self) { - // Always releases inner lock first. - { - self.guard.take().unwrap(); - } - let mut locks = self.inner.lock().unwrap(); - KeyRwLock::remove_key(&mut locks, &self.key); - } -} - -impl Drop for OwnedKeyRwLockReadGuard { - fn drop(&mut self) { - // Always releases inner lock first. - { - self.guard.take().unwrap(); - } - let mut locks = self.inner.lock().unwrap(); - KeyRwLock::remove_key(&mut locks, &self.key); - } -} - /// Locks based on a key, allowing other keys to lock independently. #[derive(Debug)] pub struct KeyRwLock { @@ -79,14 +45,23 @@ pub struct KeyRwLock { impl KeyRwLock where - K: Eq + Hash, + K: Eq + Hash + Clone, { - /// Removes a key lock if it's exists and no one in use. - pub(crate) fn remove_key(locks: &mut HashMap>>, key: &K) { - if let Some(lock) = locks.get(key) { - if lock.try_write().is_ok() { - locks.remove(key); - } + /// Remove locks that are not locked currently. + fn clean_up(locks: &mut HashMap>>) { + let keys = locks + .iter() + .filter_map(|(key, lock)| { + if lock.try_write().is_ok() { + Some(key.clone()) + } else { + None + } + }) + .collect::>(); + + for key in keys { + locks.remove(&key); } } } @@ -102,63 +77,47 @@ where } /// Locks the key with shared read access, returning a guard. - pub async fn read(&self, key: K) -> OwnedKeyRwLockReadGuard { + pub async fn read(&self, key: K) -> OwnedRwLockReadGuard<()> { let lock = { let mut locks = self.inner.lock().unwrap(); + Self::clean_up(&mut locks); locks.entry(key.clone()).or_default().clone() }; - OwnedKeyRwLockReadGuard { - key, - inner: self.inner.clone(), - guard: Some(lock.read_owned().await), - } + lock.read_owned().await } /// Locks the key with exclusive write access, returning a guard. - pub async fn write(&self, key: K) -> OwnedKeyRwLockWriteGuard { + pub async fn write(&self, key: K) -> OwnedRwLockWriteGuard<()> { let lock = { let mut locks = self.inner.lock().unwrap(); + Self::clean_up(&mut locks); locks.entry(key.clone()).or_default().clone() }; - OwnedKeyRwLockWriteGuard { - key, - inner: self.inner.clone(), - guard: Some(lock.write_owned().await), - } + lock.write_owned().await } /// Tries to lock the key with shared read access, returning immediately. - pub fn try_read(&self, key: K) -> Result, TryLockError> { + pub fn try_read(&self, key: K) -> Result, TryLockError> { let lock = { let mut locks = self.inner.lock().unwrap(); + Self::clean_up(&mut locks); locks.entry(key.clone()).or_default().clone() }; - let guard = lock.try_read_owned()?; - - Ok(OwnedKeyRwLockReadGuard { - key, - inner: self.inner.clone(), - guard: Some(guard), - }) + lock.try_read_owned() } /// Tries lock this key with exclusive write access, returning immediately. - pub fn try_write(&self, key: K) -> Result, TryLockError> { + pub fn try_write(&self, key: K) -> Result, TryLockError> { let lock = { let mut locks = self.inner.lock().unwrap(); + Self::clean_up(&mut locks); locks.entry(key.clone()).or_default().clone() }; - let guard = lock.try_write_owned()?; - - Ok(OwnedKeyRwLockWriteGuard { - key, - inner: self.inner.clone(), - guard: Some(guard), - }) + lock.try_write_owned() } /// Returns number of keys. @@ -195,6 +154,6 @@ mod tests { assert!(lock_key.try_write("test1").is_err()); } - assert!(lock_key.is_empty()); + assert_eq!(lock_key.len(), 2); } } From 8f0203841b392fe344ab238184c7032abcec992b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jan 2024 05:24:32 +0000 Subject: [PATCH 06/14] feat: clean staled locks manually --- src/common/procedure/src/local/runner.rs | 8 +++- src/common/procedure/src/local/rwlock.rs | 56 +++++++++++------------- src/common/procedure/src/procedure.rs | 7 +++ 3 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index c4470d888b40..6302adfc649c 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -153,6 +153,10 @@ impl Runner { // Release locks and notify parent procedure. guard.finish(); + // Clean the staled locks. + self.manager_ctx + .key_lock + .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string())); // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { @@ -787,6 +791,7 @@ mod tests { runner.manager_ctx = manager_ctx.clone(); runner.run().await; + assert!(manager_ctx.key_lock.is_empty()); // Check child procedures. for child_id in children_ids { @@ -1045,10 +1050,11 @@ mod tests { // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta.clone())); // Replace the manager ctx. - runner.manager_ctx = manager_ctx; + runner.manager_ctx = manager_ctx.clone(); // Run the runner and execute the procedure. runner.run().await; + assert!(manager_ctx.key_lock.is_empty()); let err = meta.state().error().unwrap().output_msg(); assert!(err.contains("subprocedure failed"), "{err}"); } diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index b1c6d474572c..c066c0b1fa98 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -46,29 +46,6 @@ pub struct KeyRwLock { impl KeyRwLock where K: Eq + Hash + Clone, -{ - /// Remove locks that are not locked currently. - fn clean_up(locks: &mut HashMap>>) { - let keys = locks - .iter() - .filter_map(|(key, lock)| { - if lock.try_write().is_ok() { - Some(key.clone()) - } else { - None - } - }) - .collect::>(); - - for key in keys { - locks.remove(&key); - } - } -} - -impl KeyRwLock -where - K: Eq + Hash + Send + Clone, { pub fn new() -> Self { KeyRwLock { @@ -80,8 +57,7 @@ where pub async fn read(&self, key: K) -> OwnedRwLockReadGuard<()> { let lock = { let mut locks = self.inner.lock().unwrap(); - Self::clean_up(&mut locks); - locks.entry(key.clone()).or_default().clone() + locks.entry(key).or_default().clone() }; lock.read_owned().await @@ -91,8 +67,7 @@ where pub async fn write(&self, key: K) -> OwnedRwLockWriteGuard<()> { let lock = { let mut locks = self.inner.lock().unwrap(); - Self::clean_up(&mut locks); - locks.entry(key.clone()).or_default().clone() + locks.entry(key).or_default().clone() }; lock.write_owned().await @@ -102,8 +77,7 @@ where pub fn try_read(&self, key: K) -> Result, TryLockError> { let lock = { let mut locks = self.inner.lock().unwrap(); - Self::clean_up(&mut locks); - locks.entry(key.clone()).or_default().clone() + locks.entry(key).or_default().clone() }; lock.try_read_owned() @@ -113,8 +87,7 @@ where pub fn try_write(&self, key: K) -> Result, TryLockError> { let lock = { let mut locks = self.inner.lock().unwrap(); - Self::clean_up(&mut locks); - locks.entry(key.clone()).or_default().clone() + locks.entry(key).or_default().clone() }; lock.try_write_owned() @@ -129,6 +102,24 @@ where pub fn is_empty(&self) -> bool { self.len() == 0 } + + /// Clean up stale locks. + pub fn clean_keys<'a>(&'a self, iter: impl IntoIterator) { + let mut locks = self.inner.lock().unwrap(); + + let mut keys = Vec::new(); + for key in iter { + if let Some(lock) = locks.get(key) { + if lock.try_write().is_ok() { + keys.push(key); + } + } + } + + for key in keys { + locks.remove(key); + } + } } #[cfg(test)] @@ -155,5 +146,8 @@ mod tests { } assert_eq!(lock_key.len(), 2); + + lock_key.clean_keys(&vec!["test1", "test2"]); + assert!(lock_key.is_empty()); } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 1b6c5c13fc20..65f5dc0c2d55 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -136,6 +136,13 @@ impl StringKey { StringKey::Exclusive(s) => s, } } + + pub fn as_string(&self) -> &String { + match self { + StringKey::Share(s) => s, + StringKey::Exclusive(s) => s, + } + } } impl LockKey { From 48eb30b1fab5e14d5d363104bd177c9cae7dc7fc Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jan 2024 09:55:03 +0000 Subject: [PATCH 07/14] feat: sort lock key in lexicographically order --- src/common/procedure/src/procedure.rs | 33 ++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 65f5dc0c2d55..2df005bdf042 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -116,7 +116,7 @@ impl Procedure for Box { } } -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum StringKey { Share(String), Exclusive(String), @@ -124,8 +124,9 @@ pub enum StringKey { /// Keys to identify required locks. /// -/// [LockKey] respect the input order. -// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys. +/// [LockKey] always sorts keys lexicographically so that they can be acquired +/// in the same order. +/// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys. #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct LockKey(SmallVec<[StringKey; 2]>); @@ -159,6 +160,7 @@ impl LockKey { /// Returns a new [LockKey] with keys from specific `iter`. pub fn new(iter: impl IntoIterator) -> LockKey { let mut vec: SmallVec<_> = iter.into_iter().collect(); + vec.sort(); // Dedup keys to avoid acquiring the same key multiple times. vec.dedup(); LockKey(vec) @@ -362,6 +364,31 @@ mod tests { assert!(!status.need_persist()); } + #[test] + fn test_lock_key() { + let entity = "catalog.schema.my_table"; + let key = LockKey::single_exclusive(entity); + assert_eq!( + vec![&StringKey::Exclusive(entity.to_string())], + key.keys_to_lock().collect::>() + ); + + let key = LockKey::new_exclusive([ + "b".to_string(), + "c".to_string(), + "a".to_string(), + "c".to_string(), + ]); + assert_eq!( + vec![ + &StringKey::Exclusive("a".to_string()), + &StringKey::Exclusive("b".to_string()), + &StringKey::Exclusive("c".to_string()) + ], + key.keys_to_lock().collect::>() + ); + } + #[test] fn test_procedure_id() { let id = ProcedureId::random(); From 05cf253b880aaf03df7a01354f2d2b0d49c4b5aa Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 2 Jan 2024 10:03:52 +0000 Subject: [PATCH 08/14] feat: ensure the ref count before dropping the rwlock --- src/common/procedure/src/local/rwlock.rs | 53 +++++++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index c066c0b1fa98..77363e83c1c9 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -104,14 +104,21 @@ where } /// Clean up stale locks. + /// + /// Note: It only cleans a lock if + /// - Its strong ref count equals one. + /// - Able to acquire the write lock. pub fn clean_keys<'a>(&'a self, iter: impl IntoIterator) { let mut locks = self.inner.lock().unwrap(); - let mut keys = Vec::new(); for key in iter { if let Some(lock) = locks.get(key) { if lock.try_write().is_ok() { - keys.push(key); + debug_assert_eq!(Arc::weak_count(lock), 0); + // Ensures nobody keeps this ref. + if Arc::strong_count(lock) == 1 { + keys.push(key); + } } } } @@ -150,4 +157,46 @@ mod tests { lock_key.clean_keys(&vec!["test1", "test2"]); assert!(lock_key.is_empty()); } + + #[tokio::test] + async fn test_clean_keys() { + let lock_key = KeyRwLock::<&str>::new(); + { + let rwlock = { + lock_key + .inner + .lock() + .unwrap() + .entry("test") + .or_default() + .clone() + }; + assert_eq!(Arc::strong_count(&rwlock), 2); + } + + { + let inner = lock_key.inner.lock().unwrap(); + let rwlock = inner.get("test").unwrap(); + assert_eq!(Arc::strong_count(rwlock), 1); + } + + // Someone has the ref of the rwlock, but it waits to be granted the lock. + let rwlock = { + lock_key + .inner + .lock() + .unwrap() + .entry("test") + .or_default() + .clone() + }; + assert_eq!(Arc::strong_count(&rwlock), 2); + // However, One thread trying to remove the "test" key should have no effect. + lock_key.clean_keys(vec![&"test"]); + // Should get the rwlock. + { + let inner = lock_key.inner.lock().unwrap(); + inner.get("test").unwrap(); + } + } } From 04ae749cbf43094d12b8dee230759b11c271f6d7 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 2 Jan 2024 10:24:11 +0000 Subject: [PATCH 09/14] feat: add more tests for rwlock --- src/common/procedure/src/local/rwlock.rs | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index 77363e83c1c9..b4032ffb6efa 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -172,6 +172,33 @@ mod tests { .clone() }; assert_eq!(Arc::strong_count(&rwlock), 2); + let _guard = rwlock.read_owned().await; + + { + let inner = lock_key.inner.lock().unwrap(); + let rwlock = inner.get("test").unwrap(); + assert_eq!(Arc::strong_count(rwlock), 2); + } + } + + { + let rwlock = { + lock_key + .inner + .lock() + .unwrap() + .entry("test") + .or_default() + .clone() + }; + assert_eq!(Arc::strong_count(&rwlock), 2); + let _guard = rwlock.write_owned().await; + + { + let inner = lock_key.inner.lock().unwrap(); + let rwlock = inner.get("test").unwrap(); + assert_eq!(Arc::strong_count(rwlock), 2); + } } { From eb6fffc4735023ffb7328ffd1653afd0834160c1 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 03:49:39 +0000 Subject: [PATCH 10/14] feat: drop the key guards first --- src/common/procedure/src/local/runner.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 6302adfc649c..50abb1a6b05c 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -98,6 +98,14 @@ impl Drop for ProcedureGuard { if let Some(parent_id) = self.meta.parent_id { self.manager_ctx.notify_by_subprocedure(parent_id); } + + // Drops the key guards. + std::mem::take(&mut self.key_guards); + + // Clean the staled locks. + self.manager_ctx + .key_lock + .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string())); } } @@ -153,10 +161,6 @@ impl Runner { // Release locks and notify parent procedure. guard.finish(); - // Clean the staled locks. - self.manager_ctx - .key_lock - .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string())); // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { From 2bd4626ee3147515e3d61071a734390734c1bc92 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 04:08:29 +0000 Subject: [PATCH 11/14] feat: drops the key guards in the reverse order --- src/common/procedure/src/local/runner.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 50abb1a6b05c..d98c6e2f67cf 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -99,8 +99,11 @@ impl Drop for ProcedureGuard { self.manager_ctx.notify_by_subprocedure(parent_id); } - // Drops the key guards. - std::mem::take(&mut self.key_guards); + // Drops the key guards in the reverse order. + { + let mut key_guards = std::mem::take(&mut self.key_guards); + key_guards.reverse(); + } // Clean the staled locks. self.manager_ctx From 4057a8b84c0fceb35e06b132b255a9caa608908b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 06:43:45 +0000 Subject: [PATCH 12/14] chore: apply suggestions from CR --- src/common/procedure/src/local/runner.rs | 5 +- src/common/procedure/src/local/rwlock.rs | 73 +++++++++++++++--------- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index d98c6e2f67cf..87f2e2f635b1 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -100,9 +100,8 @@ impl Drop for ProcedureGuard { } // Drops the key guards in the reverse order. - { - let mut key_guards = std::mem::take(&mut self.key_guards); - key_guards.reverse(); + while !self.key_guards.is_empty() { + self.key_guards.pop(); } // Clean the staled locks. diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index b4032ffb6efa..32c4c5217c52 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -14,10 +14,13 @@ use std::collections::HashMap; use std::hash::Hash; +#[cfg(test)] use std::result::Result; use std::sync::{Arc, Mutex}; -use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, TryLockError}; +#[cfg(test)] +use tokio::sync::TryLockError; +use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; pub enum OwnedKeyRwLockGuard { Read(OwnedRwLockReadGuard<()>), @@ -40,7 +43,7 @@ impl From> for OwnedKeyRwLockGuard { #[derive(Debug)] pub struct KeyRwLock { /// The inner map of locks for specific keys. - inner: Arc>>>>, + inner: Mutex>>>, } impl KeyRwLock @@ -73,6 +76,37 @@ where lock.write_owned().await } + /// Clean up stale locks. + /// + /// Note: It only cleans a lock if + /// - Its strong ref count equals one. + /// - Able to acquire the write lock. + pub fn clean_keys<'a>(&'a self, iter: impl IntoIterator) { + let mut locks = self.inner.lock().unwrap(); + let mut keys = Vec::new(); + for key in iter { + if let Some(lock) = locks.get(key) { + if lock.try_write().is_ok() { + debug_assert_eq!(Arc::weak_count(lock), 0); + // Ensures nobody keeps this ref. + if Arc::strong_count(lock) == 1 { + keys.push(key); + } + } + } + } + + for key in keys { + locks.remove(key); + } + } +} + +#[cfg(test)] +impl KeyRwLock +where + K: Eq + Hash + Clone, +{ /// Tries to lock the key with shared read access, returning immediately. pub fn try_read(&self, key: K) -> Result, TryLockError> { let lock = { @@ -102,31 +136,6 @@ where pub fn is_empty(&self) -> bool { self.len() == 0 } - - /// Clean up stale locks. - /// - /// Note: It only cleans a lock if - /// - Its strong ref count equals one. - /// - Able to acquire the write lock. - pub fn clean_keys<'a>(&'a self, iter: impl IntoIterator) { - let mut locks = self.inner.lock().unwrap(); - let mut keys = Vec::new(); - for key in iter { - if let Some(lock) = locks.get(key) { - if lock.try_write().is_ok() { - debug_assert_eq!(Arc::weak_count(lock), 0); - // Ensures nobody keeps this ref. - if Arc::strong_count(lock) == 1 { - keys.push(key); - } - } - } - } - - for key in keys { - locks.remove(key); - } - } } #[cfg(test)] @@ -156,6 +165,16 @@ mod tests { lock_key.clean_keys(&vec!["test1", "test2"]); assert!(lock_key.is_empty()); + + let mut guards = Vec::new(); + for key in ["test1", "test2"] { + guards.push(lock_key.read(key).await); + } + while !guards.is_empty() { + guards.pop(); + } + lock_key.clean_keys(vec![&"test1", &"test2"]); + assert_eq!(lock_key.len(), 0); } #[tokio::test] From 74533b8e4723a4a8ce2fca1ec70ec0714b53ab66 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 07:33:00 +0000 Subject: [PATCH 13/14] chore: apply suggestions from CR --- src/common/procedure/src/local.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 4fc02ed011b7..30c0403f683b 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -13,8 +13,6 @@ // limitations under the License. mod runner; -// TODO(weny): Remove it. -#[allow(dead_code)] mod rwlock; use std::collections::{HashMap, VecDeque}; From cfc3fb1a95f4b786309aa61bb4a1e9f8de4ee6a2 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 07:53:50 +0000 Subject: [PATCH 14/14] chore: apply suggestions from CR --- src/common/procedure/src/local/rwlock.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs index 32c4c5217c52..a1701320364c 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/local/rwlock.rs @@ -14,12 +14,8 @@ use std::collections::HashMap; use std::hash::Hash; -#[cfg(test)] -use std::result::Result; use std::sync::{Arc, Mutex}; -#[cfg(test)] -use tokio::sync::TryLockError; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; pub enum OwnedKeyRwLockGuard { @@ -108,7 +104,7 @@ where K: Eq + Hash + Clone, { /// Tries to lock the key with shared read access, returning immediately. - pub fn try_read(&self, key: K) -> Result, TryLockError> { + pub fn try_read(&self, key: K) -> Result, tokio::sync::TryLockError> { let lock = { let mut locks = self.inner.lock().unwrap(); locks.entry(key).or_default().clone() @@ -118,7 +114,10 @@ where } /// Tries lock this key with exclusive write access, returning immediately. - pub fn try_write(&self, key: K) -> Result, TryLockError> { + pub fn try_write( + &self, + key: K, + ) -> Result, tokio::sync::TryLockError> { let lock = { let mut locks = self.inner.lock().unwrap(); locks.entry(key).or_default().clone()