Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix procedure loaders not found issue #2824

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ tonic.workspace = true

[dev-dependencies]
chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }
98 changes: 92 additions & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;

pub type DdlManagerRef = Arc<DdlManager>;

/// The [DdlManager] provides the ability to execute Ddl.
pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
Expand All @@ -55,26 +55,31 @@ pub struct DdlManager {
}

impl DdlManager {
pub fn new(
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
procedure_manager: ProcedureManagerRef,
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Self {
Self {
) -> Result<Self> {
let manager = Self {
procedure_manager,
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
}
};
manager.register_loaders()?;
Ok(manager)
}

/// Returns the [TableMetadataManagerRef].
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}

/// Returns the [DdlContext]
pub fn create_context(&self) -> DdlContext {
DdlContext {
datanode_manager: self.datanode_manager.clone(),
Expand All @@ -83,7 +88,7 @@ impl DdlManager {
}
}

pub fn try_start(&self) -> Result<()> {
fn register_loaders(&self) -> Result<()> {
let context = self.create_context();

self.procedure_manager
Expand Down Expand Up @@ -142,6 +147,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes an alter table task.
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
Expand All @@ -159,6 +165,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes a create table task.
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
Expand All @@ -176,6 +183,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
Expand All @@ -199,6 +207,7 @@ impl DdlManager {
}

#[tracing::instrument(skip_all)]
/// Submits and executes a truncate table task.
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
Expand Down Expand Up @@ -416,3 +425,80 @@ impl DdlTaskExecutor for DdlManager {
.await
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use api::v1::meta::Partition;
use common_procedure::local::LocalManager;
use table::metadata::{RawTableInfo, TableId};

use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{DatanodeManager, DatanodeRef};
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::RegionRoute;
use crate::state_store::KvStateStore;

/// A dummy implemented [DatanodeManager].
pub struct DummyDatanodeManager;

#[async_trait::async_trait]
impl DatanodeManager for DummyDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
unimplemented!()
}
}

/// A dummy implemented [TableMetadataAllocator].
pub struct DummyTableMetadataAllocator;

#[async_trait::async_trait]
impl TableMetadataAllocator for DummyTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
_table_info: &mut RawTableInfo,
_partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)> {
unimplemented!()
}
}

#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let state_store = Arc::new(KvStateStore::new(kv_backend));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));

let _ = DdlManager::try_new(
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(DummyTableMetadataAllocator),
);

let expected_loaders = vec![
CreateTableProcedure::TYPE_NAME,
AlterTableProcedure::TYPE_NAME,
DropTableProcedure::TYPE_NAME,
TruncateTableProcedure::TYPE_NAME,
];

for loader in expected_loaders {
assert!(procedure_manager.contains_loader(loader));
}
}
}
4 changes: 4 additions & 0 deletions src/common/procedure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ version.workspace = true
edition.workspace = true
license.workspace = true


[features]
testing=[]

[dependencies]
async-stream.workspace = true
async-trait.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,13 @@ impl LocalManager {

Ok(())
}

#[cfg(any(test, feature = "testing"))]
/// Returns true if contains a specified loader.
pub fn contains_loader(&self, name: &str) -> bool {
let loaders = self.manager_ctx.loaders.lock().unwrap();
loaders.contains_key(name)
}
}

#[async_trait]
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ use store_api::storage::RegionNumber;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to init ddl manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to invalidate table cache"))]
InvalidateTableCache {
location: Location,
Expand Down Expand Up @@ -319,7 +325,9 @@ impl ErrorExt for Error {

Error::ParseSql { source, .. } => source.status_code(),

Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}

Error::Table { source, .. }
| Error::CopyTable { source, .. }
Expand Down
17 changes: 10 additions & 7 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,16 @@ impl Instance {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));
let ddl_executor = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
)
.context(error::InitDdlManagerSnafu)?,
);

let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub enum Error {
region_id: RegionId,
},

#[snafu(display("Failed to init ddl manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
location: Location,
Expand Down Expand Up @@ -685,7 +691,9 @@ impl ErrorExt for Error {
| Error::UpdateTableRoute { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}

Error::Other { source, .. } => source.status_code(),
}
Expand Down
23 changes: 13 additions & 10 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;

use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::Result;
use crate::error::{self, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
Expand Down Expand Up @@ -196,8 +197,7 @@ impl MetaSrvBuilder {
&table_metadata_manager,
(&selector, &selector_ctx),
&table_id_sequence,
);
let _ = ddl_manager.try_start();
)?;
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

let handler_group = match handler_group {
Expand Down Expand Up @@ -330,7 +330,7 @@ fn build_ddl_manager(
table_metadata_manager: &TableMetadataManagerRef,
(selector, selector_ctx): (&SelectorRef, &SelectorContext),
table_id_sequence: &SequenceRef,
) -> DdlManagerRef {
) -> Result<DdlManagerRef> {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
Expand All @@ -355,12 +355,15 @@ fn build_ddl_manager(
table_id_sequence.clone(),
));

Arc::new(DdlManager::new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_meta_allocator,
Ok(Arc::new(
DdlManager::try_new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_meta_allocator,
)
.context(error::InitDdlManagerSnafu)?,
))
}

Expand Down
7 changes: 5 additions & 2 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,20 @@ impl GreptimeDbStandaloneBuilder {
.init()
.await
.unwrap();
procedure_manager.start().await.unwrap();

let instance = Instance::try_new_standalone(
kv_backend,
procedure_manager,
procedure_manager.clone(),
catalog_manager,
plugins,
datanode.region_server(),
)
.await
.unwrap();

// Ensures all loaders are registered.
procedure_manager.start().await.unwrap();

test_util::prepare_another_catalog_and_schema(&instance).await;

instance.start().await.unwrap();
Expand Down
Loading