diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 53b5efc0f396..57880993acfe 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -158,7 +158,7 @@ impl InformationSchemaColumnsBuilder { for schema_name in catalog_manager.schema_names(&catalog_name).await? { if !catalog_manager - .schema_exist(&catalog_name, &schema_name) + .schema_exists(&catalog_name, &schema_name) .await? { continue; diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 9047aa3e594e..a626dbfdd31a 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -154,7 +154,7 @@ impl InformationSchemaTablesBuilder { for schema_name in catalog_manager.schema_names(&catalog_name).await? { if !catalog_manager - .schema_exist(&catalog_name, &schema_name) + .schema_exists(&catalog_name, &schema_name) .await? { continue; diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 2d152f16f226..8f929a7dcc6c 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -47,11 +47,11 @@ pub trait CatalogManager: Send + Sync { async fn table_names(&self, catalog: &str, schema: &str) -> Result>; - async fn catalog_exist(&self, catalog: &str) -> Result; + async fn catalog_exists(&self, catalog: &str) -> Result; - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result; + async fn schema_exists(&self, catalog: &str, schema: &str) -> Result; - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result; + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result; /// Returns the table by catalog, schema and table name. async fn table( diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 29efc75ab9ff..49f209c6f4bc 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -37,7 +37,7 @@ pub struct MemoryCatalogManager { #[async_trait::async_trait] impl CatalogManager for MemoryCatalogManager { - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { + async fn schema_exists(&self, catalog: &str, schema: &str) -> Result { self.schema_exist_sync(catalog, schema) } @@ -59,11 +59,11 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } - async fn catalog_exist(&self, catalog: &str) -> Result { + async fn catalog_exists(&self, catalog: &str) -> Result { self.catalog_exist_sync(catalog) } - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result { + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { let catalogs = self.catalogs.read().unwrap(); Ok(catalogs .get(catalog) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6425cb6e3454..e2ebbe6c9310 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -168,13 +168,12 @@ impl TableMetadataManager { pub async fn init(&self) -> Result<()> { let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME); - if !self.catalog_manager().exist(catalog_name).await? { - self.catalog_manager().create(catalog_name).await?; - } let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); - if !self.schema_manager().exist(schema_name).await? { - self.schema_manager().create(schema_name, None).await?; - } + + self.catalog_manager().create(catalog_name, true).await?; + self.schema_manager() + .create(schema_name, None, true) + .await?; Ok(()) } diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 6debb5af9356..1041e0cd81db 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -27,7 +27,7 @@ use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; -use crate::rpc::store::{PutRequest, RangeRequest}; +use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; #[derive(Debug, Clone, Copy, PartialEq)] @@ -103,23 +103,26 @@ impl CatalogManager { } /// Creates `CatalogNameKey`. - pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> { - let raw_key = catalog.as_raw_key(); + pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> { let _timer = timer!(crate::metrics::METRIC_META_CREATE_CATALOG); - let req = PutRequest::new() - .with_key(raw_key) - .with_value(CatalogNameValue.try_as_raw_value()?); - self.kv_backend.put(req).await?; - increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); + let raw_key = catalog.as_raw_key(); + let raw_value = CatalogNameValue.try_as_raw_value()?; + if self + .kv_backend + .put_conditionally(raw_key, raw_value, if_not_exists) + .await? + { + increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); + } Ok(()) } - pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result { + pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result { let raw_key = catalog.as_raw_key(); - Ok(self.kv_backend.get(&raw_key).await?.is_some()) + self.kv_backend.exists(&raw_key).await } pub async fn catalog_names(&self) -> BoxStream<'static, Result> { @@ -159,12 +162,12 @@ mod tests { let catalog_key = CatalogNameKey::new("my-catalog"); - manager.create(catalog_key).await.unwrap(); + manager.create(catalog_key, false).await.unwrap(); - assert!(manager.exist(catalog_key).await.unwrap()); + assert!(manager.exists(catalog_key).await.unwrap()); let wrong_catalog_key = CatalogNameKey::new("my-wrong"); - assert!(!manager.exist(wrong_catalog_key).await.unwrap()); + assert!(!manager.exists(wrong_catalog_key).await.unwrap()); } } diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index f923e5818e30..287072e03f51 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -30,7 +30,7 @@ use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Res use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; -use crate::rpc::store::{PutRequest, RangeRequest}; +use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; const OPT_KEY_TTL: &str = "ttl"; @@ -144,24 +144,27 @@ impl SchemaManager { &self, schema: SchemaNameKey<'_>, value: Option, + if_not_exists: bool, ) -> Result<()> { let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA); let raw_key = schema.as_raw_key(); - let req = PutRequest::new() - .with_key(raw_key) - .with_value(value.unwrap_or_default().try_as_raw_value()?); - - self.kv_backend.put(req).await?; - increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); + let raw_value = value.unwrap_or_default().try_as_raw_value()?; + if self + .kv_backend + .put_conditionally(raw_key, raw_value, if_not_exists) + .await? + { + increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); + } Ok(()) } - pub async fn exist(&self, schema: SchemaNameKey<'_>) -> Result { + pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result { let raw_key = schema.as_raw_key(); - Ok(self.kv_backend.get(&raw_key).await?.is_some()) + self.kv_backend.exists(&raw_key).await } pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result> { @@ -222,12 +225,12 @@ mod tests { async fn test_key_exist() { let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default())); let schema_key = SchemaNameKey::new("my-catalog", "my-schema"); - manager.create(schema_key, None).await.unwrap(); + manager.create(schema_key, None, false).await.unwrap(); - assert!(manager.exist(schema_key).await.unwrap()); + assert!(manager.exists(schema_key).await.unwrap()); let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong"); - assert!(!manager.exist(wrong_schema_key).await.unwrap()); + assert!(!manager.exists(wrong_schema_key).await.unwrap()); } } diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 30532a66fe93..c0459f68cef4 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -81,6 +81,32 @@ where }) } + /// Puts a value at a key. If `if_not_exists` is `true`, the operation + /// ensures the key does not exist before applying the PUT operation. + /// Otherwise, it simply applies the PUT operation without checking for + /// the key's existence. + async fn put_conditionally( + &self, + key: Vec, + value: Vec, + if_not_exists: bool, + ) -> Result { + let success = if if_not_exists { + let req = CompareAndPutRequest::new() + .with_key(key) + .with_expect(vec![]) + .with_value(value); + let res = self.compare_and_put(req).await?; + res.success + } else { + let req = PutRequest::new().with_key(key).with_value(value); + self.put(req).await?; + true + }; + + Ok(success) + } + /// Check if the key exists, not returning the value. /// If the value is large, this method is more efficient than `get`. async fn exists(&self, key: &[u8]) -> Result { diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 76d7ed218446..d72b137a183c 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -190,27 +190,27 @@ impl CatalogManager for FrontendCatalogManager { Ok(tables) } - async fn catalog_exist(&self, catalog: &str) -> CatalogResult { + async fn catalog_exists(&self, catalog: &str) -> CatalogResult { self.table_metadata_manager .catalog_manager() - .exist(CatalogNameKey::new(catalog)) + .exists(CatalogNameKey::new(catalog)) .await .context(TableMetadataManagerSnafu) } - async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult { + async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult { if self.system_catalog.schema_exist(schema) { return Ok(true); } self.table_metadata_manager .schema_manager() - .exist(SchemaNameKey::new(catalog, schema)) + .exists(SchemaNameKey::new(catalog, schema)) .await .context(TableMetadataManagerSnafu) } - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { if self.system_catalog.table_exist(schema, table) { return Ok(true); } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index db88c1199359..cda343d2a099 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -538,7 +538,7 @@ impl SqlQueryHandler for Instance { async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager - .schema_exist(catalog, schema) + .schema_exists(catalog, schema) .await .context(error::CatalogSnafu) } diff --git a/src/frontend/src/req_convert/delete/table_to_region.rs b/src/frontend/src/req_convert/delete/table_to_region.rs index fd5e983458f6..791125caf884 100644 --- a/src/frontend/src/req_convert/delete/table_to_region.rs +++ b/src/frontend/src/req_convert/delete/table_to_region.rs @@ -74,11 +74,11 @@ mod tests { let schema_manager = SchemaManager::new(backend.clone()); catalog_manager - .create(CatalogNameKey::default()) + .create(CatalogNameKey::default(), false) .await .unwrap(); schema_manager - .create(SchemaNameKey::default(), None) + .create(SchemaNameKey::default(), None, false) .await .unwrap(); diff --git a/src/frontend/src/req_convert/insert/table_to_region.rs b/src/frontend/src/req_convert/insert/table_to_region.rs index 3160a58d0396..6592557d6dce 100644 --- a/src/frontend/src/req_convert/insert/table_to_region.rs +++ b/src/frontend/src/req_convert/insert/table_to_region.rs @@ -74,11 +74,11 @@ mod tests { let schema_manager = SchemaManager::new(backend.clone()); catalog_manager - .create(CatalogNameKey::default()) + .create(CatalogNameKey::default(), false) .await .unwrap(); schema_manager - .create(SchemaNameKey::default(), None) + .create(SchemaNameKey::default(), None, false) .await .unwrap(); diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index ac949e77f32a..1caaf87b1404 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -349,9 +349,9 @@ impl StatementExecutor { let exists = self .table_metadata_manager .schema_manager() - .exist(schema_key) + .exists(schema_key) .await - .context(error::TableMetadataManagerSnafu)?; + .context(TableMetadataManagerSnafu)?; if exists { return if create_if_not_exists { @@ -363,9 +363,9 @@ impl StatementExecutor { self.table_metadata_manager .schema_manager() - .create(schema_key, None) + .create(schema_key, None, false) .await - .context(error::TableMetadataManagerSnafu)?; + .context(TableMetadataManagerSnafu)?; Ok(Output::AffectedRows(1)) } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index f2a49e7452a5..9bea19c48fce 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -25,8 +25,6 @@ pub mod handler; pub mod keys; pub mod lease; pub mod lock; - -pub mod metadata_service; pub mod metasrv; mod metrics; #[cfg(feature = "mock")] diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs deleted file mode 100644 index 8816abb8bc65..000000000000 --- a/src/meta-srv/src/metadata_service.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use common_meta::key::catalog_name::CatalogNameKey; -use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::TableMetadataManagerRef; -use common_telemetry::info; -use snafu::{ensure, ResultExt}; - -use crate::error; -use crate::error::Result; - -/// This trait defines some methods of metadata -#[async_trait] -pub trait MetadataService: Send + Sync { - // An error occurs if the schema exists and "if_not_exist" == false. - async fn create_schema( - &self, - catalog_name: &str, - schema_name: &str, - if_not_exist: bool, - ) -> Result<()>; - - async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>; -} - -pub type MetadataServiceRef = Arc; - -#[derive(Clone)] -pub struct DefaultMetadataService { - table_metadata_manager: TableMetadataManagerRef, -} - -impl DefaultMetadataService { - pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { - Self { - table_metadata_manager, - } - } -} - -#[async_trait] -impl MetadataService for DefaultMetadataService { - async fn create_schema( - &self, - catalog_name: &str, - schema_name: &str, - if_not_exist: bool, - ) -> Result<()> { - self.table_metadata_manager - .catalog_manager() - .create(CatalogNameKey::new(catalog_name)) - .await - .context(error::TableMetadataManagerSnafu)?; - - info!("Successfully created a catalog: {}", catalog_name); - - let schema = SchemaNameKey::new(catalog_name, schema_name); - - let exist = self - .table_metadata_manager - .schema_manager() - .exist(schema) - .await - .context(error::TableMetadataManagerSnafu)?; - - ensure!( - !exist || if_not_exist, - error::SchemaAlreadyExistsSnafu { schema_name } - ); - - if !exist { - self.table_metadata_manager - .schema_manager() - .create(schema, None) - .await - .context(error::TableMetadataManagerSnafu)?; - - info!("Successfully created a schema: {}", schema_name); - } - - Ok(()) - } - - async fn delete_schema(&self, _catalog_name: &str, _schema_name: &str) -> Result<()> { - unimplemented!() - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_meta::key::catalog_name::CatalogNameKey; - use common_meta::key::schema_name::SchemaNameKey; - use common_meta::key::{TableMetaKey, TableMetadataManager}; - - use super::{DefaultMetadataService, MetadataService}; - use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; - use crate::service::store::memory::MemStore; - - #[tokio::test] - async fn test_create_schema() { - let kv_store = Arc::new(MemStore::default()); - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); - let service = DefaultMetadataService::new(table_metadata_manager); - - service - .create_schema("catalog", "public", false) - .await - .unwrap(); - verify_result(kv_store.clone()).await; - - let result = service.create_schema("catalog", "public", false).await; - assert!(result.is_err()); - - service - .create_schema("catalog", "public", true) - .await - .unwrap(); - verify_result(kv_store.clone()).await; - } - - async fn verify_result(kv_store: KvStoreRef) { - let key = CatalogNameKey::new("catalog").as_raw_key(); - - let result = kv_store.get(&key).await.unwrap(); - let kv = result.unwrap(); - assert_eq!(key, kv.key()); - - let key = SchemaNameKey::new("catalog", "public").as_raw_key(); - - let result = kv_store.get(&key).await.unwrap(); - let kv = result.unwrap(); - assert_eq!(key, kv.key()); - } -}