diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index c0459f68cef4..d924b5b7f757 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod memory; +pub mod test; pub mod txn; use std::any::Any; diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index df297b2c302b..f07ac929fb93 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -92,7 +92,17 @@ impl KvBackend for MemoryKvBackend { let kvs = self.kvs.read().unwrap(); let iter = kvs.range(range); - let mut kvs = iter + let mut more = false; + let mut counter = 0usize; + + let kvs = iter + .take_while(|_| { + let take = counter != limit as usize || limit == 0; + counter += 1; + more = counter > limit as usize && limit != 0; + + take + }) .map(|(k, v)| { let key = k.clone(); let value = if keys_only { vec![] } else { v.clone() }; @@ -100,13 +110,6 @@ impl KvBackend for MemoryKvBackend { }) .collect::>(); - let more = if limit > 0 && kvs.len() > limit as usize { - kvs.truncate(limit as usize); - true - } else { - false - }; - Ok(RangeResponse { kvs, more }) } @@ -218,11 +221,17 @@ impl KvBackend for MemoryKvBackend { .map(|(key, _)| key.clone()) .collect::>(); - let mut prev_kvs = Vec::with_capacity(keys.len()); + let mut prev_kvs = if prev_kv { + Vec::with_capacity(keys.len()) + } else { + vec![] + }; for key in keys { if let Some(value) = kvs.remove(&key) { - prev_kvs.push((key.clone(), value).into()) + if prev_kv { + prev_kvs.push((key.clone(), value).into()) + } } } @@ -343,291 +352,63 @@ impl TxnService for MemoryKvBackend { #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use super::*; use crate::error::Error; + use crate::kv_backend::test::{ + prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + }; use crate::kv_backend::KvBackend; - use crate::rpc::store::{BatchGetRequest, BatchPutRequest}; - use crate::rpc::KeyValue; - use crate::util; async fn mock_mem_store_with_data() -> MemoryKvBackend { let kv_store = MemoryKvBackend::::new(); - let kvs = mock_kvs(); - - assert!(kv_store - .batch_put(BatchPutRequest { - kvs, - ..Default::default() - }) - .await - .is_ok()); - - assert!(kv_store - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val11".to_vec(), - ..Default::default() - }) - .await - .is_ok()); + prepare_kv(&kv_store).await; kv_store } - fn mock_kvs() -> Vec { - vec![ - KeyValue { - key: b"key1".to_vec(), - value: b"val1".to_vec(), - }, - KeyValue { - key: b"key2".to_vec(), - value: b"val2".to_vec(), - }, - KeyValue { - key: b"key3".to_vec(), - value: b"val3".to_vec(), - }, - ] - } - #[tokio::test] async fn test_put() { let kv_store = mock_mem_store_with_data().await; - let resp = kv_store - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val12".to_vec(), - prev_kv: false, - }) - .await - .unwrap(); - assert!(resp.prev_kv.is_none()); - - let resp = kv_store - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val13".to_vec(), - prev_kv: true, - }) - .await - .unwrap(); - let prev_kv = resp.prev_kv.unwrap(); - assert_eq!(b"key11", prev_kv.key()); - assert_eq!(b"val12", prev_kv.value()); + test_kv_put(kv_store).await; } #[tokio::test] async fn test_range() { let kv_store = mock_mem_store_with_data().await; - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); - - let resp = kv_store - .range(RangeRequest { - key: key.clone(), - range_end: range_end.clone(), - limit: 0, - keys_only: false, - }) - .await - .unwrap(); - - assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); - assert_eq!(b"val11", resp.kvs[1].value()); - - let resp = kv_store - .range(RangeRequest { - key: key.clone(), - range_end: range_end.clone(), - limit: 0, - keys_only: true, - }) - .await - .unwrap(); - - assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); - assert_eq!(b"", resp.kvs[1].value()); - - let resp = kv_store - .range(RangeRequest { - key: key.clone(), - limit: 0, - keys_only: false, - ..Default::default() - }) - .await - .unwrap(); - - assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); - - let resp = kv_store - .range(RangeRequest { - key, - range_end, - limit: 1, - keys_only: false, - }) - .await - .unwrap(); - - assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); + test_kv_range(kv_store).await; } #[tokio::test] async fn test_range_2() { let kv = MemoryKvBackend::::new(); - kv.put(PutRequest::new().with_key("atest").with_value("value")) - .await - .unwrap(); - - kv.put(PutRequest::new().with_key("test").with_value("value")) - .await - .unwrap(); - - // If both key and range_end are ‘\0’, then range represents all keys. - let result = kv - .range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec())) - .await - .unwrap(); - - assert_eq!(result.kvs.len(), 2); - - // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. - let result = kv - .range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec())) - .await - .unwrap(); - - assert_eq!(result.kvs.len(), 2); - - let result = kv - .range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec())) - .await - .unwrap(); - - assert_eq!(result.kvs.len(), 1); - assert_eq!(result.kvs[0].key, b"test"); + test_kv_range_2(kv).await; } #[tokio::test] async fn test_batch_get() { let kv_store = mock_mem_store_with_data().await; - let keys = vec![]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); - - assert!(resp.kvs.is_empty()); - - let keys = vec![b"key10".to_vec()]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); - - assert!(resp.kvs.is_empty()); - - let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); - - assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key3", resp.kvs[1].key()); - assert_eq!(b"val3", resp.kvs[1].value()); + test_kv_batch_get(kv_store).await; } #[tokio::test(flavor = "multi_thread")] async fn test_compare_and_put() { let kv_store = Arc::new(MemoryKvBackend::::new()); - let success = Arc::new(AtomicU8::new(0)); - - let mut joins = vec![]; - for _ in 0..20 { - let kv_store_clone = kv_store.clone(); - let success_clone = success.clone(); - let join = tokio::spawn(async move { - let req = CompareAndPutRequest { - key: b"key".to_vec(), - expect: vec![], - value: b"val_new".to_vec(), - }; - let resp = kv_store_clone.compare_and_put(req).await.unwrap(); - if resp.success { - success_clone.fetch_add(1, Ordering::SeqCst); - } - }); - joins.push(join); - } - for join in joins { - join.await.unwrap(); - } - - assert_eq!(1, success.load(Ordering::SeqCst)); + test_kv_compare_and_put(kv_store).await; } #[tokio::test] async fn test_delete_range() { let kv_store = mock_mem_store_with_data().await; - let req = DeleteRangeRequest { - key: b"key3".to_vec(), - range_end: vec![], - prev_kv: true, - }; - - let resp = kv_store.delete_range(req).await.unwrap(); - assert_eq!(1, resp.prev_kvs.len()); - assert_eq!(b"key3", resp.prev_kvs[0].key()); - assert_eq!(b"val3", resp.prev_kvs[0].value()); - - let resp = kv_store.get(b"key3").await.unwrap(); - assert!(resp.is_none()); - - let req = DeleteRangeRequest { - key: b"key2".to_vec(), - range_end: vec![], - prev_kv: false, - }; - - let resp = kv_store.delete_range(req).await.unwrap(); - assert!(resp.prev_kvs.is_empty()); - - let resp = kv_store.get(b"key2").await.unwrap(); - assert!(resp.is_none()); - - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); - - let req = DeleteRangeRequest { - key: key.clone(), - range_end: range_end.clone(), - prev_kv: true, - }; - let resp = kv_store.delete_range(req).await.unwrap(); - assert_eq!(2, resp.prev_kvs.len()); - - let req = RangeRequest { - key, - range_end, - ..Default::default() - }; - let resp = kv_store.range(req).await.unwrap(); - assert!(resp.kvs.is_empty()); + test_kv_delete_range(kv_store).await; } #[tokio::test] @@ -658,35 +439,6 @@ mod tests { async fn test_batch_delete() { let kv_store = mock_mem_store_with_data().await; - assert!(kv_store.get(b"key1").await.unwrap().is_some()); - assert!(kv_store.get(b"key100").await.unwrap().is_none()); - - let req = BatchDeleteRequest { - keys: vec![b"key1".to_vec(), b"key100".to_vec()], - prev_kv: true, - }; - let resp = kv_store.batch_delete(req).await.unwrap(); - assert_eq!(1, resp.prev_kvs.len()); - assert_eq!( - vec![KeyValue { - key: b"key1".to_vec(), - value: b"val1".to_vec() - }], - resp.prev_kvs - ); - assert!(kv_store.get(b"key1").await.unwrap().is_none()); - - assert!(kv_store.get(b"key2").await.unwrap().is_some()); - assert!(kv_store.get(b"key3").await.unwrap().is_some()); - - let req = BatchDeleteRequest { - keys: vec![b"key2".to_vec(), b"key3".to_vec()], - prev_kv: false, - }; - let resp = kv_store.batch_delete(req).await.unwrap(); - assert!(resp.prev_kvs.is_empty()); - - assert!(kv_store.get(b"key2").await.unwrap().is_none()); - assert!(kv_store.get(b"key3").await.unwrap().is_none()); + test_kv_batch_delete(kv_store).await; } } diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs new file mode 100644 index 000000000000..28f725431b7d --- /dev/null +++ b/src/common/meta/src/kv_backend/test.rs @@ -0,0 +1,324 @@ +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; + +use super::{KvBackend, *}; +use crate::error::Error; +use crate::rpc::store::{BatchGetRequest, PutRequest}; +use crate::rpc::KeyValue; +use crate::util; + +pub fn mock_kvs() -> Vec { + vec![ + KeyValue { + key: b"key1".to_vec(), + value: b"val1".to_vec(), + }, + KeyValue { + key: b"key2".to_vec(), + value: b"val2".to_vec(), + }, + KeyValue { + key: b"key3".to_vec(), + value: b"val3".to_vec(), + }, + ] +} + +pub async fn prepare_kv(kv_store: &impl KvBackend) { + let kvs = mock_kvs(); + assert!(kv_store + .batch_put(BatchPutRequest { + kvs, + ..Default::default() + }) + .await + .is_ok()); + + assert!(kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val11".to_vec(), + ..Default::default() + }) + .await + .is_ok()); +} + +pub async fn test_kv_put(kv_store: impl KvBackend) { + let resp = kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val12".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + assert!(resp.prev_kv.is_none()); + + let resp = kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val13".to_vec(), + prev_kv: true, + }) + .await + .unwrap(); + let prev_kv = resp.prev_kv.unwrap(); + assert_eq!(b"key11", prev_kv.key()); + assert_eq!(b"val12", prev_kv.value()); +} + +pub async fn test_kv_range(kv_store: impl KvBackend) { + let key = b"key1".to_vec(); + let range_end = util::get_prefix_end_key(b"key1"); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit: 0, + keys_only: false, + }) + .await + .unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); + assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(b"val11", resp.kvs[1].value()); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit: 0, + keys_only: true, + }) + .await + .unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"", resp.kvs[0].value()); + assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(b"", resp.kvs[1].value()); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + limit: 0, + keys_only: false, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(1, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); + + let resp = kv_store + .range(RangeRequest { + key, + range_end, + limit: 1, + keys_only: false, + }) + .await + .unwrap(); + + assert_eq!(1, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); +} + +pub async fn test_kv_range_2(kv_store: impl KvBackend) { + kv_store + .put(PutRequest::new().with_key("atest").with_value("value")) + .await + .unwrap(); + + kv_store + .put(PutRequest::new().with_key("test").with_value("value")) + .await + .unwrap(); + + // If both key and range_end are ‘\0’, then range represents all keys. + let result = kv_store + .range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec())) + .await + .unwrap(); + + assert_eq!(result.kvs.len(), 2); + assert!(!result.more); + + // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. + let result = kv_store + .range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec())) + .await + .unwrap(); + + assert_eq!(result.kvs.len(), 2); + + let result = kv_store + .range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec())) + .await + .unwrap(); + + assert_eq!(result.kvs.len(), 1); + assert_eq!(result.kvs[0].key, b"test"); + + // Fetches the keys >= "a", set limit to 1, the `more` should be true. + let result = kv_store + .range( + RangeRequest::new() + .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_limit(1), + ) + .await + .unwrap(); + assert_eq!(result.kvs.len(), 1); + assert!(result.more); + + // Fetches the keys >= "a", set limit to 3, the `more` should be false. + let result = kv_store + .range( + RangeRequest::new() + .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_limit(3), + ) + .await + .unwrap(); + assert_eq!(result.kvs.len(), 2); + assert!(!result.more); +} + +pub async fn test_kv_batch_get(kv_store: impl KvBackend) { + let keys = vec![]; + let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + + assert!(resp.kvs.is_empty()); + + let keys = vec![b"key10".to_vec()]; + let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + + assert!(resp.kvs.is_empty()); + + let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; + let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); + assert_eq!(b"key3", resp.kvs[1].key()); + assert_eq!(b"val3", resp.kvs[1].value()); +} + +pub async fn test_kv_compare_and_put(kv_store: Arc>) { + let success = Arc::new(AtomicU8::new(0)); + + let mut joins = vec![]; + for _ in 0..20 { + let kv_store_clone = kv_store.clone(); + let success_clone = success.clone(); + let join = tokio::spawn(async move { + let req = CompareAndPutRequest { + key: b"key".to_vec(), + expect: vec![], + value: b"val_new".to_vec(), + }; + let resp = kv_store_clone.compare_and_put(req).await.unwrap(); + if resp.success { + success_clone.fetch_add(1, Ordering::SeqCst); + } + }); + joins.push(join); + } + + for join in joins { + join.await.unwrap(); + } + + assert_eq!(1, success.load(Ordering::SeqCst)); +} + +pub async fn test_kv_delete_range(kv_store: impl KvBackend) { + let req = DeleteRangeRequest { + key: b"key3".to_vec(), + range_end: vec![], + prev_kv: true, + }; + + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(1, resp.prev_kvs.len()); + assert_eq!(b"key3", resp.prev_kvs[0].key()); + assert_eq!(b"val3", resp.prev_kvs[0].value()); + + let resp = kv_store.get(b"key3").await.unwrap(); + assert!(resp.is_none()); + + let req = DeleteRangeRequest { + key: b"key2".to_vec(), + range_end: vec![], + prev_kv: false, + }; + + let resp = kv_store.delete_range(req).await.unwrap(); + assert!(resp.prev_kvs.is_empty()); + + let resp = kv_store.get(b"key2").await.unwrap(); + assert!(resp.is_none()); + + let key = b"key1".to_vec(); + let range_end = util::get_prefix_end_key(b"key1"); + + let req = DeleteRangeRequest { + key: key.clone(), + range_end: range_end.clone(), + prev_kv: true, + }; + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(2, resp.prev_kvs.len()); + + let req = RangeRequest { + key, + range_end, + ..Default::default() + }; + let resp = kv_store.range(req).await.unwrap(); + assert!(resp.kvs.is_empty()); +} + +pub async fn test_kv_batch_delete(kv_store: impl KvBackend) { + assert!(kv_store.get(b"key1").await.unwrap().is_some()); + assert!(kv_store.get(b"key100").await.unwrap().is_none()); + + let req = BatchDeleteRequest { + keys: vec![b"key1".to_vec(), b"key100".to_vec()], + prev_kv: true, + }; + let resp = kv_store.batch_delete(req).await.unwrap(); + assert_eq!(1, resp.prev_kvs.len()); + assert_eq!( + vec![KeyValue { + key: b"key1".to_vec(), + value: b"val1".to_vec() + }], + resp.prev_kvs + ); + assert!(kv_store.get(b"key1").await.unwrap().is_none()); + + assert!(kv_store.get(b"key2").await.unwrap().is_some()); + assert!(kv_store.get(b"key3").await.unwrap().is_some()); + + let req = BatchDeleteRequest { + keys: vec![b"key2".to_vec(), b"key3".to_vec()], + prev_kv: false, + }; + let resp = kv_store.batch_delete(req).await.unwrap(); + assert!(resp.prev_kvs.is_empty()); + + assert!(kv_store.get(b"key2").await.unwrap().is_none()); + assert!(kv_store.get(b"key3").await.unwrap().is_none()); +} diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index e76bd37a2e1e..2ff076b87089 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -33,7 +33,7 @@ use crate::rpc::{util, KeyValue}; pub fn to_range(key: Vec, range_end: Vec) -> (Bound>, Bound>) { if range_end.is_empty() { - (Bound::Included(key.clone()), Bound::Included(key)) + (Bound::Included(key.clone()), Bound::Included(key.clone())) } else if range_end.len() == 1 && range_end[0] == 0 { // If both key and range_end are ‘\0’, then range represents all keys. if key.len() == 1 && key[0] == 0 { diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index bedf1fc64f65..7e9a275c7e4f 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -19,7 +19,7 @@ common-base = { workspace = true } common-config = { workspace = true } common-error = { workspace = true } common-macro = { workspace = true } -common-meta = { workspace = true } +common-meta = { workspace = true, features = ["testing"] } common-runtime = { workspace = true } common-telemetry = { workspace = true } futures-util.workspace = true diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 73b13836c85f..187bce847b42 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -15,6 +15,7 @@ //! [KvBackend] implementation based on [raft_engine::Engine]. use std::any::Any; +use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::RwLock; use common_error::ext::BoxedError; @@ -28,6 +29,7 @@ use common_meta::rpc::store::{ RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; +use common_meta::util::get_next_prefix_key; use raft_engine::{Config, Engine, LogBatch}; use snafu::ResultExt; @@ -138,18 +140,18 @@ impl KvBackend for RaftEngineBackend { async fn range(&self, req: RangeRequest) -> Result { let mut res = vec![]; let (start, end) = req.range(); - let RangeRequest { limit, .. } = req; - - let start_key = match start { - std::ops::Bound::Included(key) => Some(key), - std::ops::Bound::Excluded(_) => unreachable!(), - std::ops::Bound::Unbounded => None, - }; + let RangeRequest { + keys_only, limit, .. + } = req; - let end_key = match end { - std::ops::Bound::Included(_) => unreachable!(), - std::ops::Bound::Excluded(key) => Some(key), - std::ops::Bound::Unbounded => None, + let (start_key, end_key) = match (start, end) { + (Included(start), Included(_)) => { + (Some(start.clone()), Some(get_next_prefix_key(&start))) + } + (Unbounded, Unbounded) => (None, None), + (Included(start), Excluded(end)) => (Some(start), Some(end)), + (Included(start), Unbounded) => (Some(start), None), + _ => unreachable!(), }; let mut more = false; @@ -164,7 +166,7 @@ impl KvBackend for RaftEngineBackend { |key, value| { res.push(KeyValue { key: key.to_vec(), - value: value.to_vec(), + value: if keys_only { vec![] } else { value.to_vec() }, }); if limit > 0 && limit as usize == res.len() { more = true; @@ -294,7 +296,7 @@ impl KvBackend for RaftEngineBackend { key, range_end, limit: 0, - keys_only: true, + keys_only: false, }; let range_resp = self.range(range).await?; @@ -402,7 +404,12 @@ fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> { #[cfg(test)] mod tests { use std::collections::HashSet; + use std::sync::Arc; + use common_meta::kv_backend::test::{ + prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + }; use common_test_util::temp_dir::create_temp_dir; use raft_engine::{Config, ReadableSize, RecoveryMode}; @@ -634,4 +641,66 @@ mod tests { keys ); } + + #[tokio::test] + async fn test_range() { + let dir = create_temp_dir("range"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_range(backend).await; + } + + #[tokio::test] + async fn test_range_2() { + let dir = create_temp_dir("range2"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + + test_kv_range_2(backend).await; + } + + #[tokio::test] + async fn test_put() { + let dir = create_temp_dir("put"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_put(backend).await; + } + + #[tokio::test] + async fn test_batch_get() { + let dir = create_temp_dir("batch_get"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_batch_get(backend).await; + } + + #[tokio::test] + async fn test_batch_delete() { + let dir = create_temp_dir("batch_delete"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_batch_delete(backend).await; + } + + #[tokio::test] + async fn test_delete_range() { + let dir = create_temp_dir("delete_range"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_delete_range(backend).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put_2() { + let dir = create_temp_dir("compare_and_put"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_compare_and_put(Arc::new(backend)).await; + } }