Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: system tables in new region server #2344

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use tokio::task::JoinError;

use crate::DeregisterTableRequest;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -179,20 +177,6 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display(
"Failed to deregister table, request: {:?}, source: {}",
request,
source
))]
DeregisterTable {
request: DeregisterTableRequest,
location: Location,
source: table::error::Error,
},

#[snafu(display("Illegal catalog manager state: {}", msg))]
IllegalManagerState { location: Location, msg: String },

#[snafu(display("Failed to scan system catalog table, source: {}", source))]
SystemCatalogTableScan {
location: Location,
Expand Down Expand Up @@ -269,7 +253,6 @@ impl ErrorExt for Error {
Error::InvalidKey { .. }
| Error::SchemaNotFound { .. }
| Error::TableNotFound { .. }
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
Expand Down Expand Up @@ -302,7 +285,6 @@ impl ErrorExt for Error {
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. }
| Error::TableSchemaMismatch { source, .. } => source.status_code(),

Error::MetaSrv { source, .. } => source.status_code(),
Expand Down
54 changes: 28 additions & 26 deletions src/catalog/src/lib.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does that local mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not real tables that will be persisted in the kv backend which the TableMatadataManager in the FrontendCatalogManager proxied to

Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,40 @@ pub mod tables;
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;

/// Starts a catalog manager.
async fn start(&self) -> Result<()>;

/// Registers a catalog to catalog manager, returns whether the catalog exist before.
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool>;
/// Register a local catalog.
///
/// # Returns
///
/// Whether the catalog is registered.
fn register_local_catalog(&self, name: &str) -> Result<bool>;

/// Register a schema with catalog name and schema name. Retuens whether the
/// schema registered.
/// Register a local schema.
///
/// # Returns
///
/// Whether the schema is registered.
///
/// # Errors
///
/// This method will/should fail if catalog not exist
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Deregisters a database within given catalog/schema to catalog manager
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;

/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
/// Registers a local table.
///
/// # Returns
///
/// Whether the table is registered.
///
/// # Errors
///
/// This method will/should fail if catalog or schema not exist
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool>;

/// Deregisters a table within given catalog/schema to catalog manager
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()>;

/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>;

async fn catalog_names(&self) -> Result<Vec<String>>;

Expand Down Expand Up @@ -160,7 +164,7 @@ pub struct RegisterSchemaRequest {
pub schema: String,
}

pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>(
manager: &'a M,
engine: TableEngineRef,
sys_table_requests: &'a mut Vec<RegisterSystemTableRequest>,
Expand All @@ -185,15 +189,13 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
table_name,
),
})?;
let _ = manager
.register_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})
.await?;
manager.register_local_table(RegisterTableRequest {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table_name: table_name.clone(),
table_id,
table: table.clone(),
})?;
info!("Created and registered system table: {table_name}");
table
};
Expand Down
1 change: 0 additions & 1 deletion src/catalog/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@
pub mod manager;
pub mod memory;

pub use manager::LocalCatalogManager;
pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};
Loading