From 9cc016f7e807edee08cfcdac8bc97699ef097667 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 1 Sep 2023 12:39:55 +0800 Subject: [PATCH] feat: impl kv backend for raft engine (#2280) * feat: kv backend on raft-engine * feat: raft-engine kvbackend * fix: toml * fix: some review comments * chore: optimize delete * fix: lift lock in batch_delete --- Cargo.lock | 1 + src/common/meta/src/kv_backend/memory.rs | 20 +- src/log-store/Cargo.toml | 1 + src/log-store/src/lib.rs | 2 + src/log-store/src/raft_engine.rs | 1 + src/log-store/src/raft_engine/backend.rs | 574 +++++++++++++++++++++ src/log-store/src/raft_engine/log_store.rs | 2 +- 7 files changed, 587 insertions(+), 14 deletions(-) create mode 100644 src/log-store/src/raft_engine/backend.rs diff --git a/Cargo.lock b/Cargo.lock index e5a5b385a8f7..49a7676dd0e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4900,6 +4900,7 @@ dependencies = [ "bytes", "common-base", "common-error", + "common-meta", "common-runtime", "common-telemetry", "common-test-util", diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 02560add57bf..347c84a5876e 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -318,9 +318,8 @@ impl TxnService for MemoryKvBackend { let do_txn = |txn_op| match txn_op { TxnOp::Put(key, value) => { - let prev_value = kvs.insert(key.clone(), value); - let prev_kv = prev_value.map(|value| KeyValue { key, value }); - TxnOpResponse::ResponsePut(PutResponse { prev_kv }) + kvs.insert(key.clone(), value); + TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }) } TxnOp::Get(key) => { @@ -337,16 +336,11 @@ impl TxnService for MemoryKvBackend { TxnOp::Delete(key) => { let prev_value = kvs.remove(&key); - let deleted = prev_value.as_ref().map(|x| x.len()).unwrap_or(0) as i64; - - let prev_kvs = prev_value - .into_iter() - .map(|value| KeyValue { - key: key.clone(), - value, - }) - .collect(); - TxnOpResponse::ResponseDelete(DeleteRangeResponse { deleted, prev_kvs }) + let deleted = if prev_value.is_some() { 1 } else { 0 }; + TxnOpResponse::ResponseDelete(DeleteRangeResponse { + deleted, + prev_kvs: vec![], + }) } }; diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index f4825db2fde8..56e4dfd4d73b 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -17,6 +17,7 @@ byteorder = "1.4" bytes = "1.1" common-base = { workspace = true } common-error = { workspace = true } +common-meta = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } futures-util.workspace = true diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index af7e47ee4c93..7f2bbfc3c555 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(let_chains)] + mod config; pub mod error; mod noop; diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs index b053c491baf1..24bb26cfc688 100644 --- a/src/log-store/src/raft_engine.rs +++ b/src/log-store/src/raft_engine.rs @@ -20,6 +20,7 @@ use store_api::logstore::namespace::Namespace; use crate::error::Error; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; +mod backend; pub mod log_store; pub mod protos { diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs new file mode 100644 index 000000000000..090f9fc55f3c --- /dev/null +++ b/src/log-store/src/raft_engine/backend.rs @@ -0,0 +1,574 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! [KvBackend] implementation based on [raft_engine::Engine]. + +use std::any::Any; +use std::sync::RwLock; + +use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse}; +use common_meta::kv_backend::{KvBackend, TxnService}; +use common_meta::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, +}; +use common_meta::rpc::KeyValue; +use raft_engine::{Engine, LogBatch}; +use snafu::ResultExt; + +use crate::error; +use crate::error::RaftEngineSnafu; + +pub(crate) const SYSTEM_NAMESPACE: u64 = 0; + +/// RaftEngine based [KvBackend] implementation. +pub struct RaftEngineBackend { + engine: RwLock, +} + +#[async_trait::async_trait] +impl TxnService for RaftEngineBackend { + type Error = error::Error; + + async fn txn(&self, txn: Txn) -> Result { + let TxnRequest { + compare, + success, + failure, + } = txn.into(); + + let mut succeeded = true; + let engine = self.engine.write().unwrap(); + for cmp in compare { + let existing_value = engine_get(&engine, &cmp.key)?.map(|kv| kv.value); + if !cmp.compare_with_value(existing_value.as_ref()) { + succeeded = false; + break; + } + } + + let mut batch = LogBatch::default(); + let do_txn = |txn_op| match txn_op { + TxnOp::Put(key, value) => { + batch + .put(SYSTEM_NAMESPACE, key.clone(), value) + .context(RaftEngineSnafu)?; + Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None })) + } + + TxnOp::Get(key) => { + let value = engine_get(&engine, &key)?.map(|kv| kv.value); + let kvs = value + .into_iter() + .map(|value| KeyValue { + key: key.clone(), + value, + }) + .collect(); + Ok(TxnOpResponse::ResponseGet(RangeResponse { + kvs, + more: false, + })) + } + + TxnOp::Delete(key) => { + let prev = engine_get(&engine, &key)?; + batch.delete(SYSTEM_NAMESPACE, key); + let deleted = if prev.is_some() { 1 } else { 0 }; + Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse { + deleted, + prev_kvs: vec![], + })) + } + }; + + let responses = if succeeded { success } else { failure } + .into_iter() + .map(do_txn) + .collect::>()?; + + engine.write(&mut batch, false).context(RaftEngineSnafu)?; + + Ok(TxnResponse { + succeeded, + responses, + }) + } +} + +#[async_trait::async_trait] +impl KvBackend for RaftEngineBackend { + fn name(&self) -> &str { + "RaftEngineBackend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + let mut res = vec![]; + self.engine + .read() + .unwrap() + .scan_raw_messages( + SYSTEM_NAMESPACE, + Some(&req.key), + Some(&req.range_end), + false, + |key, value| { + res.push(KeyValue { + key: key.to_vec(), + value: value.to_vec(), + }); + true + }, + ) + .context(RaftEngineSnafu)?; + Ok(RangeResponse { + kvs: res, + more: false, + }) + } + + async fn put(&self, req: PutRequest) -> Result { + let PutRequest { + key, + value, + prev_kv, + } = req; + + let mut prev = None; + // Engine::write assures that one batch is written atomically. The read/write lock is + // just to prevent race condition between put and txn. + let engine = self.engine.read().unwrap(); + if prev_kv { + prev = engine_get(&engine, &key)?; + } + engine_put(&engine, key, value)?; + Ok(PutResponse { prev_kv: prev }) + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + let BatchPutRequest { kvs, prev_kv } = req; + let mut batch = LogBatch::with_capacity(kvs.len()); + + let mut prev_kvs = if prev_kv { + Vec::with_capacity(kvs.len()) + } else { + vec![] + }; + + let engine = self.engine.read().unwrap(); + for kv in kvs { + if prev_kv && let Some(kv) = engine_get(&engine, &kv.key)? { + prev_kvs.push(kv); + } + batch + .put(SYSTEM_NAMESPACE, kv.key, kv.value) + .context(RaftEngineSnafu)?; + } + + engine.write(&mut batch, false).context(RaftEngineSnafu)?; + + Ok(BatchPutResponse { prev_kvs }) + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let mut response = BatchGetResponse { + kvs: Vec::with_capacity(req.keys.len()), + }; + let engine = self.engine.read().unwrap(); + for key in req.keys { + let Some(value) = engine.get(SYSTEM_NAMESPACE, &key) else { + continue; + }; + response.kvs.push(KeyValue { key, value }); + } + Ok(response) + } + + async fn compare_and_put( + &self, + req: CompareAndPutRequest, + ) -> Result { + let CompareAndPutRequest { key, expect, value } = req; + + let mut batch = LogBatch::with_capacity(1); + let engine = self.engine.write().unwrap(); + let existing = engine_get(&engine, &key)?; + let eq = existing + .as_ref() + .map(|kv| kv.value == expect) + .unwrap_or_else(|| { + // if the associated value of key does not exist and expect is empty, + // then we still consider them as equal. + expect.is_empty() + }); + + if eq { + batch + .put(SYSTEM_NAMESPACE, key, value) + .context(RaftEngineSnafu)?; + engine.write(&mut batch, false).context(RaftEngineSnafu)?; + } + Ok(CompareAndPutResponse { + success: eq, + prev_kv: existing, + }) + } + + async fn delete_range( + &self, + req: DeleteRangeRequest, + ) -> Result { + let DeleteRangeRequest { + key, + range_end, + prev_kv, + } = req; + + let range = RangeRequest { + key, + range_end, + limit: 0, + keys_only: true, + }; + let range_resp = self.range(range).await?; + + let mut prev_kvs = vec![]; + let mut deleted = 0; + + let engine = self.engine.read().unwrap(); + for kv in range_resp.kvs { + engine_delete(&engine, &kv.key)?; + if prev_kv { + prev_kvs.push(kv); + } + deleted += 1; + } + + Ok(DeleteRangeResponse { deleted, prev_kvs }) + } + + async fn batch_delete( + &self, + req: BatchDeleteRequest, + ) -> Result { + let BatchDeleteRequest { keys, prev_kv } = req; + + let mut prev_kvs = if prev_kv { + Vec::with_capacity(keys.len()) + } else { + vec![] + }; + let mut batch = LogBatch::with_capacity(keys.len()); + let engine = self.engine.read().unwrap(); + for key in keys { + if prev_kv && let Some(prev) = engine_get(&engine, &key)? { + prev_kvs.push(prev); + } + batch.delete(SYSTEM_NAMESPACE, key); + } + let engine = self.engine.read().unwrap(); + engine.write(&mut batch, false).context(RaftEngineSnafu)?; + Ok(BatchDeleteResponse { prev_kvs }) + } + + async fn move_value(&self, _req: MoveValueRequest) -> Result { + unimplemented!() + } + + async fn get(&self, key: &[u8]) -> Result, Self::Error> { + engine_get(&self.engine.read().unwrap(), key) + } + + async fn exists(&self, key: &[u8]) -> Result { + Ok(engine_get(&self.engine.read().unwrap(), key)?.is_some()) + } + + async fn delete(&self, key: &[u8], prev_kv: bool) -> Result, Self::Error> { + let engine = self.engine.read().unwrap(); + let prev = if prev_kv { + engine_get(&engine, key)? + } else { + None + }; + engine_delete(&engine, key)?; + Ok(prev) + } +} + +fn engine_get(engine: &Engine, key: &[u8]) -> error::Result> { + let res = engine.get(SYSTEM_NAMESPACE, key); + Ok(res.map(|value| KeyValue { + key: key.to_vec(), + value, + })) +} + +fn engine_put(engine: &Engine, key: Vec, value: Vec) -> error::Result<()> { + let mut batch = LogBatch::with_capacity(1); + batch + .put(SYSTEM_NAMESPACE, key, value) + .context(RaftEngineSnafu)?; + engine.write(&mut batch, false).context(RaftEngineSnafu)?; + Ok(()) +} + +fn engine_delete(engine: &Engine, key: &[u8]) -> error::Result<()> { + let mut batch = LogBatch::with_capacity(1); + batch.delete(SYSTEM_NAMESPACE, key.to_vec()); + engine.write(&mut batch, false).context(RaftEngineSnafu)?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use common_test_util::temp_dir::create_temp_dir; + use raft_engine::{Config, ReadableSize, RecoveryMode}; + + use super::*; + + fn build_kv_backend(dir: String) -> RaftEngineBackend { + let config = Config { + dir, + spill_dir: None, + recovery_mode: RecoveryMode::AbsoluteConsistency, + target_file_size: ReadableSize::mb(4), + purge_threshold: ReadableSize::mb(16), + ..Default::default() + }; + let engine = RwLock::new(Engine::open(config).unwrap()); + RaftEngineBackend { engine } + } + + #[tokio::test] + async fn test_raft_engine_kv() { + let dir = create_temp_dir("raft-engine-kv"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + assert!(backend.get(b"hello").await.unwrap().is_none()); + + let response = backend + .put(PutRequest { + key: b"hello".to_vec(), + value: b"world".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + assert!(response.prev_kv.is_none()); + assert_eq!( + b"world".as_slice(), + &backend.get(b"hello").await.unwrap().unwrap().value + ); + } + + #[tokio::test] + async fn test_compare_and_put() { + let dir = create_temp_dir("compare_and_put"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + + let key = b"hello".to_vec(); + backend + .put(PutRequest { + key: key.clone(), + value: b"word".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + + let CompareAndPutResponse { success, prev_kv } = backend + .compare_and_put(CompareAndPutRequest { + key: key.clone(), + expect: b"world".to_vec(), + value: b"whatever".to_vec(), + }) + .await + .unwrap(); + assert!(!success); + assert_eq!(b"word".as_slice(), &prev_kv.unwrap().value); + + let CompareAndPutResponse { success, prev_kv } = backend + .compare_and_put(CompareAndPutRequest { + key: key.clone(), + expect: b"word".to_vec(), + value: b"world".to_vec(), + }) + .await + .unwrap(); + assert!(success); + assert_eq!(b"word".as_slice(), &prev_kv.unwrap().value); + + assert_eq!( + b"world".as_slice(), + &backend.get(b"hello").await.unwrap().unwrap().value + ); + } + + fn build_batch_key_values(start: usize, end: usize) -> Vec { + (start..end) + .map(|idx| { + let bytes = idx.to_ne_bytes().to_vec(); + KeyValue { + key: bytes.clone(), + value: bytes, + } + }) + .collect() + } + + #[tokio::test] + async fn test_compare_and_put_empty() { + let dir = create_temp_dir("compare_and_put_empty"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + let CompareAndPutResponse { success, prev_kv } = backend + .compare_and_put(CompareAndPutRequest { + key: b"hello".to_vec(), + expect: vec![], + value: b"world".to_vec(), + }) + .await + .unwrap(); + assert!(success); + assert!(prev_kv.is_none()); + + assert_eq!( + b"world".as_slice(), + &backend.get(b"hello").await.unwrap().unwrap().value + ); + } + + #[tokio::test] + async fn test_batch_put_and_scan_delete() { + let dir = create_temp_dir("compare_and_put"); + let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); + + // put 0..10 + let BatchPutResponse { prev_kvs } = backend + .batch_put(BatchPutRequest { + kvs: build_batch_key_values(0, 10), + prev_kv: false, + }) + .await + .unwrap(); + assert_eq!(0, prev_kvs.len()); + + let BatchPutResponse { prev_kvs } = backend + .batch_put(BatchPutRequest { + kvs: build_batch_key_values(5, 15), + prev_kv: true, + }) + .await + .unwrap(); + let prev_kvs = prev_kvs + .into_iter() + .map(|kv| kv.key) + .collect::>(); + assert_eq!( + build_batch_key_values(5, 10) + .into_iter() + .map(|kv| kv.key) + .collect::>(), + prev_kvs + ); + + // range 2..10 + let RangeResponse { kvs, more } = backend + .range(RangeRequest { + key: 2usize.to_ne_bytes().to_vec(), + range_end: 10usize.to_ne_bytes().to_vec(), + limit: 0, + keys_only: false, + }) + .await + .unwrap(); + assert!(!more); + assert_eq!( + build_batch_key_values(2, 10) + .into_iter() + .map(|kv| kv.key) + .collect::>(), + kvs.into_iter().map(|kv| kv.key).collect::>() + ); + + //raneg 0..1000 + let RangeResponse { kvs, more } = backend + .range(RangeRequest { + key: 0usize.to_ne_bytes().to_vec(), + range_end: 1000usize.to_ne_bytes().to_vec(), + limit: 0, + keys_only: false, + }) + .await + .unwrap(); + assert!(!more); + assert_eq!( + build_batch_key_values(0, 15) + .into_iter() + .map(|kv| kv.key) + .collect::>(), + kvs.into_iter().map(|kv| kv.key).collect::>() + ); + + // then delete 3..7 + let BatchDeleteResponse { prev_kvs } = backend + .batch_delete(BatchDeleteRequest { + keys: build_batch_key_values(3, 7) + .into_iter() + .map(|kv| kv.key) + .collect(), + prev_kv: true, + }) + .await + .unwrap(); + assert_eq!( + build_batch_key_values(3, 7) + .into_iter() + .map(|kv| kv.key) + .collect::>(), + prev_kvs + .into_iter() + .map(|kv| kv.key) + .collect::>() + ); + + // finally assert existing keys to be 0..3 ∪ 7..15 + let RangeResponse { kvs, more } = backend + .range(RangeRequest { + key: 0usize.to_ne_bytes().to_vec(), + range_end: 1000usize.to_ne_bytes().to_vec(), + limit: 0, + keys_only: false, + }) + .await + .unwrap(); + assert!(!more); + + let keys = kvs.into_iter().map(|kv| kv.key).collect::>(); + assert_eq!( + build_batch_key_values(0, 3) + .into_iter() + .chain(build_batch_key_values(7, 15)) + .map(|kv| kv.key) + .collect::>(), + keys + ); + } +} diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index fa01c6f25622..68a060124946 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -31,10 +31,10 @@ use crate::error::{ AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, }; +use crate::raft_engine::backend::SYSTEM_NAMESPACE; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; const NAMESPACE_PREFIX: &str = "$sys/"; -const SYSTEM_NAMESPACE: u64 = 0; pub struct RaftEngineLogStore { config: LogConfig,