Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: unify the KvBackend #2684

Merged
merged 5 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::table_name::TableName;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::KvBackendAdapter;
use rand::Rng;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};

Expand Down Expand Up @@ -64,9 +63,7 @@ impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap();

let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
etcd_store,
)));
let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));

let tool = BenchTableMetadata {
table_metadata_manager,
Expand Down
14 changes: 7 additions & 7 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::router::TableRoute;
use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
Expand All @@ -35,8 +37,6 @@ use common_meta::util::get_prefix_end_key;
use common_telemetry::info;
use etcd_client::Client;
use futures::TryStreamExt;
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
use prost::Message;
use snafu::ResultExt;
use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};
Expand Down Expand Up @@ -81,7 +81,7 @@ impl UpgradeCommand {
}

struct MigrateTableMetadata {
etcd_store: KvStoreRef,
etcd_store: KvBackendRef,
dryrun: bool,

skip_table_global_keys: bool,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl MigrateTableMetadata {
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));

let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down Expand Up @@ -182,7 +182,7 @@ impl MigrateTableMetadata {
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down Expand Up @@ -234,7 +234,7 @@ impl MigrateTableMetadata {
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down Expand Up @@ -284,7 +284,7 @@ impl MigrateTableMetadata {

info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end.clone()),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_config::KvStoreConfig;
use common_config::KvBackendConfig;
use common_telemetry::logging::LoggingOptions;
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
Expand All @@ -30,7 +30,7 @@ pub const ENV_LIST_SEP: &str = ",";
pub struct MixOptions {
pub data_home: String,
pub procedure: ProcedureConfig,
pub metadata_store: KvStoreConfig,
pub metadata_store: KvBackendConfig,
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
Expand Down
29 changes: 16 additions & 13 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{metadata_store_dir, KvStoreConfig, WalConfig};
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
Expand Down Expand Up @@ -97,7 +97,7 @@ pub struct StandaloneOptions {
pub prom_store: PromStoreOptions,
pub wal: WalConfig,
pub storage: StorageConfig,
pub metadata_store: KvStoreConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
pub logging: LoggingOptions,
pub user_provider: Option<String>,
Expand All @@ -119,7 +119,7 @@ impl Default for StandaloneOptions {
prom_store: PromStoreOptions::default(),
wal: WalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvStoreConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
user_provider: None,
Expand Down Expand Up @@ -336,23 +336,26 @@ impl StartCommand {
})?;

let metadata_dir = metadata_store_dir(&opts.data_home);
let (kv_store, procedure_manager) = FeInstance::try_build_standalone_components(
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
opts.metadata_store,
opts.procedure,
)
.await
.context(StartFrontendSnafu)?;

let datanode =
DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), Default::default())
.build()
.await
.context(StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(
dn_opts.clone(),
Some(kv_backend.clone()),
Default::default(),
)
.build()
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();

let catalog_manager = KvBackendCatalogManager::new(
kv_store.clone(),
kv_backend.clone(),
Arc::new(DummyKvCacheInvalidator),
Arc::new(StandaloneDatanodeManager(region_server.clone())),
);
Expand All @@ -366,7 +369,7 @@ impl StartCommand {
// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(
fe_plugins,
kv_store,
kv_backend,
procedure_manager.clone(),
catalog_manager,
region_server,
Expand All @@ -389,13 +392,13 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
plugins: Plugins,
kv_store: KvBackendRef,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let frontend_instance = FeInstance::try_new_standalone(
kv_store,
kv_backend,
procedure_manager,
catalog_manager,
plugins,
Expand Down
4 changes: 2 additions & 2 deletions src/common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ pub fn metadata_store_dir(store_dir: &str) -> String {

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvStoreConfig {
pub struct KvBackendConfig {
// Kv file size in bytes
pub file_size: ReadableSize,
// Kv purge threshold in bytes
pub purge_threshold: ReadableSize,
}

impl Default for KvStoreConfig {
impl Default for KvBackendConfig {
fn default() -> Self {
Self {
// log file size 256MB
Expand Down
30 changes: 27 additions & 3 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ use crate::peer::Peer;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Empty key is not allowed"))]
EmptyKey { location: Location },

#[snafu(display("Invalid result with a txn response: {}", err_msg))]
InvalidTxnResult { err_msg: String, location: Location },

#[snafu(display("Failed to connect to Etcd"))]
ConnectEtcd {
#[snafu(source)]
error: etcd_client::Error,
location: Location,
},

#[snafu(display("Failed to execute via Etcd"))]
EtcdFailed {
#[snafu(source)]
error: etcd_client::Error,
location: Location,
},

#[snafu(display("Failed to get sequence: {}", err_msg))]
NextSequence { err_msg: String, location: Location },

Expand Down Expand Up @@ -254,7 +274,10 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal,
IllegalServerState { .. }
| EtcdTxnOpResponse { .. }
| EtcdFailed { .. }
| ConnectEtcd { .. } => StatusCode::Internal,

SerdeJson { .. }
| ParseOption { .. }
Expand All @@ -267,7 +290,8 @@ impl ErrorExt for Error {
| NextSequence { .. }
| SequenceOutOfRange { .. }
| UnexpectedSequenceValue { .. }
| InvalidHeartbeatResponse { .. } => StatusCode::Unexpected,
| InvalidHeartbeatResponse { .. }
| InvalidTxnResult { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand All @@ -277,7 +301,7 @@ impl ErrorExt for Error {
| RenameTable { .. }
| Unsupported { .. } => StatusCode::Internal,

PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
PrimaryKeyNotFound { .. } | &EmptyKey { .. } => StatusCode::InvalidArguments,

TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Expand Down
10 changes: 10 additions & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod etcd;
pub mod memory;
pub mod test;
pub mod txn;
Expand Down Expand Up @@ -127,3 +128,12 @@ where
}
}
}

pub trait ResettableKvBackend: KvBackend
where
Self::Error: ErrorExt,
{
fn reset(&self);
}

pub type ResettableKvBackendRef = Arc<dyn ResettableKvBackend<Error = Error> + Send + Sync>;
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,47 @@
use std::any::Any;
use std::sync::Arc;

use common_meta::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::metrics::METRIC_META_TXN_REQUEST;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
TxnResponse,
};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error;
use crate::error::{ConvertEtcdTxnObjectSnafu, Error, Result};
use crate::service::store::etcd_util::KvPair;
use crate::service::store::kv::KvStoreRef;
use super::KvBackendRef;
use crate::error::{self, Error, Result};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

pub struct KvPair<'a>(&'a etcd_client::KeyValue);

impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
pub fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}

#[inline]
pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}

impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}

// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
Expand All @@ -46,7 +68,7 @@ pub struct EtcdStore {
}

impl EtcdStore {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvStoreRef>
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvBackendRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -58,7 +80,7 @@ impl EtcdStore {
Ok(Self::with_etcd_client(client))
}

pub fn with_etcd_client(client: Client) -> KvStoreRef {
pub fn with_etcd_client(client: Client) -> KvBackendRef {
Arc::new(Self { client })
}

Expand Down Expand Up @@ -305,7 +327,7 @@ impl TxnService for EtcdStore {
.txn(etcd_txn)
.await
.context(error::EtcdFailedSnafu)?;
txn_res.try_into().context(ConvertEtcdTxnObjectSnafu)
txn_res.try_into()
}
}

Expand Down
Loading