From 473198023a6f540aa809be9382eb771cec92c6eb Mon Sep 17 00:00:00 2001 From: jiachun Date: Tue, 12 Sep 2023 20:43:24 +0800 Subject: [PATCH 1/5] feat: atomic creating metadata --- src/common/meta/src/key.rs | 12 +- src/common/meta/src/key/catalog_name.rs | 25 +-- src/common/meta/src/key/kv_backend_helper.rs | 25 +++ src/common/meta/src/key/schema_name.rs | 23 +-- .../src/req_convert/delete/table_to_region.rs | 4 +- .../src/req_convert/insert/table_to_region.rs | 4 +- src/frontend/src/statement/ddl.rs | 6 +- src/meta-srv/src/lib.rs | 2 - src/meta-srv/src/metadata_service.rs | 153 ------------------ 9 files changed, 65 insertions(+), 189 deletions(-) create mode 100644 src/common/meta/src/key/kv_backend_helper.rs delete mode 100644 src/meta-srv/src/metadata_service.rs diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6425cb6e3454..4717144d7f67 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -52,6 +52,7 @@ pub mod table_name; #[allow(deprecated)] pub mod table_region; // TODO(weny): removes it. +mod kv_backend_helper; #[allow(deprecated)] pub mod table_route; @@ -168,13 +169,12 @@ impl TableMetadataManager { pub async fn init(&self) -> Result<()> { let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME); - if !self.catalog_manager().exist(catalog_name).await? { - self.catalog_manager().create(catalog_name).await?; - } let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); - if !self.schema_manager().exist(schema_name).await? { - self.schema_manager().create(schema_name, None).await?; - } + + self.catalog_manager().create(catalog_name, true).await?; + self.schema_manager() + .create(schema_name, None, true) + .await?; Ok(()) } diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 6debb5af9356..20688fe36f79 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -24,10 +24,12 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; -use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; +use crate::key::{ + kv_backend_helper, TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX, +}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; -use crate::rpc::store::{PutRequest, RangeRequest}; +use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; #[derive(Debug, Clone, Copy, PartialEq)] @@ -103,15 +105,16 @@ impl CatalogManager { } /// Creates `CatalogNameKey`. - pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> { - let raw_key = catalog.as_raw_key(); + pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> { let _timer = timer!(crate::metrics::METRIC_META_CREATE_CATALOG); - let req = PutRequest::new() - .with_key(raw_key) - .with_value(CatalogNameValue.try_as_raw_value()?); - self.kv_backend.put(req).await?; - increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); + let raw_key = catalog.as_raw_key(); + let raw_value = CatalogNameValue.try_as_raw_value()?; + if kv_backend_helper::put_conditionally(&self.kv_backend, raw_key, raw_value, if_not_exists) + .await? + { + increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); + } Ok(()) } @@ -119,7 +122,7 @@ impl CatalogManager { pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result { let raw_key = catalog.as_raw_key(); - Ok(self.kv_backend.get(&raw_key).await?.is_some()) + self.kv_backend.exists(&raw_key).await } pub async fn catalog_names(&self) -> BoxStream<'static, Result> { @@ -159,7 +162,7 @@ mod tests { let catalog_key = CatalogNameKey::new("my-catalog"); - manager.create(catalog_key).await.unwrap(); + manager.create(catalog_key, false).await.unwrap(); assert!(manager.exist(catalog_key).await.unwrap()); diff --git a/src/common/meta/src/key/kv_backend_helper.rs b/src/common/meta/src/key/kv_backend_helper.rs new file mode 100644 index 000000000000..d9c605275019 --- /dev/null +++ b/src/common/meta/src/key/kv_backend_helper.rs @@ -0,0 +1,25 @@ +use crate::error::Result; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{CompareAndPutRequest, PutRequest}; + +pub(crate) async fn put_conditionally( + kv_backend: &KvBackendRef, + key: Vec, + value: Vec, + if_not_exists: bool, +) -> Result { + let success = if if_not_exists { + let req = CompareAndPutRequest::new() + .with_key(key) + .with_expect(vec![]) + .with_value(value); + let res = kv_backend.compare_and_put(req).await?; + res.success + } else { + let req = PutRequest::new().with_key(key).with_value(value); + kv_backend.put(req).await?; + true + }; + + Ok(success) +} diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index f923e5818e30..fd9aea895234 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -27,10 +27,12 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result}; -use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; +use crate::key::{ + kv_backend_helper, TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX, +}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; -use crate::rpc::store::{PutRequest, RangeRequest}; +use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; const OPT_KEY_TTL: &str = "ttl"; @@ -144,16 +146,17 @@ impl SchemaManager { &self, schema: SchemaNameKey<'_>, value: Option, + if_not_exists: bool, ) -> Result<()> { let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA); let raw_key = schema.as_raw_key(); - let req = PutRequest::new() - .with_key(raw_key) - .with_value(value.unwrap_or_default().try_as_raw_value()?); - - self.kv_backend.put(req).await?; - increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); + let raw_value = value.unwrap_or_default().try_as_raw_value()?; + if kv_backend_helper::put_conditionally(&self.kv_backend, raw_key, raw_value, if_not_exists) + .await? + { + increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); + } Ok(()) } @@ -161,7 +164,7 @@ impl SchemaManager { pub async fn exist(&self, schema: SchemaNameKey<'_>) -> Result { let raw_key = schema.as_raw_key(); - Ok(self.kv_backend.get(&raw_key).await?.is_some()) + self.kv_backend.exists(&raw_key).await } pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result> { @@ -222,7 +225,7 @@ mod tests { async fn test_key_exist() { let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default())); let schema_key = SchemaNameKey::new("my-catalog", "my-schema"); - manager.create(schema_key, None).await.unwrap(); + manager.create(schema_key, None, false).await.unwrap(); assert!(manager.exist(schema_key).await.unwrap()); diff --git a/src/frontend/src/req_convert/delete/table_to_region.rs b/src/frontend/src/req_convert/delete/table_to_region.rs index fd5e983458f6..791125caf884 100644 --- a/src/frontend/src/req_convert/delete/table_to_region.rs +++ b/src/frontend/src/req_convert/delete/table_to_region.rs @@ -74,11 +74,11 @@ mod tests { let schema_manager = SchemaManager::new(backend.clone()); catalog_manager - .create(CatalogNameKey::default()) + .create(CatalogNameKey::default(), false) .await .unwrap(); schema_manager - .create(SchemaNameKey::default(), None) + .create(SchemaNameKey::default(), None, false) .await .unwrap(); diff --git a/src/frontend/src/req_convert/insert/table_to_region.rs b/src/frontend/src/req_convert/insert/table_to_region.rs index 3160a58d0396..6592557d6dce 100644 --- a/src/frontend/src/req_convert/insert/table_to_region.rs +++ b/src/frontend/src/req_convert/insert/table_to_region.rs @@ -74,11 +74,11 @@ mod tests { let schema_manager = SchemaManager::new(backend.clone()); catalog_manager - .create(CatalogNameKey::default()) + .create(CatalogNameKey::default(), false) .await .unwrap(); schema_manager - .create(SchemaNameKey::default(), None) + .create(SchemaNameKey::default(), None, false) .await .unwrap(); diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index ac949e77f32a..a9248f3dab9d 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -351,7 +351,7 @@ impl StatementExecutor { .schema_manager() .exist(schema_key) .await - .context(error::TableMetadataManagerSnafu)?; + .context(TableMetadataManagerSnafu)?; if exists { return if create_if_not_exists { @@ -363,9 +363,9 @@ impl StatementExecutor { self.table_metadata_manager .schema_manager() - .create(schema_key, None) + .create(schema_key, None, false) .await - .context(error::TableMetadataManagerSnafu)?; + .context(TableMetadataManagerSnafu)?; Ok(Output::AffectedRows(1)) } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index f2a49e7452a5..9bea19c48fce 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -25,8 +25,6 @@ pub mod handler; pub mod keys; pub mod lease; pub mod lock; - -pub mod metadata_service; pub mod metasrv; mod metrics; #[cfg(feature = "mock")] diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs deleted file mode 100644 index 8816abb8bc65..000000000000 --- a/src/meta-srv/src/metadata_service.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use common_meta::key::catalog_name::CatalogNameKey; -use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::TableMetadataManagerRef; -use common_telemetry::info; -use snafu::{ensure, ResultExt}; - -use crate::error; -use crate::error::Result; - -/// This trait defines some methods of metadata -#[async_trait] -pub trait MetadataService: Send + Sync { - // An error occurs if the schema exists and "if_not_exist" == false. - async fn create_schema( - &self, - catalog_name: &str, - schema_name: &str, - if_not_exist: bool, - ) -> Result<()>; - - async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>; -} - -pub type MetadataServiceRef = Arc; - -#[derive(Clone)] -pub struct DefaultMetadataService { - table_metadata_manager: TableMetadataManagerRef, -} - -impl DefaultMetadataService { - pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { - Self { - table_metadata_manager, - } - } -} - -#[async_trait] -impl MetadataService for DefaultMetadataService { - async fn create_schema( - &self, - catalog_name: &str, - schema_name: &str, - if_not_exist: bool, - ) -> Result<()> { - self.table_metadata_manager - .catalog_manager() - .create(CatalogNameKey::new(catalog_name)) - .await - .context(error::TableMetadataManagerSnafu)?; - - info!("Successfully created a catalog: {}", catalog_name); - - let schema = SchemaNameKey::new(catalog_name, schema_name); - - let exist = self - .table_metadata_manager - .schema_manager() - .exist(schema) - .await - .context(error::TableMetadataManagerSnafu)?; - - ensure!( - !exist || if_not_exist, - error::SchemaAlreadyExistsSnafu { schema_name } - ); - - if !exist { - self.table_metadata_manager - .schema_manager() - .create(schema, None) - .await - .context(error::TableMetadataManagerSnafu)?; - - info!("Successfully created a schema: {}", schema_name); - } - - Ok(()) - } - - async fn delete_schema(&self, _catalog_name: &str, _schema_name: &str) -> Result<()> { - unimplemented!() - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_meta::key::catalog_name::CatalogNameKey; - use common_meta::key::schema_name::SchemaNameKey; - use common_meta::key::{TableMetaKey, TableMetadataManager}; - - use super::{DefaultMetadataService, MetadataService}; - use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; - use crate::service::store::memory::MemStore; - - #[tokio::test] - async fn test_create_schema() { - let kv_store = Arc::new(MemStore::default()); - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); - let service = DefaultMetadataService::new(table_metadata_manager); - - service - .create_schema("catalog", "public", false) - .await - .unwrap(); - verify_result(kv_store.clone()).await; - - let result = service.create_schema("catalog", "public", false).await; - assert!(result.is_err()); - - service - .create_schema("catalog", "public", true) - .await - .unwrap(); - verify_result(kv_store.clone()).await; - } - - async fn verify_result(kv_store: KvStoreRef) { - let key = CatalogNameKey::new("catalog").as_raw_key(); - - let result = kv_store.get(&key).await.unwrap(); - let kv = result.unwrap(); - assert_eq!(key, kv.key()); - - let key = SchemaNameKey::new("catalog", "public").as_raw_key(); - - let result = kv_store.get(&key).await.unwrap(); - let kv = result.unwrap(); - assert_eq!(key, kv.key()); - } -} From b4de0c1b486344479bf7a8d77af8d66215c81f47 Mon Sep 17 00:00:00 2001 From: jiachun Date: Tue, 12 Sep 2023 20:53:46 +0800 Subject: [PATCH 2/5] chore: exist exists --- src/catalog/src/information_schema/columns.rs | 2 +- src/catalog/src/information_schema/tables.rs | 2 +- src/catalog/src/lib.rs | 6 +++--- src/catalog/src/local/memory.rs | 6 +++--- src/common/meta/src/key/catalog_name.rs | 6 +++--- src/common/meta/src/key/schema_name.rs | 6 +++--- src/frontend/src/catalog.rs | 10 +++++----- src/frontend/src/instance.rs | 2 +- src/frontend/src/statement/ddl.rs | 2 +- 9 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 53b5efc0f396..57880993acfe 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -158,7 +158,7 @@ impl InformationSchemaColumnsBuilder { for schema_name in catalog_manager.schema_names(&catalog_name).await? { if !catalog_manager - .schema_exist(&catalog_name, &schema_name) + .schema_exists(&catalog_name, &schema_name) .await? { continue; diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 9047aa3e594e..a626dbfdd31a 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -154,7 +154,7 @@ impl InformationSchemaTablesBuilder { for schema_name in catalog_manager.schema_names(&catalog_name).await? { if !catalog_manager - .schema_exist(&catalog_name, &schema_name) + .schema_exists(&catalog_name, &schema_name) .await? { continue; diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 2d152f16f226..8f929a7dcc6c 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -47,11 +47,11 @@ pub trait CatalogManager: Send + Sync { async fn table_names(&self, catalog: &str, schema: &str) -> Result>; - async fn catalog_exist(&self, catalog: &str) -> Result; + async fn catalog_exists(&self, catalog: &str) -> Result; - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result; + async fn schema_exists(&self, catalog: &str, schema: &str) -> Result; - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result; + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result; /// Returns the table by catalog, schema and table name. async fn table( diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 29efc75ab9ff..49f209c6f4bc 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -37,7 +37,7 @@ pub struct MemoryCatalogManager { #[async_trait::async_trait] impl CatalogManager for MemoryCatalogManager { - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { + async fn schema_exists(&self, catalog: &str, schema: &str) -> Result { self.schema_exist_sync(catalog, schema) } @@ -59,11 +59,11 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } - async fn catalog_exist(&self, catalog: &str) -> Result { + async fn catalog_exists(&self, catalog: &str) -> Result { self.catalog_exist_sync(catalog) } - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result { + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { let catalogs = self.catalogs.read().unwrap(); Ok(catalogs .get(catalog) diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 20688fe36f79..88ce99255712 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -119,7 +119,7 @@ impl CatalogManager { Ok(()) } - pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result { + pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result { let raw_key = catalog.as_raw_key(); self.kv_backend.exists(&raw_key).await @@ -164,10 +164,10 @@ mod tests { manager.create(catalog_key, false).await.unwrap(); - assert!(manager.exist(catalog_key).await.unwrap()); + assert!(manager.exists(catalog_key).await.unwrap()); let wrong_catalog_key = CatalogNameKey::new("my-wrong"); - assert!(!manager.exist(wrong_catalog_key).await.unwrap()); + assert!(!manager.exists(wrong_catalog_key).await.unwrap()); } } diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index fd9aea895234..5c6d8c7b28bc 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -161,7 +161,7 @@ impl SchemaManager { Ok(()) } - pub async fn exist(&self, schema: SchemaNameKey<'_>) -> Result { + pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result { let raw_key = schema.as_raw_key(); self.kv_backend.exists(&raw_key).await @@ -227,10 +227,10 @@ mod tests { let schema_key = SchemaNameKey::new("my-catalog", "my-schema"); manager.create(schema_key, None, false).await.unwrap(); - assert!(manager.exist(schema_key).await.unwrap()); + assert!(manager.exists(schema_key).await.unwrap()); let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong"); - assert!(!manager.exist(wrong_schema_key).await.unwrap()); + assert!(!manager.exists(wrong_schema_key).await.unwrap()); } } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 76d7ed218446..d72b137a183c 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -190,27 +190,27 @@ impl CatalogManager for FrontendCatalogManager { Ok(tables) } - async fn catalog_exist(&self, catalog: &str) -> CatalogResult { + async fn catalog_exists(&self, catalog: &str) -> CatalogResult { self.table_metadata_manager .catalog_manager() - .exist(CatalogNameKey::new(catalog)) + .exists(CatalogNameKey::new(catalog)) .await .context(TableMetadataManagerSnafu) } - async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult { + async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult { if self.system_catalog.schema_exist(schema) { return Ok(true); } self.table_metadata_manager .schema_manager() - .exist(SchemaNameKey::new(catalog, schema)) + .exists(SchemaNameKey::new(catalog, schema)) .await .context(TableMetadataManagerSnafu) } - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { if self.system_catalog.table_exist(schema, table) { return Ok(true); } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index db88c1199359..cda343d2a099 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -538,7 +538,7 @@ impl SqlQueryHandler for Instance { async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager - .schema_exist(catalog, schema) + .schema_exists(catalog, schema) .await .context(error::CatalogSnafu) } diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index a9248f3dab9d..1caaf87b1404 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -349,7 +349,7 @@ impl StatementExecutor { let exists = self .table_metadata_manager .schema_manager() - .exist(schema_key) + .exists(schema_key) .await .context(TableMetadataManagerSnafu)?; From 3ee369b62ca6edebc5cf89075fb10ffa345b3fc1 Mon Sep 17 00:00:00 2001 From: jiachun Date: Tue, 12 Sep 2023 21:12:13 +0800 Subject: [PATCH 3/5] chore: license header --- src/common/meta/src/key.rs | 2 +- src/common/meta/src/key/kv_backend_helper.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 4717144d7f67..affacff97d16 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -49,10 +49,10 @@ pub mod schema_name; pub mod table_info; pub mod table_name; // TODO(weny): removes it. +mod kv_backend_helper; #[allow(deprecated)] pub mod table_region; // TODO(weny): removes it. -mod kv_backend_helper; #[allow(deprecated)] pub mod table_route; diff --git a/src/common/meta/src/key/kv_backend_helper.rs b/src/common/meta/src/key/kv_backend_helper.rs index d9c605275019..ed0e74368370 100644 --- a/src/common/meta/src/key/kv_backend_helper.rs +++ b/src/common/meta/src/key/kv_backend_helper.rs @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use crate::error::Result; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{CompareAndPutRequest, PutRequest}; From c887663ded28ae05453993fdbeca3c54a1079f04 Mon Sep 17 00:00:00 2001 From: jiachun Date: Wed, 13 Sep 2023 11:15:00 +0800 Subject: [PATCH 4/5] chore: weny never say that --- src/common/meta/src/key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index affacff97d16..8e5fa28817d7 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -48,8 +48,8 @@ pub mod datanode_table; pub mod schema_name; pub mod table_info; pub mod table_name; -// TODO(weny): removes it. mod kv_backend_helper; +// TODO(weny): removes it. #[allow(deprecated)] pub mod table_region; // TODO(weny): removes it. From 757d48abf72d80c180fc1fb98d4fcb802ceccde0 Mon Sep 17 00:00:00 2001 From: jiachun Date: Wed, 13 Sep 2023 11:28:56 +0800 Subject: [PATCH 5/5] feat: add put_conditionally to kv_backend --- src/common/meta/src/key.rs | 1 - src/common/meta/src/key/catalog_name.rs | 8 ++-- src/common/meta/src/key/kv_backend_helper.rs | 39 -------------------- src/common/meta/src/key/schema_name.rs | 8 ++-- src/common/meta/src/kv_backend.rs | 26 +++++++++++++ 5 files changed, 34 insertions(+), 48 deletions(-) delete mode 100644 src/common/meta/src/key/kv_backend_helper.rs diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 8e5fa28817d7..e2ebbe6c9310 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -48,7 +48,6 @@ pub mod datanode_table; pub mod schema_name; pub mod table_info; pub mod table_name; -mod kv_backend_helper; // TODO(weny): removes it. #[allow(deprecated)] pub mod table_region; diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 88ce99255712..1041e0cd81db 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -24,9 +24,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; -use crate::key::{ - kv_backend_helper, TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX, -}; +use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; @@ -110,7 +108,9 @@ impl CatalogManager { let raw_key = catalog.as_raw_key(); let raw_value = CatalogNameValue.try_as_raw_value()?; - if kv_backend_helper::put_conditionally(&self.kv_backend, raw_key, raw_value, if_not_exists) + if self + .kv_backend + .put_conditionally(raw_key, raw_value, if_not_exists) .await? { increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); diff --git a/src/common/meta/src/key/kv_backend_helper.rs b/src/common/meta/src/key/kv_backend_helper.rs deleted file mode 100644 index ed0e74368370..000000000000 --- a/src/common/meta/src/key/kv_backend_helper.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::error::Result; -use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{CompareAndPutRequest, PutRequest}; - -pub(crate) async fn put_conditionally( - kv_backend: &KvBackendRef, - key: Vec, - value: Vec, - if_not_exists: bool, -) -> Result { - let success = if if_not_exists { - let req = CompareAndPutRequest::new() - .with_key(key) - .with_expect(vec![]) - .with_value(value); - let res = kv_backend.compare_and_put(req).await?; - res.success - } else { - let req = PutRequest::new().with_key(key).with_value(value); - kv_backend.put(req).await?; - true - }; - - Ok(success) -} diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 5c6d8c7b28bc..287072e03f51 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -27,9 +27,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result}; -use crate::key::{ - kv_backend_helper, TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX, -}; +use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; @@ -152,7 +150,9 @@ impl SchemaManager { let raw_key = schema.as_raw_key(); let raw_value = value.unwrap_or_default().try_as_raw_value()?; - if kv_backend_helper::put_conditionally(&self.kv_backend, raw_key, raw_value, if_not_exists) + if self + .kv_backend + .put_conditionally(raw_key, raw_value, if_not_exists) .await? { increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 30532a66fe93..c0459f68cef4 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -81,6 +81,32 @@ where }) } + /// Puts a value at a key. If `if_not_exists` is `true`, the operation + /// ensures the key does not exist before applying the PUT operation. + /// Otherwise, it simply applies the PUT operation without checking for + /// the key's existence. + async fn put_conditionally( + &self, + key: Vec, + value: Vec, + if_not_exists: bool, + ) -> Result { + let success = if if_not_exists { + let req = CompareAndPutRequest::new() + .with_key(key) + .with_expect(vec![]) + .with_value(value); + let res = self.compare_and_put(req).await?; + res.success + } else { + let req = PutRequest::new().with_key(key).with_value(value); + self.put(req).await?; + true + }; + + Ok(success) + } + /// Check if the key exists, not returning the value. /// If the value is large, this method is more efficient than `get`. async fn exists(&self, key: &[u8]) -> Result {