Skip to content

Commit

Permalink
refactor: inverse the dependency between system tables and catalog ma…
Browse files Browse the repository at this point in the history
…nager
  • Loading branch information
MichaelScofield committed Sep 7, 2023
1 parent a3d1bb7 commit 0e82153
Show file tree
Hide file tree
Showing 26 changed files with 231 additions and 1,039 deletions.
18 changes: 0 additions & 18 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use tokio::task::JoinError;

use crate::DeregisterTableRequest;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -179,20 +177,6 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display(
"Failed to deregister table, request: {:?}, source: {}",
request,
source
))]
DeregisterTable {
request: DeregisterTableRequest,
location: Location,
source: table::error::Error,
},

#[snafu(display("Illegal catalog manager state: {}", msg))]
IllegalManagerState { location: Location, msg: String },

#[snafu(display("Failed to scan system catalog table, source: {}", source))]
SystemCatalogTableScan {
location: Location,
Expand Down Expand Up @@ -269,7 +253,6 @@ impl ErrorExt for Error {
Error::InvalidKey { .. }
| Error::SchemaNotFound { .. }
| Error::TableNotFound { .. }
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
Expand Down Expand Up @@ -302,7 +285,6 @@ impl ErrorExt for Error {
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. }
| Error::TableSchemaMismatch { source, .. } => source.status_code(),

Error::MetaSrv { source, .. } => source.status_code(),
Expand Down
54 changes: 28 additions & 26 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,40 @@ pub mod tables;
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;

/// Starts a catalog manager.
async fn start(&self) -> Result<()>;

/// Registers a catalog to catalog manager, returns whether the catalog exist before.
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool>;
/// Register a local catalog.
///
/// # Returns
///
/// Whether the catalog is registered.
fn register_local_catalog(&self, name: &str) -> Result<bool>;

/// Register a schema with catalog name and schema name. Retuens whether the
/// schema registered.
/// Register a local schema.
///
/// # Returns
///
/// Whether the schema is registered.
///
/// # Errors
///
/// This method will/should fail if catalog not exist
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Deregisters a database within given catalog/schema to catalog manager
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;

/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
/// Registers a local table.
///
/// # Returns
///
/// Whether the table is registered.
///
/// # Errors
///
/// This method will/should fail if catalog or schema not exist
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool>;

/// Deregisters a table within given catalog/schema to catalog manager
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()>;

/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>;

async fn catalog_names(&self) -> Result<Vec<String>>;

Expand Down Expand Up @@ -160,7 +164,7 @@ pub struct RegisterSchemaRequest {
pub schema: String,
}

pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>(
manager: &'a M,
engine: TableEngineRef,
sys_table_requests: &'a mut Vec<RegisterSystemTableRequest>,
Expand All @@ -185,15 +189,13 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
table_name,
),
})?;
let _ = manager
.register_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})
.await?;
manager.register_local_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})?;
info!("Created and registered system table: {table_name}");
table
};
Expand Down
1 change: 0 additions & 1 deletion src/catalog/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@
pub mod manager;
pub mod memory;

pub use manager::LocalCatalogManager;
pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};
Loading

0 comments on commit 0e82153

Please sign in to comment.