Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: open region does not register catalog/schema #2271

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl CatalogManager for MemoryCatalogManager {
}

impl MemoryCatalogManager {
/// Create a manager with some default setups
/// Creates a manager with some default setups
/// (e.g. default catalog/schema and information schema)
pub fn with_default_setup() -> Arc<Self> {
let manager = Arc::new(Self {
Expand All @@ -267,19 +267,7 @@ impl MemoryCatalogManager {
manager
}

/// Registers a catalog and return the catalog already exist
pub fn register_catalog_if_absent(&self, name: String) -> bool {
let mut catalogs = self.catalogs.write().unwrap();
let entry = catalogs.entry(name);
match entry {
Entry::Occupied(_) => true,
Entry::Vacant(v) => {
let _ = v.insert(HashMap::new());
false
}
}
}

/// Registers a catalog if it does not exist and returns false if the schema exists.
pub fn register_catalog_sync(self: &Arc<Self>, name: String) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();

Expand All @@ -294,6 +282,9 @@ impl MemoryCatalogManager {
}
}

/// Registers a schema if it does not exist.
/// It returns an error if the catalog does not exist,
/// and returns false if the schema exists.
pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
Expand All @@ -312,6 +303,7 @@ impl MemoryCatalogManager {
}
}

/// Registers a schema and returns an error if the catalog or schema does not exist.
pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
Expand Down Expand Up @@ -524,10 +516,14 @@ mod tests {
}

#[test]
pub fn test_register_if_absent() {
pub fn test_register_catalog_sync() {
let list = MemoryCatalogManager::with_default_setup();
assert!(!list.register_catalog_if_absent("test_catalog".to_string(),));
assert!(list.register_catalog_if_absent("test_catalog".to_string()));
assert!(list
.register_catalog_sync("test_catalog".to_string())
.unwrap());
assert!(!list
.register_catalog_sync("test_catalog".to_string())
.unwrap());
}

#[tokio::test]
Expand Down
54 changes: 44 additions & 10 deletions src/datanode/src/heartbeat/handler/open_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use std::sync::Arc;

use async_trait::async_trait;
use catalog::error::Error as CatalogError;
use catalog::error::{Error as CatalogError, Result as CatalogResult};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use catalog::{CatalogManagerRef, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
Expand All @@ -30,6 +30,7 @@ use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use table::Table;

use crate::error::{self, Result};

Expand Down Expand Up @@ -157,6 +158,45 @@ impl OpenRegionHandler {
Ok(false)
}

async fn register_table(
&self,
request: &OpenTableRequest,
table: Arc<dyn Table>,
) -> CatalogResult<bool> {
if !self
.catalog_manager
.catalog_exist(&request.catalog_name)
.await?
{
self.catalog_manager
.clone()
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
.register_catalog(request.catalog_name.to_string())
.await?;
}

if !self
.catalog_manager
.schema_exist(&request.catalog_name, &request.schema_name)
.await?
{
self.catalog_manager
.register_schema(RegisterSchemaRequest {
catalog: request.catalog_name.to_string(),
schema: request.schema_name.to_string(),
})
.await?;
}

let request = RegisterTableRequest {
catalog: request.catalog_name.to_string(),
schema: request.schema_name.to_string(),
table_name: request.table_name.to_string(),
table_id: request.table_id,
table,
};
self.catalog_manager.register_table(request).await
}

async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result<bool> {
let OpenTableRequest {
catalog_name,
Expand Down Expand Up @@ -187,14 +227,8 @@ impl OpenRegionHandler {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?
{
let request = RegisterTableRequest {
catalog: request.catalog_name.clone(),
schema: request.schema_name.clone(),
table_name: request.table_name.clone(),
table_id: request.table_id,
table,
};
let result = self.catalog_manager.register_table(request).await;
let result = self.register_table(&request, table).await;

match result {
Ok(_) | Err(CatalogError::TableExists { .. }) => Ok(true),
e => e.with_context(|_| error::RegisterTableSnafu {
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ async fn test_open_region_handler() {

// Opens a non-exist table
let non_exist_table_ident = TableIdent {
catalog: "greptime".to_string(),
schema: "public".to_string(),
catalog: "foo".to_string(),
schema: "non-exist".to_string(),
table: "non-exist".to_string(),
table_id: 2024,
engine: "mito".to_string(),
Expand Down