From 9aa2182cb21a751e97ac0abe38bbe64d5a2d9ac2 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Sat, 11 May 2024 12:45:04 +0800 Subject: [PATCH] refactor: make txn easy to use (#3905) refactor: put_if_not_exists and compare_and_put API --- src/common/meta/src/key/flow/flow_info.rs | 6 +- src/common/meta/src/key/flow/flow_name.rs | 5 +- src/common/meta/src/key/table_info.rs | 11 +--- src/common/meta/src/key/table_route.rs | 9 +-- src/common/meta/src/key/txn_helper.rs | 23 +------ src/common/meta/src/kv_backend/memory.rs | 4 +- src/common/meta/src/kv_backend/txn.rs | 76 +++++++++++++++-------- src/log-store/src/raft_engine/backend.rs | 2 +- 8 files changed, 62 insertions(+), 74 deletions(-) diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index ffa0cbad273c..0a2be4dea1a2 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -24,9 +24,7 @@ use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue, -}; +use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::table_name::TableName; @@ -181,7 +179,7 @@ impl FlowInfoManager { ) -> Result>>, )> { let key = FlowInfoKey::new(flow_id).to_bytes(); - let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?); + let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?); Ok(( txn, diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 311c187dd61e..271a24f641f6 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -191,10 +191,7 @@ impl FlowNameManager { let key = FlowNameKey::new(catalog_name, flow_name); let raw_key = key.to_bytes(); let flow_flow_name_value = FlowNameValue::new(flow_id); - let txn = txn_helper::build_put_if_absent_txn( - raw_key.clone(), - flow_flow_name_value.try_as_raw_value()?, - ); + let txn = Txn::put_if_not_exists(raw_key.clone(), flow_flow_name_value.try_as_raw_value()?); Ok(( txn, diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 3a9847e1809b..730daa1c3ede 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -24,9 +24,7 @@ use table::table_reference::TableReference; use super::TABLE_INFO_KEY_PATTERN; use crate::error::{InvalidTableMetadataSnafu, Result}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{ - txn_helper, DeserializedValueWithBytes, MetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX, -}; +use crate::key::{DeserializedValueWithBytes, MetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::rpc::store::BatchGetRequest; @@ -153,10 +151,7 @@ impl TableInfoManager { let key = TableInfoKey::new(table_id); let raw_key = key.to_bytes(); - let txn = txn_helper::build_put_if_absent_txn( - raw_key.clone(), - table_info_value.try_as_raw_value()?, - ); + let txn = Txn::put_if_not_exists(raw_key.clone(), table_info_value.try_as_raw_value()?); Ok(( txn, @@ -182,7 +177,7 @@ impl TableInfoManager { let raw_value = current_table_info_value.get_raw_bytes(); let new_raw_value: Vec = new_table_info_value.try_as_raw_value()?; - let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value); + let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value); Ok(( txn, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 1b8c30ec11d1..47b79ea60b56 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -27,7 +27,7 @@ use crate::error::{ }; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, MetaKey, RegionDistribution, TableMetaValue, + DeserializedValueWithBytes, MetaKey, RegionDistribution, TableMetaValue, TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX, }; use crate::kv_backend::txn::Txn; @@ -492,10 +492,7 @@ impl TableRouteStorage { let key = TableRouteKey::new(table_id); let raw_key = key.to_bytes(); - let txn = txn_helper::build_put_if_absent_txn( - raw_key.clone(), - table_route_value.try_as_raw_value()?, - ); + let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?); Ok(( txn, @@ -522,7 +519,7 @@ impl TableRouteStorage { let raw_value = current_table_route_value.get_raw_bytes(); let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; - let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value); + let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value); Ok(( txn, diff --git a/src/common/meta/src/key/txn_helper.rs b/src/common/meta/src/key/txn_helper.rs index f4571ba5287b..54cd13f457b3 100644 --- a/src/common/meta/src/key/txn_helper.rs +++ b/src/common/meta/src/key/txn_helper.rs @@ -17,7 +17,7 @@ use serde::Serialize; use crate::error::Result; use crate::key::{DeserializedValueWithBytes, TableMetaValue}; -use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; +use crate::kv_backend::txn::TxnOpResponse; use crate::rpc::KeyValue; /// The response set of [TxnOpResponse::ResponseGet] @@ -67,24 +67,3 @@ impl From<&mut Vec> for TxnOpGetResponseSet { TxnOpGetResponseSet(value) } } - -pub(crate) fn build_put_if_absent_txn(key: Vec, value: Vec) -> Txn { - Txn::new() - .when(vec![Compare::with_not_exist_value( - key.clone(), - CompareOp::Equal, - )]) - .and_then(vec![TxnOp::Put(key.clone(), value)]) - .or_else(vec![TxnOp::Get(key)]) -} - -pub(crate) fn build_compare_and_put_txn(key: Vec, old_value: Vec, value: Vec) -> Txn { - Txn::new() - .when(vec![Compare::with_value( - key.clone(), - CompareOp::Equal, - old_value, - )]) - .and_then(vec![TxnOp::Put(key.clone(), value)]) - .or_else(vec![TxnOp::Get(key)]) -} diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 5c269a06a5ec..2310bd06538f 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -300,9 +300,7 @@ impl TxnService for MemoryKvBackend { let mut kvs = self.kvs.write().unwrap(); - let succeeded = compare - .iter() - .all(|x| x.compare_with_value(kvs.get(&x.key))); + let succeeded = compare.iter().all(|x| x.compare_value(kvs.get(&x.key))); let do_txn = |txn_op| match txn_op { TxnOp::Put(key, value) => { diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index c170b93fbe5a..c0d7e2e2f851 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -59,11 +59,11 @@ impl Compare { Self::new(key, cmp, Some(target)) } - pub fn with_not_exist_value(key: Vec, cmp: CompareOp) -> Self { + pub fn with_value_not_exists(key: Vec, cmp: CompareOp) -> Self { Self::new(key, cmp, None) } - pub fn compare_with_value(&self, value: Option<&Vec>) -> bool { + pub fn compare_value(&self, value: Option<&Vec>) -> bool { match (value, &self.target) { (Some(value), Some(target)) => match self.cmp { CompareOp::Equal => *value == *target, @@ -158,6 +158,30 @@ impl Txn { Txn::default() } + /// Builds a transaction that puts a value at a key if the key does not exist. + pub fn put_if_not_exists(key: Vec, value: Vec) -> Self { + Self::new() + .when(vec![Compare::with_value_not_exists( + key.clone(), + CompareOp::Equal, + )]) + .and_then(vec![TxnOp::Put(key.clone(), value)]) + .or_else(vec![TxnOp::Get(key)]) + } + + /// Builds a transaction that puts a value at a key if the key exists and the value + /// is equal to `old_value`. + pub fn compare_and_put(key: Vec, old_value: Vec, value: Vec) -> Self { + Self::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + old_value, + )]) + .and_then(vec![TxnOp::Put(key.clone(), value)]) + .or_else(vec![TxnOp::Get(key)]) + } + /// Takes a list of comparison. If all comparisons passed in succeed, /// the operations passed into `and_then()` will be executed. Or the operations /// passed into `or_else()` will be executed. @@ -223,35 +247,35 @@ mod tests { fn test_compare() { // Equal let compare = Compare::with_value(vec![1], CompareOp::Equal, vec![1]); - assert!(compare.compare_with_value(Some(&vec![1]))); - assert!(!compare.compare_with_value(None)); - let compare = Compare::with_not_exist_value(vec![1], CompareOp::Equal); - assert!(compare.compare_with_value(None)); + assert!(compare.compare_value(Some(&vec![1]))); + assert!(!compare.compare_value(None)); + let compare = Compare::with_value_not_exists(vec![1], CompareOp::Equal); + assert!(compare.compare_value(None)); // Greater let compare = Compare::with_value(vec![1], CompareOp::Greater, vec![1]); - assert!(compare.compare_with_value(Some(&vec![2]))); - assert!(!compare.compare_with_value(None)); - let compare = Compare::with_not_exist_value(vec![1], CompareOp::Greater); - assert!(!compare.compare_with_value(None)); - assert!(compare.compare_with_value(Some(&vec![1]))); + assert!(compare.compare_value(Some(&vec![2]))); + assert!(!compare.compare_value(None)); + let compare = Compare::with_value_not_exists(vec![1], CompareOp::Greater); + assert!(!compare.compare_value(None)); + assert!(compare.compare_value(Some(&vec![1]))); // Less let compare = Compare::with_value(vec![1], CompareOp::Less, vec![1]); - assert!(compare.compare_with_value(Some(&vec![0]))); - assert!(compare.compare_with_value(None)); - let compare = Compare::with_not_exist_value(vec![1], CompareOp::Less); - assert!(!compare.compare_with_value(None)); - assert!(!compare.compare_with_value(Some(&vec![1]))); + assert!(compare.compare_value(Some(&vec![0]))); + assert!(compare.compare_value(None)); + let compare = Compare::with_value_not_exists(vec![1], CompareOp::Less); + assert!(!compare.compare_value(None)); + assert!(!compare.compare_value(Some(&vec![1]))); // NotEqual let compare = Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]); - assert!(!compare.compare_with_value(Some(&vec![1]))); - assert!(compare.compare_with_value(Some(&vec![2]))); - assert!(compare.compare_with_value(None)); - let compare = Compare::with_not_exist_value(vec![1], CompareOp::NotEqual); - assert!(!compare.compare_with_value(None)); - assert!(compare.compare_with_value(Some(&vec![1]))); + assert!(!compare.compare_value(Some(&vec![1]))); + assert!(compare.compare_value(Some(&vec![2]))); + assert!(compare.compare_value(None)); + let compare = Compare::with_value_not_exists(vec![1], CompareOp::NotEqual); + assert!(!compare.compare_value(None)); + assert!(compare.compare_value(Some(&vec![1]))); } #[test] @@ -348,7 +372,7 @@ mod tests { kv_backend.delete(&key, false).await.unwrap(); let txn = Txn::new() - .when(vec![Compare::with_not_exist_value( + .when(vec![Compare::with_value_not_exists( key.clone(), CompareOp::Equal, )]) @@ -379,7 +403,7 @@ mod tests { kv_backend.delete(&key, false).await.unwrap(); let txn = Txn::new() - .when(vec![Compare::with_not_exist_value( + .when(vec![Compare::with_value_not_exists( key.clone(), CompareOp::Greater, )]) @@ -421,7 +445,7 @@ mod tests { kv_backend.delete(&[3], false).await.unwrap(); let txn = Txn::new() - .when(vec![Compare::with_not_exist_value( + .when(vec![Compare::with_value_not_exists( key.clone(), CompareOp::Less, )]) @@ -463,7 +487,7 @@ mod tests { kv_backend.delete(&key, false).await.unwrap(); let txn = Txn::new() - .when(vec![Compare::with_not_exist_value( + .when(vec![Compare::with_value_not_exists( key.clone(), CompareOp::NotEqual, )]) diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index b0abc76095f4..a811d0910907 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -93,7 +93,7 @@ impl TxnService for RaftEngineBackend { let engine = self.engine.write().unwrap(); for cmp in compare { let existing_value = engine_get(&engine, &cmp.key)?.map(|kv| kv.value); - if !cmp.compare_with_value(existing_value.as_ref()) { + if !cmp.compare_value(existing_value.as_ref()) { succeeded = false; break; }