Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: atomic metadata #2366

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl InformationSchemaColumnsBuilder {

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl InformationSchemaTablesBuilder {

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.schema_exists(&catalog_name, &schema_name)
.await?
{
continue;
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ pub trait CatalogManager: Send + Sync {

async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>>;

async fn catalog_exist(&self, catalog: &str) -> Result<bool>;
async fn catalog_exists(&self, catalog: &str) -> Result<bool>;

async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool>;
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool>;

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool>;
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool>;

/// Returns the table by catalog, schema and table name.
async fn table(
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct MemoryCatalogManager {

#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}

Expand All @@ -59,11 +59,11 @@ impl CatalogManager for MemoryCatalogManager {
Ok(result)
}

async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.catalog_exist_sync(catalog)
}

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
Expand Down
11 changes: 5 additions & 6 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,12 @@ impl TableMetadataManager {

pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
if !self.catalog_manager().exist(catalog_name).await? {
self.catalog_manager().create(catalog_name).await?;
}
let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
if !self.schema_manager().exist(schema_name).await? {
self.schema_manager().create(schema_name, None).await?;
}

self.catalog_manager().create(catalog_name, true).await?;
self.schema_manager()
.create(schema_name, None, true)
.await?;

Ok(())
}
Expand Down
29 changes: 16 additions & 13 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -103,23 +103,26 @@ impl CatalogManager {
}

/// Creates `CatalogNameKey`.
pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> {
let raw_key = catalog.as_raw_key();
pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> {
let _timer = timer!(crate::metrics::METRIC_META_CREATE_CATALOG);

let req = PutRequest::new()
.with_key(raw_key)
.with_value(CatalogNameValue.try_as_raw_value()?);
self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);
let raw_key = catalog.as_raw_key();
let raw_value = CatalogNameValue.try_as_raw_value()?;
if self
.kv_backend
.put_conditionally(raw_key, raw_value, if_not_exists)
.await?
{
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);
}

Ok(())
}

pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
let raw_key = catalog.as_raw_key();

Ok(self.kv_backend.get(&raw_key).await?.is_some())
self.kv_backend.exists(&raw_key).await
}

pub async fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
Expand Down Expand Up @@ -159,12 +162,12 @@ mod tests {

let catalog_key = CatalogNameKey::new("my-catalog");

manager.create(catalog_key).await.unwrap();
manager.create(catalog_key, false).await.unwrap();

assert!(manager.exist(catalog_key).await.unwrap());
assert!(manager.exists(catalog_key).await.unwrap());

let wrong_catalog_key = CatalogNameKey::new("my-wrong");

assert!(!manager.exist(wrong_catalog_key).await.unwrap());
assert!(!manager.exists(wrong_catalog_key).await.unwrap());
}
}
27 changes: 15 additions & 12 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Res
use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

const OPT_KEY_TTL: &str = "ttl";
Expand Down Expand Up @@ -144,24 +144,27 @@ impl SchemaManager {
&self,
schema: SchemaNameKey<'_>,
value: Option<SchemaNameValue>,
if_not_exists: bool,
) -> Result<()> {
let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA);

let raw_key = schema.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(value.unwrap_or_default().try_as_raw_value()?);

self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA);
let raw_value = value.unwrap_or_default().try_as_raw_value()?;
if self
.kv_backend
.put_conditionally(raw_key, raw_value, if_not_exists)
.await?
{
increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA);
}

Ok(())
}

pub async fn exist(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
pub async fn exists(&self, schema: SchemaNameKey<'_>) -> Result<bool> {
let raw_key = schema.as_raw_key();

Ok(self.kv_backend.get(&raw_key).await?.is_some())
self.kv_backend.exists(&raw_key).await
}

pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
Expand Down Expand Up @@ -222,12 +225,12 @@ mod tests {
async fn test_key_exist() {
let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
manager.create(schema_key, None).await.unwrap();
manager.create(schema_key, None, false).await.unwrap();

assert!(manager.exist(schema_key).await.unwrap());
assert!(manager.exists(schema_key).await.unwrap());

let wrong_schema_key = SchemaNameKey::new("my-catalog", "my-wrong");

assert!(!manager.exist(wrong_schema_key).await.unwrap());
assert!(!manager.exists(wrong_schema_key).await.unwrap());
}
}
26 changes: 26 additions & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,32 @@ where
})
}

/// Puts a value at a key. If `if_not_exists` is `true`, the operation
/// ensures the key does not exist before applying the PUT operation.
/// Otherwise, it simply applies the PUT operation without checking for
/// the key's existence.
async fn put_conditionally(
&self,
key: Vec<u8>,
value: Vec<u8>,
if_not_exists: bool,
) -> Result<bool, Self::Error> {
let success = if if_not_exists {
let req = CompareAndPutRequest::new()
.with_key(key)
.with_expect(vec![])
.with_value(value);
let res = self.compare_and_put(req).await?;
res.success
} else {
let req = PutRequest::new().with_key(key).with_value(value);
self.put(req).await?;
true
};

Ok(success)
}

/// Check if the key exists, not returning the value.
/// If the value is large, this method is more efficient than `get`.
async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,27 @@ impl CatalogManager for FrontendCatalogManager {
Ok(tables)
}

async fn catalog_exist(&self, catalog: &str) -> CatalogResult<bool> {
async fn catalog_exists(&self, catalog: &str) -> CatalogResult<bool> {
self.table_metadata_manager
.catalog_manager()
.exist(CatalogNameKey::new(catalog))
.exists(CatalogNameKey::new(catalog))
.await
.context(TableMetadataManagerSnafu)
}

async fn schema_exist(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
if self.system_catalog.schema_exist(schema) {
return Ok(true);
}

self.table_metadata_manager
.schema_manager()
.exist(SchemaNameKey::new(catalog, schema))
.exists(SchemaNameKey::new(catalog, schema))
.await
.context(TableMetadataManagerSnafu)
}

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
if self.system_catalog.table_exist(schema, table) {
return Ok(true);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl SqlQueryHandler for Instance {

async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalog_manager
.schema_exist(catalog, schema)
.schema_exists(catalog, schema)
.await
.context(error::CatalogSnafu)
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/req_convert/delete/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ mod tests {
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default())
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/req_convert/insert/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ mod tests {
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default())
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None)
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ impl StatementExecutor {
let exists = self
.table_metadata_manager
.schema_manager()
.exist(schema_key)
.exists(schema_key)
.await
.context(error::TableMetadataManagerSnafu)?;
.context(TableMetadataManagerSnafu)?;

if exists {
return if create_if_not_exists {
Expand All @@ -363,9 +363,9 @@ impl StatementExecutor {

self.table_metadata_manager
.schema_manager()
.create(schema_key, None)
.create(schema_key, None, false)
.await
.context(error::TableMetadataManagerSnafu)?;
.context(TableMetadataManagerSnafu)?;

Ok(Output::AffectedRows(1))
}
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub mod handler;
pub mod keys;
pub mod lease;
pub mod lock;

pub mod metadata_service;
pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]
Expand Down
Loading