Skip to content

Commit

Permalink
fix: fix procedure loaders not found issue
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 27, 2023
1 parent 0badb37 commit e0b6fab
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ tonic.workspace = true
chrono.workspace = true
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }
common-procedure = { workspace = true, features = ["testing"] }
98 changes: 92 additions & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;

pub type DdlManagerRef = Arc<DdlManager>;

/// The [DdlManager] provides the ability to execute Ddl.
pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
Expand All @@ -55,26 +55,31 @@ pub struct DdlManager {
}

impl DdlManager {
pub fn new(
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
procedure_manager: ProcedureManagerRef,
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Self {
Self {
) -> Result<Self> {
let manager = Self {
procedure_manager,
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
}
};
manager.register_loaders()?;
Ok(manager)
}

/// Returns the [TableMetadataManagerRef].
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}

/// Returns the [DdlContext]
pub fn create_context(&self) -> DdlContext {
DdlContext {
datanode_manager: self.datanode_manager.clone(),
Expand All @@ -83,7 +88,7 @@ impl DdlManager {
}
}

pub fn try_start(&self) -> Result<()> {
fn register_loaders(&self) -> Result<()> {
let context = self.create_context();

self.procedure_manager
Expand Down Expand Up @@ -142,6 +147,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes an alter table task.
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
Expand All @@ -159,6 +165,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes a create table task.
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
Expand All @@ -176,6 +183,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
Expand All @@ -199,6 +207,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes a truncate table task.
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
Expand Down Expand Up @@ -416,3 +425,80 @@ impl DdlTaskExecutor for DdlManager {
.await
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use api::v1::meta::Partition;
use common_procedure::local::LocalManager;
use table::metadata::{RawTableInfo, TableId};

use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{DatanodeManager, DatanodeRef};
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::RegionRoute;
use crate::state_store::KvStateStore;

/// A dummy implemented [DatanodeManager].
pub struct DummyDatanodeManager;

#[async_trait::async_trait]
impl DatanodeManager for DummyDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
unimplemented!()
}
}

/// A dummy implemented [TableMetadataAllocator].
pub struct DummyTableMetadataAllocator;

#[async_trait::async_trait]
impl TableMetadataAllocator for DummyTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
_table_info: &mut RawTableInfo,
_partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)> {
unimplemented!()
}
}

#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let state_store = Arc::new(KvStateStore::new(kv_backend));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));

let _ = DdlManager::try_new(
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(DummyTableMetadataAllocator),
);

let expected_loaders = vec![
CreateTableProcedure::TYPE_NAME,
AlterTableProcedure::TYPE_NAME,
DropTableProcedure::TYPE_NAME,
TruncateTableProcedure::TYPE_NAME,
];

for loader in expected_loaders {
assert!(procedure_manager.contains_loader(loader));
}
}
}
4 changes: 4 additions & 0 deletions src/common/procedure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ version.workspace = true
edition.workspace = true
license.workspace = true


[features]
testing=[]

[dependencies]
async-stream.workspace = true
async-trait.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,13 @@ impl LocalManager {

Ok(())
}

#[cfg(any(test, feature = "testing"))]
/// Returns true if contains a specified loader.
pub fn contains_loader(&self, name: &str) -> bool {
let loaders = self.manager_ctx.loaders.lock().unwrap();
loaders.contains_key(name)
}
}

#[async_trait]
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ use store_api::storage::RegionNumber;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to init ddl manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to invalidate table cache"))]
InvalidateTableCache {
location: Location,
Expand Down Expand Up @@ -319,7 +325,9 @@ impl ErrorExt for Error {

Error::ParseSql { source, .. } => source.status_code(),

Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}

Error::Table { source, .. }
| Error::CopyTable { source, .. }
Expand Down
17 changes: 10 additions & 7 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,16 @@ impl Instance {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));
let ddl_executor = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
)
.context(error::InitDdlManagerSnafu)?,
);

let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub enum Error {
region_id: RegionId,
},

#[snafu(display("Failed to init ddl manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
location: Location,
Expand Down Expand Up @@ -685,7 +691,9 @@ impl ErrorExt for Error {
| Error::UpdateTableRoute { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}

Error::Other { source, .. } => source.status_code(),
}
Expand Down
23 changes: 13 additions & 10 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;

use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::Result;
use crate::error::{self, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
Expand Down Expand Up @@ -196,8 +197,7 @@ impl MetaSrvBuilder {
&table_metadata_manager,
(&selector, &selector_ctx),
&table_id_sequence,
);
let _ = ddl_manager.try_start();
)?;
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

let handler_group = match handler_group {
Expand Down Expand Up @@ -330,7 +330,7 @@ fn build_ddl_manager(
table_metadata_manager: &TableMetadataManagerRef,
(selector, selector_ctx): (&SelectorRef, &SelectorContext),
table_id_sequence: &SequenceRef,
) -> DdlManagerRef {
) -> Result<DdlManagerRef> {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
Expand All @@ -355,12 +355,15 @@ fn build_ddl_manager(
table_id_sequence.clone(),
));

Arc::new(DdlManager::new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_meta_allocator,
Ok(Arc::new(
DdlManager::try_new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_meta_allocator,
)
.context(error::InitDdlManagerSnafu)?,
))
}

Expand Down
7 changes: 5 additions & 2 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,20 @@ impl GreptimeDbStandaloneBuilder {
.init()
.await
.unwrap();
procedure_manager.start().await.unwrap();

let instance = Instance::try_new_standalone(
kv_backend,
procedure_manager,
procedure_manager.clone(),
catalog_manager,
plugins,
datanode.region_server(),
)
.await
.unwrap();

// Ensures all loaders are registered.
procedure_manager.start().await.unwrap();

test_util::prepare_another_catalog_and_schema(&instance).await;

instance.start().await.unwrap();
Expand Down

0 comments on commit e0b6fab

Please sign in to comment.