Skip to content

Commit

Permalink
feat: usr txn to impl cas
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed May 14, 2024
1 parent 00e21e2 commit e1731cc
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 184 deletions.
12 changes: 2 additions & 10 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,8 @@ mod tests {
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest,
RangeResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
Expand Down Expand Up @@ -519,13 +518,6 @@ mod tests {
unimplemented!()
}

async fn compare_and_put(
&self,
_req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
unimplemented!()
}

async fn delete_range(
&self,
_req: DeleteRangeRequest,
Expand Down
33 changes: 28 additions & 5 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::ext::ErrorExt;
pub use txn::TxnService;

use crate::error::Error;
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
Expand Down Expand Up @@ -52,11 +53,6 @@ where

async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;

async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error>;

async fn delete_range(
&self,
req: DeleteRangeRequest,
Expand All @@ -80,6 +76,33 @@ where
})
}

/// CAS: Compares the value at the key with the given value, and if they are
/// equal, puts the new value at the key.
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;
let txn = if expect.is_empty() {
Txn::put_if_not_exists(key, value)
} else {
Txn::compare_and_put(key, expect, value)
};
let txn_res = self.txn(txn).await?;

let success = txn_res.succeeded;
// The response is guaranteed to have at most one element.
let op_res = txn_res.responses.into_iter().next();
let prev_kv = match op_res {
Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
None => None,
};

Ok(CompareAndPutResponse { success, prev_kv })
}

/// Puts a value at a key. If `if_not_exists` is `true`, the operation
/// ensures the key does not exist before applying the PUT operation.
/// Otherwise, it simply applies the PUT operation without checking for
Expand Down
94 changes: 4 additions & 90 deletions src/common/meta/src/kv_backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use std::any::Any;
use std::sync::Arc;

use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
TxnResponse,
Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};

use super::KvBackendRef;
use crate::error::{self, Error, Result};
Expand All @@ -28,8 +27,8 @@ use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

Expand Down Expand Up @@ -202,53 +201,6 @@ impl KvBackend for EtcdStore {
Ok(BatchGetResponse { kvs })
}

async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let CompareAndPut {
key,
expect,
value,
put_options,
} = req.try_into()?;

let compare = if expect.is_empty() {
// create if absent
// revision 0 means key was not exist
Compare::create_revision(key.clone(), CompareOp::Equal, 0)
} else {
// compare and put
Compare::value(key.clone(), CompareOp::Equal, expect)
};
let put = TxnOp::put(key.clone(), value, put_options);
let get = TxnOp::get(key, None);
let txn = Txn::new()
.when(vec![compare])
.and_then(vec![put])
.or_else(vec![get]);

let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;

let success = txn_res.succeeded();
let op_res = txn_res
.op_responses()
.pop()
.context(error::InvalidTxnResultSnafu {
err_msg: "empty response",
})?;

let prev_kv = match op_res {
TxnOpResponse::Put(mut res) => res.take_prev_key().map(convert_key_value),
TxnOpResponse::Get(mut res) => res.take_kvs().into_iter().next().map(convert_key_value),
_ => unreachable!(),
};

Ok(CompareAndPutResponse { success, prev_kv })
}

async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let Delete { key, options } = req.try_into()?;

Expand Down Expand Up @@ -461,28 +413,6 @@ impl TryFrom<BatchDeleteRequest> for BatchDelete {
}
}

struct CompareAndPut {
key: Vec<u8>,
expect: Vec<u8>,
value: Vec<u8>,
put_options: Option<PutOptions>,
}

impl TryFrom<CompareAndPutRequest> for CompareAndPut {
type Error = Error;

fn try_from(req: CompareAndPutRequest) -> Result<Self> {
let CompareAndPutRequest { key, expect, value } = req;

Ok(CompareAndPut {
key,
expect,
value,
put_options: Some(PutOptions::default().with_prev_key()),
})
}
}

struct Delete {
key: Vec<u8>,
options: Option<DeleteOptions>,
Expand Down Expand Up @@ -597,22 +527,6 @@ mod tests {
let _ = batch_delete.options.unwrap();
}

#[test]
fn test_parse_compare_and_put() {
let req = CompareAndPutRequest {
key: b"test_key".to_vec(),
expect: b"test_expect".to_vec(),
value: b"test_value".to_vec(),
};

let compare_and_put: CompareAndPut = req.try_into().unwrap();

assert_eq!(b"test_key".to_vec(), compare_and_put.key);
assert_eq!(b"test_expect".to_vec(), compare_and_put.expect);
assert_eq!(b"test_value".to_vec(), compare_and_put.value);
let _ = compare_and_put.put_options.unwrap();
}

#[test]
fn test_parse_delete() {
let req = DeleteRangeRequest {
Expand Down
40 changes: 2 additions & 38 deletions src/common/meta/src/kv_backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::any::Any;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
Expand All @@ -29,8 +28,8 @@ use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

Expand Down Expand Up @@ -190,41 +189,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
Ok(BatchGetResponse { kvs })
}

async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;

let mut kvs = self.kvs.write().unwrap();

let existed = kvs.entry(key);
let (success, prev_kv) = match existed {
Entry::Vacant(e) => {
let expected = expect.is_empty();
if expected {
let _ = e.insert(value);
}
(expected, None)
}
Entry::Occupied(mut existed) => {
let expected = existed.get() == &expect;
let prev_kv = if expected {
let _ = existed.insert(value);
None
} else {
Some(KeyValue {
key: existed.key().clone(),
value: existed.get().clone(),
})
};
(expected, prev_kv)
}
};

Ok(CompareAndPutResponse { success, prev_kv })
}

async fn delete_range(
&self,
req: DeleteRangeRequest,
Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/kv_backend/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ impl Txn {
}

/// Builds a transaction that puts a value at a key if the key exists and the value
/// is equal to `old_value`.
pub fn compare_and_put(key: Vec<u8>, old_value: Vec<u8>, value: Vec<u8>) -> Self {
/// is equal to `expect`.
pub fn compare_and_put(key: Vec<u8>, expect: Vec<u8>, value: Vec<u8>) -> Self {
Self::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
old_value,
expect,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
Expand Down
41 changes: 3 additions & 38 deletions src/log-store/src/raft_engine/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnRes
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
Expand Down Expand Up @@ -277,42 +277,6 @@ impl KvBackend for RaftEngineBackend {
Ok(response)
}

async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;

let mut batch = LogBatch::with_capacity(1);
let engine = self.engine.write().unwrap();
let existing = engine_get(&engine, &key)?;
let eq = existing
.as_ref()
.map(|kv| kv.value == expect)
.unwrap_or_else(|| {
// if the associated value of key does not exist and expect is empty,
// then we still consider them as equal.
expect.is_empty()
});

if eq {
batch
.put(SYSTEM_NAMESPACE, key, value)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
}
Ok(CompareAndPutResponse {
success: eq,
prev_kv: existing,
})
}

async fn delete_range(
&self,
req: DeleteRangeRequest,
Expand Down Expand Up @@ -436,6 +400,7 @@ mod tests {
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
};
use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
use common_test_util::temp_dir::create_temp_dir;
use raft_engine::{Config, ReadableSize, RecoveryMode};

Expand Down

0 comments on commit e1731cc

Please sign in to comment.