Skip to content

Commit

Permalink
feat: atomic metadata (#2366)
Browse files Browse the repository at this point in the history
* feat: atomic creating metadata

* chore: exist exists

* chore: license header

* chore: weny never say that

* feat: add put_conditionally to kv_backend
  • Loading branch information
fengjiachun authored Sep 13, 2023
1 parent 6f4779b commit f76aa27
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 208 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl InformationSchemaColumnsBuilder {

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl InformationSchemaTablesBuilder {

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ pub trait CatalogManager: Send + Sync {

async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>>;

async fn catalog_exist(&self, catalog: &str) -> Result<bool>;
async fn catalog_exists(&self, catalog: &str) -> Result<bool>;

async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool>;
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool>;

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool>;
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool>;

/// Returns the table by catalog, schema and table name.
async fn table(
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct MemoryCatalogManager {

#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}

Expand All @@ -59,11 +59,11 @@ impl CatalogManager for MemoryCatalogManager {
Ok(result)
}

async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.catalog_exist_sync(catalog)
}

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
Expand Down
11 changes: 5 additions & 6 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,12 @@ impl TableMetadataManager {

pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
if !self.catalog_manager().exist(catalog_name).await? {
self.catalog_manager().create(catalog_name).await?;
}
let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
if !self.schema_manager().exist(schema_name).await? {
self.schema_manager().create(schema_name, None).await?;
}

self.catalog_manager().create(catalog_name, true).await?;
self.schema_manager()
.create(schema_name, None, true)
.await?;

Ok(())
}
Expand Down
29 changes: 16 additions & 13 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -103,23 +103,26 @@ impl CatalogManager {
}

/// Creates `CatalogNameKey`.
pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> {
let raw_key = catalog.as_raw_key();
pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> {
let _timer = timer!(crate::metrics::METRIC_META_CREATE_CATALOG);

let req = PutRequest::new()
.with_key(raw_key)
.with_value(CatalogNameValue.try_as_raw_value()?);
self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);
let raw_key = catalog.as_raw_key();
let raw_value = CatalogNameValue.try_as_raw_value()?;
if self
.kv_backend
.put_conditionally(raw_key, raw_value, if_not_exists)
.await?
{
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);
}

Ok(())
}

pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
let raw_key = catalog.as_raw_key();

Ok(self.kv_backend.get(&raw_key).await?.is_some())
self.kv_backend.exists(&raw_key).await
}

pub async fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
Expand Down Expand Up @@ -159,12 +162,12 @@ mod tests {

let catalog_key = CatalogNameKey::new("my-catalog");

manager.create(catalog_key).await.unwrap();
manager.create(catalog_key, false).await.unwrap();

assert!(manager.exist(catalog_key).await.unwrap());
assert!(manager.exists(catalog_key).await.unwrap());

let wrong_catalog_key = CatalogNameKey::new("my-wrong");

assert!(!manager.exist(wrong_catalog_key).await.unwrap());
assert!(!manager.exists(wrong_catalog_key).await.unwrap());
}
}
27 changes: 15 additions & 12 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Res
use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

const OPT_KEY_TTL: &str = "ttl";
Expand Down Expand Up @@ -144,24 +144,27 @@ impl SchemaManager {
&self,
schema: SchemaNameKey<'_>,
value: Option<SchemaNameValue>,
if_not_exists: bool,
) -> Result<()> {
let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA);

let raw_key = schema.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(value.unwrap_or_default().try_as_raw_value()?);

self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA);
let raw_value = value.unwrap_or_default().try_as_raw_value()?;
if self
.kv_backend
.put_conditionally(raw_key, raw_value, if_not_exists)
.await?
{
increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA);
}

Ok(())
}

pub async fn exist(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
let raw_key = schema.as_raw_key();

Ok(self.kv_backend.get(&raw_key).await?.is_some())
self.kv_backend.exists(&raw_key).await
}

pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
Expand Down Expand Up @@ -222,12 +225,12 @@ mod tests {
async fn test_key_exist() {
let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
manager.create(schema_key, None).await.unwrap();
manager.create(schema_key, None, false).await.unwrap();

assert!(manager.exist(schema_key).await.unwrap());
assert!(manager.exists(schema_key).await.unwrap());

let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong");

assert!(!manager.exist(wrong_schema_key).await.unwrap());
assert!(!manager.exists(wrong_schema_key).await.unwrap());
}
}
26 changes: 26 additions & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,32 @@ where
})
}

/// Puts a value at a key. If `if_not_exists` is `true`, the operation
/// ensures the key does not exist before applying the PUT operation.
/// Otherwise, it simply applies the PUT operation without checking for
/// the key's existence.
async fn put_conditionally(
&self,
key: Vec<u8>,
value: Vec<u8>,
if_not_exists: bool,
) -> Result<bool, Self::Error> {
let success = if if_not_exists {
let req = CompareAndPutRequest::new()
.with_key(key)
.with_expect(vec![])
.with_value(value);
let res = self.compare_and_put(req).await?;
res.success
} else {
let req = PutRequest::new().with_key(key).with_value(value);
self.put(req).await?;
true
};

Ok(success)
}

/// Check if the key exists, not returning the value.
/// If the value is large, this method is more efficient than `get`.
async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,27 @@ impl CatalogManager for FrontendCatalogManager {
Ok(tables)
}

async fn catalog_exist(&self, catalog: &str) -> CatalogResult<bool> {
async fn catalog_exists(&self, catalog: &str) -> CatalogResult<bool> {
self.table_metadata_manager
.catalog_manager()
.exist(CatalogNameKey::new(catalog))
.exists(CatalogNameKey::new(catalog))
.await
.context(TableMetadataManagerSnafu)
}

async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
if self.system_catalog.schema_exist(schema) {
return Ok(true);
}

self.table_metadata_manager
.schema_manager()
.exist(SchemaNameKey::new(catalog, schema))
.exists(SchemaNameKey::new(catalog, schema))
.await
.context(TableMetadataManagerSnafu)
}

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
if self.system_catalog.table_exist(schema, table) {
return Ok(true);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl SqlQueryHandler for Instance {

async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalog_manager
.schema_exist(catalog, schema)
.schema_exists(catalog, schema)
.await
.context(error::CatalogSnafu)
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/req_convert/delete/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ mod tests {
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default())
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/req_convert/insert/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ mod tests {
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default())
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ impl StatementExecutor {
let exists = self
.table_metadata_manager
.schema_manager()
.exist(schema_key)
.exists(schema_key)
.await
.context(error::TableMetadataManagerSnafu)?;
.context(TableMetadataManagerSnafu)?;

if exists {
return if create_if_not_exists {
Expand All @@ -363,9 +363,9 @@ impl StatementExecutor {

self.table_metadata_manager
.schema_manager()
.create(schema_key, None)
.create(schema_key, None, false)
.await
.context(error::TableMetadataManagerSnafu)?;
.context(TableMetadataManagerSnafu)?;

Ok(Output::AffectedRows(1))
}
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub mod handler;
pub mod keys;
pub mod lease;
pub mod lock;

pub mod metadata_service;
pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]
Expand Down
Loading

0 comments on commit f76aa27

Please sign in to comment.