diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index c0459f68cef4..d924b5b7f757 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod memory; +pub mod test; pub mod txn; use std::any::Any; diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 347c84a5876e..dd434fe85017 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -17,7 +17,6 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; use std::marker::PhantomData; -use std::ops::Range; use std::sync::RwLock; use async_trait::async_trait; @@ -85,21 +84,25 @@ impl KvBackend for MemoryKvBackend { } async fn range(&self, req: RangeRequest) -> Result { + let range = req.range(); let RangeRequest { - key, - range_end, - limit, - keys_only, + limit, keys_only, .. } = req; let kvs = self.kvs.read().unwrap(); + let values = kvs.range(range); - let iter: Box, &Vec)>> = if range_end.is_empty() { - Box::new(kvs.get_key_value(&key).into_iter()) - } else { - Box::new(kvs.range(key..range_end)) - }; - let mut kvs = iter + let mut more = false; + let mut iter: i64 = 0; + + let kvs = values + .take_while(|_| { + let take = limit == 0 || iter != limit; + iter += 1; + more = limit > 0 && iter > limit; + + take + }) .map(|(k, v)| { let key = k.clone(); let value = if keys_only { vec![] } else { v.clone() }; @@ -107,13 +110,6 @@ impl KvBackend for MemoryKvBackend { }) .collect::>(); - let more = if limit > 0 && kvs.len() > limit as usize { - kvs.truncate(limit as usize); - true - } else { - false - }; - Ok(RangeResponse { kvs, more }) } @@ -215,36 +211,32 @@ impl KvBackend for MemoryKvBackend { &self, req: DeleteRangeRequest, ) -> Result { - let DeleteRangeRequest { - key, - range_end, - prev_kv, - } = req; + let range = req.range(); + let DeleteRangeRequest { prev_kv, .. } = req; let mut kvs = self.kvs.write().unwrap(); - let prev_kvs = if range_end.is_empty() { - kvs.remove(&key) - .into_iter() - .map(|value| KeyValue { - key: key.clone(), - value, - }) - .collect::>() + let keys = kvs + .range(range) + .map(|(key, _)| key.clone()) + .collect::>(); + + let mut prev_kvs = if prev_kv { + Vec::with_capacity(keys.len()) } else { - let range = Range { - start: key, - end: range_end, - }; - kvs.extract_if(|key, _| range.contains(key)) - .map(Into::into) - .collect::>() + vec![] }; + let deleted = keys.len() as i64; - Ok(DeleteRangeResponse { - deleted: prev_kvs.len() as i64, - prev_kvs: if prev_kv { prev_kvs } else { vec![] }, - }) + for key in keys { + if let Some(value) = kvs.remove(&key) { + if prev_kv { + prev_kvs.push((key.clone(), value).into()) + } + } + } + + Ok(DeleteRangeResponse { deleted, prev_kvs }) } async fn batch_delete( @@ -358,254 +350,63 @@ impl TxnService for MemoryKvBackend { #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use super::*; use crate::error::Error; + use crate::kv_backend::test::{ + prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + }; use crate::kv_backend::KvBackend; - use crate::rpc::store::{BatchGetRequest, BatchPutRequest}; - use crate::rpc::KeyValue; - use crate::util; async fn mock_mem_store_with_data() -> MemoryKvBackend { let kv_store = MemoryKvBackend::::new(); - let kvs = mock_kvs(); - - assert!(kv_store - .batch_put(BatchPutRequest { - kvs, - ..Default::default() - }) - .await - .is_ok()); - - assert!(kv_store - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val11".to_vec(), - ..Default::default() - }) - .await - .is_ok()); + prepare_kv(&kv_store).await; kv_store } - fn mock_kvs() -> Vec { - vec![ - KeyValue { - key: b"key1".to_vec(), - value: b"val1".to_vec(), - }, - KeyValue { - key: b"key2".to_vec(), - value: b"val2".to_vec(), - }, - KeyValue { - key: b"key3".to_vec(), - value: b"val3".to_vec(), - }, - ] - } - #[tokio::test] async fn test_put() { let kv_store = mock_mem_store_with_data().await; - let resp = kv_store - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val12".to_vec(), - prev_kv: false, - }) - .await - .unwrap(); - assert!(resp.prev_kv.is_none()); - - let resp = kv_store - .put(PutRequest { - key: b"key11".to_vec(), - value: b"val13".to_vec(), - prev_kv: true, - }) - .await - .unwrap(); - let prev_kv = resp.prev_kv.unwrap(); - assert_eq!(b"key11", prev_kv.key()); - assert_eq!(b"val12", prev_kv.value()); + test_kv_put(kv_store).await; } #[tokio::test] async fn test_range() { let kv_store = mock_mem_store_with_data().await; - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); + test_kv_range(kv_store).await; + } - let resp = kv_store - .range(RangeRequest { - key: key.clone(), - range_end: range_end.clone(), - limit: 0, - keys_only: false, - }) - .await - .unwrap(); - - assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); - assert_eq!(b"val11", resp.kvs[1].value()); - - let resp = kv_store - .range(RangeRequest { - key: key.clone(), - range_end: range_end.clone(), - limit: 0, - keys_only: true, - }) - .await - .unwrap(); - - assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"", resp.kvs[0].value()); - assert_eq!(b"key11", resp.kvs[1].key()); - assert_eq!(b"", resp.kvs[1].value()); - - let resp = kv_store - .range(RangeRequest { - key: key.clone(), - limit: 0, - keys_only: false, - ..Default::default() - }) - .await - .unwrap(); - - assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); - - let resp = kv_store - .range(RangeRequest { - key, - range_end, - limit: 1, - keys_only: false, - }) - .await - .unwrap(); + #[tokio::test] + async fn test_range_2() { + let kv = MemoryKvBackend::::new(); - assert_eq!(1, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); + test_kv_range_2(kv).await; } #[tokio::test] async fn test_batch_get() { let kv_store = mock_mem_store_with_data().await; - let keys = vec![]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); - - assert!(resp.kvs.is_empty()); - - let keys = vec![b"key10".to_vec()]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); - - assert!(resp.kvs.is_empty()); - - let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); - - assert_eq!(2, resp.kvs.len()); - assert_eq!(b"key1", resp.kvs[0].key()); - assert_eq!(b"val1", resp.kvs[0].value()); - assert_eq!(b"key3", resp.kvs[1].key()); - assert_eq!(b"val3", resp.kvs[1].value()); + test_kv_batch_get(kv_store).await; } #[tokio::test(flavor = "multi_thread")] async fn test_compare_and_put() { let kv_store = Arc::new(MemoryKvBackend::::new()); - let success = Arc::new(AtomicU8::new(0)); - - let mut joins = vec![]; - for _ in 0..20 { - let kv_store_clone = kv_store.clone(); - let success_clone = success.clone(); - let join = tokio::spawn(async move { - let req = CompareAndPutRequest { - key: b"key".to_vec(), - expect: vec![], - value: b"val_new".to_vec(), - }; - let resp = kv_store_clone.compare_and_put(req).await.unwrap(); - if resp.success { - success_clone.fetch_add(1, Ordering::SeqCst); - } - }); - joins.push(join); - } - - for join in joins { - join.await.unwrap(); - } - assert_eq!(1, success.load(Ordering::SeqCst)); + test_kv_compare_and_put(kv_store).await; } #[tokio::test] async fn test_delete_range() { let kv_store = mock_mem_store_with_data().await; - let req = DeleteRangeRequest { - key: b"key3".to_vec(), - range_end: vec![], - prev_kv: true, - }; - - let resp = kv_store.delete_range(req).await.unwrap(); - assert_eq!(1, resp.prev_kvs.len()); - assert_eq!(b"key3", resp.prev_kvs[0].key()); - assert_eq!(b"val3", resp.prev_kvs[0].value()); - - let resp = kv_store.get(b"key3").await.unwrap(); - assert!(resp.is_none()); - - let req = DeleteRangeRequest { - key: b"key2".to_vec(), - range_end: vec![], - prev_kv: false, - }; - - let resp = kv_store.delete_range(req).await.unwrap(); - assert!(resp.prev_kvs.is_empty()); - - let resp = kv_store.get(b"key2").await.unwrap(); - assert!(resp.is_none()); - - let key = b"key1".to_vec(); - let range_end = util::get_prefix_end_key(b"key1"); - - let req = DeleteRangeRequest { - key: key.clone(), - range_end: range_end.clone(), - prev_kv: true, - }; - let resp = kv_store.delete_range(req).await.unwrap(); - assert_eq!(2, resp.prev_kvs.len()); - - let req = RangeRequest { - key, - range_end, - ..Default::default() - }; - let resp = kv_store.range(req).await.unwrap(); - assert!(resp.kvs.is_empty()); + test_kv_delete_range(kv_store).await; } #[tokio::test] @@ -636,35 +437,6 @@ mod tests { async fn test_batch_delete() { let kv_store = mock_mem_store_with_data().await; - assert!(kv_store.get(b"key1").await.unwrap().is_some()); - assert!(kv_store.get(b"key100").await.unwrap().is_none()); - - let req = BatchDeleteRequest { - keys: vec![b"key1".to_vec(), b"key100".to_vec()], - prev_kv: true, - }; - let resp = kv_store.batch_delete(req).await.unwrap(); - assert_eq!(1, resp.prev_kvs.len()); - assert_eq!( - vec![KeyValue { - key: b"key1".to_vec(), - value: b"val1".to_vec() - }], - resp.prev_kvs - ); - assert!(kv_store.get(b"key1").await.unwrap().is_none()); - - assert!(kv_store.get(b"key2").await.unwrap().is_some()); - assert!(kv_store.get(b"key3").await.unwrap().is_some()); - - let req = BatchDeleteRequest { - keys: vec![b"key2".to_vec(), b"key3".to_vec()], - prev_kv: false, - }; - let resp = kv_store.batch_delete(req).await.unwrap(); - assert!(resp.prev_kvs.is_empty()); - - assert!(kv_store.get(b"key2").await.unwrap().is_none()); - assert!(kv_store.get(b"key3").await.unwrap().is_none()); + test_kv_batch_delete(kv_store).await; } } diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs new file mode 100644 index 000000000000..4f8911910072 --- /dev/null +++ b/src/common/meta/src/kv_backend/test.rs @@ -0,0 +1,352 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; + +use super::{KvBackend, *}; +use crate::error::Error; +use crate::rpc::store::{BatchGetRequest, PutRequest}; +use crate::rpc::KeyValue; +use crate::util; + +pub fn mock_kvs() -> Vec { + vec![ + KeyValue { + key: b"key1".to_vec(), + value: b"val1".to_vec(), + }, + KeyValue { + key: b"key2".to_vec(), + value: b"val2".to_vec(), + }, + KeyValue { + key: b"key3".to_vec(), + value: b"val3".to_vec(), + }, + ] +} + +pub async fn prepare_kv(kv_store: &impl KvBackend) { + let kvs = mock_kvs(); + assert!(kv_store + .batch_put(BatchPutRequest { + kvs, + ..Default::default() + }) + .await + .is_ok()); + + assert!(kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val11".to_vec(), + ..Default::default() + }) + .await + .is_ok()); +} + +pub async fn test_kv_put(kv_store: impl KvBackend) { + let resp = kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val12".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + assert!(resp.prev_kv.is_none()); + + let resp = kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val13".to_vec(), + prev_kv: true, + }) + .await + .unwrap(); + let prev_kv = resp.prev_kv.unwrap(); + assert_eq!(b"key11", prev_kv.key()); + assert_eq!(b"val12", prev_kv.value()); +} + +pub async fn test_kv_range(kv_store: impl KvBackend) { + let key = b"key1".to_vec(); + let range_end = util::get_prefix_end_key(b"key1"); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit: 0, + keys_only: false, + }) + .await + .unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); + assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(b"val11", resp.kvs[1].value()); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit: 0, + keys_only: true, + }) + .await + .unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"", resp.kvs[0].value()); + assert_eq!(b"key11", resp.kvs[1].key()); + assert_eq!(b"", resp.kvs[1].value()); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + limit: 0, + keys_only: false, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(1, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); + + let resp = kv_store + .range(RangeRequest { + key, + range_end, + limit: 1, + keys_only: false, + }) + .await + .unwrap(); + + assert_eq!(1, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); +} + +pub async fn test_kv_range_2(kv_store: impl KvBackend) { + kv_store + .put(PutRequest::new().with_key("atest").with_value("value")) + .await + .unwrap(); + + kv_store + .put(PutRequest::new().with_key("test").with_value("value")) + .await + .unwrap(); + + // If both key and range_end are ‘\0’, then range represents all keys. + let result = kv_store + .range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec())) + .await + .unwrap(); + + assert_eq!(result.kvs.len(), 2); + assert!(!result.more); + + // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. + let result = kv_store + .range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec())) + .await + .unwrap(); + + assert_eq!(result.kvs.len(), 2); + + let result = kv_store + .range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec())) + .await + .unwrap(); + + assert_eq!(result.kvs.len(), 1); + assert_eq!(result.kvs[0].key, b"test"); + + // Fetches the keys >= "a", set limit to 1, the `more` should be true. + let result = kv_store + .range( + RangeRequest::new() + .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_limit(1), + ) + .await + .unwrap(); + assert_eq!(result.kvs.len(), 1); + assert!(result.more); + + // Fetches the keys >= "a", set limit to 2, the `more` should be false. + let result = kv_store + .range( + RangeRequest::new() + .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_limit(2), + ) + .await + .unwrap(); + assert_eq!(result.kvs.len(), 2); + assert!(!result.more); + + // Fetches the keys >= "a", set limit to 3, the `more` should be false. + let result = kv_store + .range( + RangeRequest::new() + .with_range(b"a".to_vec(), b"\0".to_vec()) + .with_limit(3), + ) + .await + .unwrap(); + assert_eq!(result.kvs.len(), 2); + assert!(!result.more); +} + +pub async fn test_kv_batch_get(kv_store: impl KvBackend) { + let keys = vec![]; + let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + + assert!(resp.kvs.is_empty()); + + let keys = vec![b"key10".to_vec()]; + let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + + assert!(resp.kvs.is_empty()); + + let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; + let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1", resp.kvs[0].key()); + assert_eq!(b"val1", resp.kvs[0].value()); + assert_eq!(b"key3", resp.kvs[1].key()); + assert_eq!(b"val3", resp.kvs[1].value()); +} + +pub async fn test_kv_compare_and_put(kv_store: Arc>) { + let success = Arc::new(AtomicU8::new(0)); + + let mut joins = vec![]; + for _ in 0..20 { + let kv_store_clone = kv_store.clone(); + let success_clone = success.clone(); + let join = tokio::spawn(async move { + let req = CompareAndPutRequest { + key: b"key".to_vec(), + expect: vec![], + value: b"val_new".to_vec(), + }; + let resp = kv_store_clone.compare_and_put(req).await.unwrap(); + if resp.success { + success_clone.fetch_add(1, Ordering::SeqCst); + } + }); + joins.push(join); + } + + for join in joins { + join.await.unwrap(); + } + + assert_eq!(1, success.load(Ordering::SeqCst)); +} + +pub async fn test_kv_delete_range(kv_store: impl KvBackend) { + let req = DeleteRangeRequest { + key: b"key3".to_vec(), + range_end: vec![], + prev_kv: true, + }; + + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(1, resp.prev_kvs.len()); + assert_eq!(1, resp.deleted); + assert_eq!(b"key3", resp.prev_kvs[0].key()); + assert_eq!(b"val3", resp.prev_kvs[0].value()); + + let resp = kv_store.get(b"key3").await.unwrap(); + assert!(resp.is_none()); + + let req = DeleteRangeRequest { + key: b"key2".to_vec(), + range_end: vec![], + prev_kv: false, + }; + + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(1, resp.deleted); + assert!(resp.prev_kvs.is_empty()); + + let resp = kv_store.get(b"key2").await.unwrap(); + assert!(resp.is_none()); + + let key = b"key1".to_vec(); + let range_end = util::get_prefix_end_key(b"key1"); + + let req = DeleteRangeRequest { + key: key.clone(), + range_end: range_end.clone(), + prev_kv: true, + }; + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(2, resp.prev_kvs.len()); + + let req = RangeRequest { + key, + range_end, + ..Default::default() + }; + let resp = kv_store.range(req).await.unwrap(); + assert!(resp.kvs.is_empty()); +} + +pub async fn test_kv_batch_delete(kv_store: impl KvBackend) { + assert!(kv_store.get(b"key1").await.unwrap().is_some()); + assert!(kv_store.get(b"key100").await.unwrap().is_none()); + + let req = BatchDeleteRequest { + keys: vec![b"key1".to_vec(), b"key100".to_vec()], + prev_kv: true, + }; + let resp = kv_store.batch_delete(req).await.unwrap(); + assert_eq!(1, resp.prev_kvs.len()); + assert_eq!( + vec![KeyValue { + key: b"key1".to_vec(), + value: b"val1".to_vec() + }], + resp.prev_kvs + ); + assert!(kv_store.get(b"key1").await.unwrap().is_none()); + + assert!(kv_store.get(b"key2").await.unwrap().is_some()); + assert!(kv_store.get(b"key3").await.unwrap().is_some()); + + let req = BatchDeleteRequest { + keys: vec![b"key2".to_vec(), b"key3".to_vec()], + prev_kv: false, + }; + let resp = kv_store.batch_delete(req).await.unwrap(); + assert!(resp.prev_kvs.is_empty()); + + assert!(kv_store.get(b"key2").await.unwrap().is_none()); + assert!(kv_store.get(b"key3").await.unwrap().is_none()); +} diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index 2426442e3f5d..b307894337f2 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::fmt::{Display, Formatter}; +use std::ops::Bound; use api::v1::meta::{ BatchDeleteRequest as PbBatchDeleteRequest, BatchDeleteResponse as PbBatchDeleteResponse, @@ -30,6 +31,17 @@ use crate::error; use crate::error::Result; use crate::rpc::{util, KeyValue}; +pub fn to_range(key: Vec, range_end: Vec) -> (Bound>, Bound>) { + match (&key[..], &range_end[..]) { + (_, []) => (Bound::Included(key.clone()), Bound::Included(key)), + // If both key and range_end are ‘\0’, then range represents all keys. + ([0], [0]) => (Bound::Unbounded, Bound::Unbounded), + // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. + (_, [0]) => (Bound::Included(key), Bound::Unbounded), + (_, _) => (Bound::Included(key), Bound::Excluded(range_end)), + } +} + #[derive(Debug, Clone, Default)] pub struct RangeRequest { /// key is the first key for the range, If range_end is not given, the @@ -96,6 +108,11 @@ impl RangeRequest { } } + /// Returns the `RangeBounds`. + pub fn range(&self) -> (Bound>, Bound>) { + to_range(self.key.clone(), self.range_end.clone()) + } + /// key is the first key for the range, If range_end is not given, the /// request only looks up key. #[inline] @@ -690,6 +707,11 @@ impl DeleteRangeRequest { } } + /// Returns the `RangeBounds`. + pub fn range(&self) -> (Bound>, Bound>) { + to_range(self.key.clone(), self.range_end.clone()) + } + /// key is the first key to delete in the range. If range_end is not given, /// the range is defined to contain only the key argument. #[inline] diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index bedf1fc64f65..7e9a275c7e4f 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -19,7 +19,7 @@ common-base = { workspace = true } common-config = { workspace = true } common-error = { workspace = true } common-macro = { workspace = true } -common-meta = { workspace = true } +common-meta = { workspace = true, features = ["testing"] } common-runtime = { workspace = true } common-telemetry = { workspace = true } futures-util.workspace = true diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 31a002a9529b..8b27b99b1464 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -15,6 +15,7 @@ //! [KvBackend] implementation based on [raft_engine::Engine]. use std::any::Any; +use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::RwLock; use common_error::ext::BoxedError; @@ -28,6 +29,7 @@ use common_meta::rpc::store::{ RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; +use common_meta::util::get_next_prefix_key; use raft_engine::{Config, Engine, LogBatch}; use snafu::ResultExt; @@ -137,29 +139,48 @@ impl KvBackend for RaftEngineBackend { async fn range(&self, req: RangeRequest) -> Result { let mut res = vec![]; + let (start, end) = req.range(); + let RangeRequest { + keys_only, limit, .. + } = req; + + let (start_key, end_key) = match (start, end) { + (Included(start), Included(end)) => (Some(start), Some(get_next_prefix_key(&end))), + (Unbounded, Unbounded) => (None, None), + (Included(start), Excluded(end)) => (Some(start), Some(end)), + (Included(start), Unbounded) => (Some(start), None), + _ => unreachable!(), + }; + let mut more = false; + let mut iter = 0; + self.engine .read() .unwrap() .scan_raw_messages( SYSTEM_NAMESPACE, - Some(&req.key), - Some(&req.range_end), + start_key.as_deref(), + end_key.as_deref(), false, |key, value| { - res.push(KeyValue { - key: key.to_vec(), - value: value.to_vec(), - }); - true + let take = limit == 0 || iter != limit; + iter += 1; + more = limit > 0 && iter > limit; + + if take { + res.push(KeyValue { + key: key.to_vec(), + value: if keys_only { vec![] } else { value.to_vec() }, + }); + } + + take }, ) .context(RaftEngineSnafu) .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - Ok(RangeResponse { - kvs: res, - more: false, - }) + Ok(RangeResponse { kvs: res, more }) } async fn put(&self, req: PutRequest) -> Result { @@ -275,7 +296,7 @@ impl KvBackend for RaftEngineBackend { key, range_end, limit: 0, - keys_only: true, + keys_only: false, }; let range_resp = self.range(range).await?; @@ -383,7 +404,12 @@ fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> { #[cfg(test)] mod tests { use std::collections::HashSet; + use std::sync::Arc; + use common_meta::kv_backend::test::{ + prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, + test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, + }; use common_test_util::temp_dir::create_temp_dir; use raft_engine::{Config, ReadableSize, RecoveryMode}; @@ -615,4 +641,66 @@ mod tests { keys ); } + + #[tokio::test] + async fn test_range() { + let dir = create_temp_dir("range"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_range(backend).await; + } + + #[tokio::test] + async fn test_range_2() { + let dir = create_temp_dir("range2"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + + test_kv_range_2(backend).await; + } + + #[tokio::test] + async fn test_put() { + let dir = create_temp_dir("put"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_put(backend).await; + } + + #[tokio::test] + async fn test_batch_get() { + let dir = create_temp_dir("batch_get"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_batch_get(backend).await; + } + + #[tokio::test] + async fn test_batch_delete() { + let dir = create_temp_dir("batch_delete"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_batch_delete(backend).await; + } + + #[tokio::test] + async fn test_delete_range() { + let dir = create_temp_dir("delete_range"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_delete_range(backend).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put_2() { + let dir = create_temp_dir("compare_and_put"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + prepare_kv(&backend).await; + + test_kv_compare_and_put(Arc::new(backend)).await; + } }