Skip to content

Commit

Permalink
refactor: make txn easy to use (GreptimeTeam#3905)
Browse files Browse the repository at this point in the history
refactor: put_if_not_exists and compare_and_put API
  • Loading branch information
fengjiachun authored May 11, 2024
1 parent bca2e39 commit 9aa2182
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 74 deletions.
6 changes: 2 additions & 4 deletions src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue,
};
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::table_name::TableName;
Expand Down Expand Up @@ -181,7 +179,7 @@ impl FlowInfoManager {
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>>,
)> {
let key = FlowInfoKey::new(flow_id).to_bytes();
let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?);
let txn = Txn::put_if_not_exists(key.clone(), flow_value.try_as_raw_value()?);

Ok((
txn,
Expand Down
5 changes: 1 addition & 4 deletions src/common/meta/src/key/flow/flow_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ impl FlowNameManager {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
flow_flow_name_value.try_as_raw_value()?,
);
let txn = Txn::put_if_not_exists(raw_key.clone(), flow_flow_name_value.try_as_raw_value()?);

Ok((
txn,
Expand Down
11 changes: 3 additions & 8 deletions src/common/meta/src/key/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use table::table_reference::TableReference;
use super::TABLE_INFO_KEY_PATTERN;
use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, MetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX,
};
use crate::key::{DeserializedValueWithBytes, MetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
Expand Down Expand Up @@ -153,10 +151,7 @@ impl TableInfoManager {
let key = TableInfoKey::new(table_id);
let raw_key = key.to_bytes();

let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
table_info_value.try_as_raw_value()?,
);
let txn = Txn::put_if_not_exists(raw_key.clone(), table_info_value.try_as_raw_value()?);

Ok((
txn,
Expand All @@ -182,7 +177,7 @@ impl TableInfoManager {
let raw_value = current_table_info_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_table_info_value.try_as_raw_value()?;

let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);

Ok((
txn,
Expand Down
9 changes: 3 additions & 6 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::{
};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, MetaKey, RegionDistribution, TableMetaValue,
DeserializedValueWithBytes, MetaKey, RegionDistribution, TableMetaValue,
TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
};
use crate::kv_backend::txn::Txn;
Expand Down Expand Up @@ -492,10 +492,7 @@ impl TableRouteStorage {
let key = TableRouteKey::new(table_id);
let raw_key = key.to_bytes();

let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
table_route_value.try_as_raw_value()?,
);
let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);

Ok((
txn,
Expand All @@ -522,7 +519,7 @@ impl TableRouteStorage {
let raw_value = current_table_route_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;

let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);

Ok((
txn,
Expand Down
23 changes: 1 addition & 22 deletions src/common/meta/src/key/txn_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use serde::Serialize;

use crate::error::Result;
use crate::key::{DeserializedValueWithBytes, TableMetaValue};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::txn::TxnOpResponse;
use crate::rpc::KeyValue;

/// The response set of [TxnOpResponse::ResponseGet]
Expand Down Expand Up @@ -67,24 +67,3 @@ impl From<&mut Vec<TxnOpResponse>> for TxnOpGetResponseSet {
TxnOpGetResponseSet(value)
}
}

pub(crate) fn build_put_if_absent_txn(key: Vec<u8>, value: Vec<u8>) -> Txn {
Txn::new()
.when(vec![Compare::with_not_exist_value(
key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
}

pub(crate) fn build_compare_and_put_txn(key: Vec<u8>, old_value: Vec<u8>, value: Vec<u8>) -> Txn {
Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
old_value,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
}
4 changes: 1 addition & 3 deletions src/common/meta/src/kv_backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ impl<T: ErrorExt + Send + Sync> TxnService for MemoryKvBackend<T> {

let mut kvs = self.kvs.write().unwrap();

let succeeded = compare
.iter()
.all(|x| x.compare_with_value(kvs.get(&x.key)));
let succeeded = compare.iter().all(|x| x.compare_value(kvs.get(&x.key)));

let do_txn = |txn_op| match txn_op {
TxnOp::Put(key, value) => {
Expand Down
76 changes: 50 additions & 26 deletions src/common/meta/src/kv_backend/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ impl Compare {
Self::new(key, cmp, Some(target))
}

pub fn with_not_exist_value(key: Vec<u8>, cmp: CompareOp) -> Self {
pub fn with_value_not_exists(key: Vec<u8>, cmp: CompareOp) -> Self {
Self::new(key, cmp, None)
}

pub fn compare_with_value(&self, value: Option<&Vec<u8>>) -> bool {
pub fn compare_value(&self, value: Option<&Vec<u8>>) -> bool {
match (value, &self.target) {
(Some(value), Some(target)) => match self.cmp {
CompareOp::Equal => *value == *target,
Expand Down Expand Up @@ -158,6 +158,30 @@ impl Txn {
Txn::default()
}

/// Builds a transaction that puts a value at a key if the key does not exist.
pub fn put_if_not_exists(key: Vec<u8>, value: Vec<u8>) -> Self {
Self::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
}

/// Builds a transaction that puts a value at a key if the key exists and the value
/// is equal to `old_value`.
pub fn compare_and_put(key: Vec<u8>, old_value: Vec<u8>, value: Vec<u8>) -> Self {
Self::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
old_value,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
}

/// Takes a list of comparison. If all comparisons passed in succeed,
/// the operations passed into `and_then()` will be executed. Or the operations
/// passed into `or_else()` will be executed.
Expand Down Expand Up @@ -223,35 +247,35 @@ mod tests {
fn test_compare() {
// Equal
let compare = Compare::with_value(vec![1], CompareOp::Equal, vec![1]);
assert!(compare.compare_with_value(Some(&vec![1])));
assert!(!compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::Equal);
assert!(compare.compare_with_value(None));
assert!(compare.compare_value(Some(&vec![1])));
assert!(!compare.compare_value(None));
let compare = Compare::with_value_not_exists(vec![1], CompareOp::Equal);
assert!(compare.compare_value(None));

// Greater
let compare = Compare::with_value(vec![1], CompareOp::Greater, vec![1]);
assert!(compare.compare_with_value(Some(&vec![2])));
assert!(!compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::Greater);
assert!(!compare.compare_with_value(None));
assert!(compare.compare_with_value(Some(&vec![1])));
assert!(compare.compare_value(Some(&vec![2])));
assert!(!compare.compare_value(None));
let compare = Compare::with_value_not_exists(vec![1], CompareOp::Greater);
assert!(!compare.compare_value(None));
assert!(compare.compare_value(Some(&vec![1])));

// Less
let compare = Compare::with_value(vec![1], CompareOp::Less, vec![1]);
assert!(compare.compare_with_value(Some(&vec![0])));
assert!(compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::Less);
assert!(!compare.compare_with_value(None));
assert!(!compare.compare_with_value(Some(&vec![1])));
assert!(compare.compare_value(Some(&vec![0])));
assert!(compare.compare_value(None));
let compare = Compare::with_value_not_exists(vec![1], CompareOp::Less);
assert!(!compare.compare_value(None));
assert!(!compare.compare_value(Some(&vec![1])));

// NotEqual
let compare = Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]);
assert!(!compare.compare_with_value(Some(&vec![1])));
assert!(compare.compare_with_value(Some(&vec![2])));
assert!(compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::NotEqual);
assert!(!compare.compare_with_value(None));
assert!(compare.compare_with_value(Some(&vec![1])));
assert!(!compare.compare_value(Some(&vec![1])));
assert!(compare.compare_value(Some(&vec![2])));
assert!(compare.compare_value(None));
let compare = Compare::with_value_not_exists(vec![1], CompareOp::NotEqual);
assert!(!compare.compare_value(None));
assert!(compare.compare_value(Some(&vec![1])));
}

#[test]
Expand Down Expand Up @@ -348,7 +372,7 @@ mod tests {
kv_backend.delete(&key, false).await.unwrap();

let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Equal,
)])
Expand Down Expand Up @@ -379,7 +403,7 @@ mod tests {
kv_backend.delete(&key, false).await.unwrap();

let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Greater,
)])
Expand Down Expand Up @@ -421,7 +445,7 @@ mod tests {
kv_backend.delete(&[3], false).await.unwrap();

let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Less,
)])
Expand Down Expand Up @@ -463,7 +487,7 @@ mod tests {
kv_backend.delete(&key, false).await.unwrap();

let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::NotEqual,
)])
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/raft_engine/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl TxnService for RaftEngineBackend {
let engine = self.engine.write().unwrap();
for cmp in compare {
let existing_value = engine_get(&engine, &cmp.key)?.map(|kv| kv.value);
if !cmp.compare_with_value(existing_value.as_ref()) {
if !cmp.compare_value(existing_value.as_ref()) {
succeeded = false;
break;
}
Expand Down

0 comments on commit 9aa2182

Please sign in to comment.