Skip to content

Commit

Permalink
fix: open region does not register catalog/schema (#2271)
Browse files Browse the repository at this point in the history
* fix: open region does not register catalog/schema

* fix: fix ci
  • Loading branch information
WenyXu authored Aug 28, 2023
1 parent c02ac36 commit 71fc3c4
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
30 changes: 13 additions & 17 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl CatalogManager for MemoryCatalogManager {
}

impl MemoryCatalogManager {
/// Create a manager with some default setups
/// Creates a manager with some default setups
/// (e.g. default catalog/schema and information schema)
pub fn with_default_setup() -> Arc<Self> {
let manager = Arc::new(Self {
Expand All @@ -267,19 +267,7 @@ impl MemoryCatalogManager {
manager
}

/// Registers a catalog and return the catalog already exist
pub fn register_catalog_if_absent(&self, name: String) -> bool {
let mut catalogs = self.catalogs.write().unwrap();
let entry = catalogs.entry(name);
match entry {
Entry::Occupied(_) => true,
Entry::Vacant(v) => {
let _ = v.insert(HashMap::new());
false
}
}
}

/// Registers a catalog if it does not exist and returns false if the schema exists.
pub fn register_catalog_sync(self: &Arc<Self>, name: String) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();

Expand All @@ -294,6 +282,9 @@ impl MemoryCatalogManager {
}
}

/// Registers a schema if it does not exist.
/// It returns an error if the catalog does not exist,
/// and returns false if the schema exists.
pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
Expand All @@ -312,6 +303,7 @@ impl MemoryCatalogManager {
}
}

/// Registers a schema and returns an error if the catalog or schema does not exist.
pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
Expand Down Expand Up @@ -524,10 +516,14 @@ mod tests {
}

#[test]
pub fn test_register_if_absent() {
pub fn test_register_catalog_sync() {
let list = MemoryCatalogManager::with_default_setup();
assert!(!list.register_catalog_if_absent("test_catalog".to_string(),));
assert!(list.register_catalog_if_absent("test_catalog".to_string()));
assert!(list
.register_catalog_sync("test_catalog".to_string())
.unwrap());
assert!(!list
.register_catalog_sync("test_catalog".to_string())
.unwrap());
}

#[tokio::test]
Expand Down
54 changes: 44 additions & 10 deletions src/datanode/src/heartbeat/handler/open_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use std::sync::Arc;

use async_trait::async_trait;
use catalog::error::Error as CatalogError;
use catalog::error::{Error as CatalogError, Result as CatalogResult};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use catalog::{CatalogManagerRef, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
Expand All @@ -30,6 +30,7 @@ use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use table::Table;

use crate::error::{self, Result};

Expand Down Expand Up @@ -157,6 +158,45 @@ impl OpenRegionHandler {
Ok(false)
}

async fn register_table(
&self,
request: &OpenTableRequest,
table: Arc<dyn Table>,
) -> CatalogResult<bool> {
if !self
.catalog_manager
.catalog_exist(&request.catalog_name)
.await?
{
self.catalog_manager
.clone()
.register_catalog(request.catalog_name.to_string())
.await?;
}

if !self
.catalog_manager
.schema_exist(&request.catalog_name, &request.schema_name)
.await?
{
self.catalog_manager
.register_schema(RegisterSchemaRequest {
catalog: request.catalog_name.to_string(),
schema: request.schema_name.to_string(),
})
.await?;
}

let request = RegisterTableRequest {
catalog: request.catalog_name.to_string(),
schema: request.schema_name.to_string(),
table_name: request.table_name.to_string(),
table_id: request.table_id,
table,
};
self.catalog_manager.register_table(request).await
}

async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result<bool> {
let OpenTableRequest {
catalog_name,
Expand Down Expand Up @@ -187,14 +227,8 @@ impl OpenRegionHandler {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?
{
let request = RegisterTableRequest {
catalog: request.catalog_name.clone(),
schema: request.schema_name.clone(),
table_name: request.table_name.clone(),
table_id: request.table_id,
table,
};
let result = self.catalog_manager.register_table(request).await;
let result = self.register_table(&request, table).await;

match result {
Ok(_) | Err(CatalogError::TableExists { .. }) => Ok(true),
e => e.with_context(|_| error::RegisterTableSnafu {
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ async fn test_open_region_handler() {

// Opens a non-exist table
let non_exist_table_ident = TableIdent {
catalog: "greptime".to_string(),
schema: "public".to_string(),
catalog: "foo".to_string(),
schema: "non-exist".to_string(),
table: "non-exist".to_string(),
table_id: 2024,
engine: "mito".to_string(),
Expand Down

0 comments on commit 71fc3c4

Please sign in to comment.