Skip to content

Commit

Permalink
refactor: refactor DatanodeManager to NodeManager (#3811)
Browse files Browse the repository at this point in the history
* chore: bump greptime-proto to 2c14c6e

* refactor: refactor `DatanodeManager` to `NodeManager`
  • Loading branch information
WenyXu authored Apr 26, 2024
1 parent 934c7e3 commit eb3d2ca
Show file tree
Hide file tree
Showing 20 changed files with 82 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "783682fabc38c57b5b9d46bdcfeebe2496e85bbb" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c14c6e22dfe957f40bb88dd01fb8530656de89b" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
11 changes: 8 additions & 3 deletions src/client/src/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::datanode_manager::{Datanode, DatanodeManager};
use common_meta::datanode_manager::{DatanodeRef, FlownodeRef, NodeManager};
use common_meta::peer::Peer;
use moka::future::{Cache, CacheBuilder};

Expand All @@ -44,12 +44,17 @@ impl Debug for DatanodeClients {
}

#[async_trait::async_trait]
impl DatanodeManager for DatanodeClients {
async fn datanode(&self, datanode: &Peer) -> Arc<dyn Datanode> {
impl NodeManager for DatanodeClients {
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
let client = self.get_client(datanode).await;

Arc::new(RegionRequester::new(client))
}

async fn flownode(&self, _node: &Peer) -> FlownodeRef {
// TODO(weny): Support it.
unimplemented!()
}
}

impl DatanodeClients {
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::datanode_manager::NodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl_manager::DdlManager;
Expand Down Expand Up @@ -468,7 +468,7 @@ impl StartCommand {
pub async fn create_ddl_task_executor(
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
Expand Down
1 change: 1 addition & 0 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
Kind::RenameTable(RenameTable { new_table_name }) => {
AlterKind::RenameTable { new_table_name }
}
Kind::ChangeColumnTypes(_) => unimplemented!(),
};

let request = AlterTableRequest {
Expand Down
20 changes: 17 additions & 3 deletions src/common/meta/src/datanode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest};
use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -34,11 +35,24 @@ pub trait Datanode: Send + Sync {

pub type DatanodeRef = Arc<dyn Datanode>;

/// The trait for handling requests to flownode
#[async_trait::async_trait]
pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;

async fn handle_insert(&self, request: InsertRequest) -> Result<FlowResponse>;
}

pub type FlownodeRef = Arc<dyn Flownode>;

/// Datanode manager
#[async_trait::async_trait]
pub trait DatanodeManager: Send + Sync {
pub trait NodeManager: Send + Sync {
/// Retrieves a target `datanode`.
async fn datanode(&self, datanode: &Peer) -> DatanodeRef;
async fn datanode(&self, node: &Peer) -> DatanodeRef;

/// Retrieves a target `flownode`.
async fn flownode(&self, node: &Peer) -> FlownodeRef;
}

pub type DatanodeManagerRef = Arc<dyn DatanodeManager>;
pub type NodeManagerRef = Arc<dyn NodeManager>;
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use store_api::storage::{RegionNumber, TableId};

use self::table_meta::TableMetadataAllocatorRef;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::datanode_manager::NodeManagerRef;
use crate::error::Result;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub struct TableMetadata {

#[derive(Clone)]
pub struct DdlContext {
pub datanode_manager: DatanodeManagerRef,
pub datanode_manager: NodeManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub memory_region_keeper: MemoryRegionKeeperRef,
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/alter_table/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn create_proto_alter_kind(
})))
}
Kind::RenameTable(_) => Ok(None),
Kind::ChangeColumnTypes(_) => unimplemented!(),
}
}

Expand Down
14 changes: 9 additions & 5 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::TableId;

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::datanode_manager::NodeManagerRef;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
Expand Down Expand Up @@ -64,7 +64,7 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade
/// The [DdlManager] provides the ability to execute Ddl.
pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
Expand All @@ -75,7 +75,7 @@ pub struct DdlManager {
impl DdlManager {
pub fn try_new(
procedure_manager: ProcedureManagerRef,
datanode_clients: DatanodeManagerRef,
datanode_clients: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
Expand Down Expand Up @@ -716,7 +716,7 @@ mod tests {

use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{DatanodeManager, DatanodeRef};
use crate::datanode_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
Expand All @@ -734,10 +734,14 @@ mod tests {
pub struct DummyDatanodeManager;

#[async_trait::async_trait]
impl DatanodeManager for DummyDatanodeManager {
impl NodeManager for DummyDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
unimplemented!()
}

async fn flownode(&self, _node: &Peer) -> FlownodeRef {
unimplemented!()
}
}

#[test]
Expand Down
12 changes: 8 additions & 4 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
use crate::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef};
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::DdlContext;
use crate::error::Result;
Expand Down Expand Up @@ -74,24 +74,28 @@ impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
}

#[async_trait::async_trait]
impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
impl<T: MockDatanodeHandler + 'static> NodeManager for MockDatanodeManager<T> {
async fn datanode(&self, peer: &Peer) -> DatanodeRef {
Arc::new(MockDatanode {
peer: peer.clone(),
handler: self.handler.clone(),
})
}

async fn flownode(&self, _node: &Peer) -> FlownodeRef {
unimplemented!()
}
}

/// Returns a test purpose [DdlContext].
pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
pub fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext {
let kv_backend = Arc::new(MemoryKvBackend::new());
new_ddl_context_with_kv_backend(datanode_manager, kv_backend)
}

/// Returns a test purpose [DdlContext] with a specified [KvBackendRef].
pub fn new_ddl_context_with_kv_backend(
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
kv_backend: KvBackendRef,
) -> DdlContext {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::datanode_manager::NodeManagerRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
Expand All @@ -42,7 +42,7 @@ pub struct FrontendBuilder {
kv_backend: KvBackendRef,
cache_invalidator: Option<CacheInvalidatorRef>,
catalog_manager: CatalogManagerRef,
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
plugins: Option<Plugins>,
procedure_executor: ProcedureExecutorRef,
heartbeat_task: Option<HeartbeatTask>,
Expand All @@ -52,7 +52,7 @@ impl FrontendBuilder {
pub fn new(
kv_backend: KvBackendRef,
catalog_manager: CatalogManagerRef,
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
procedure_executor: ProcedureExecutorRef,
) -> Self {
Self {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/instance/region_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::datanode_manager::NodeManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use partition::manager::PartitionRuleManagerRef;
use query::error::{RegionQuerySnafu, Result as QueryResult};
Expand All @@ -29,13 +29,13 @@ use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result};

pub(crate) struct FrontendRegionQueryHandler {
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
}

impl FrontendRegionQueryHandler {
pub fn arc(
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
datanode_manager: NodeManagerRef,
) -> Arc<Self> {
Arc::new(Self {
partition_manager,
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionRespo
use async_trait::async_trait;
use client::region::check_response_header;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef};
use common_meta::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::peer::Peer;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -34,10 +34,14 @@ use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
pub struct StandaloneDatanodeManager(pub RegionServer);

#[async_trait]
impl DatanodeManager for StandaloneDatanodeManager {
impl NodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
RegionInvoker::arc(self.0.clone())
}

async fn flownode(&self, _node: &Peer) -> FlownodeRef {
unimplemented!()
}
}

/// Relative to [client::region::RegionRequester]
Expand Down
8 changes: 4 additions & 4 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::datanode_manager::NodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
use common_meta::distributed_time_constants;
Expand Down Expand Up @@ -79,7 +79,7 @@ pub struct MetasrvBuilder {
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClientRef>,
lock: Option<DistLockRef>,
datanode_manager: Option<DatanodeManagerRef>,
datanode_manager: Option<NodeManagerRef>,
plugins: Option<Plugins>,
table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
Expand Down Expand Up @@ -141,7 +141,7 @@ impl MetasrvBuilder {
self
}

pub fn datanode_manager(mut self, datanode_manager: DatanodeManagerRef) -> Self {
pub fn datanode_manager(mut self, datanode_manager: NodeManagerRef) -> Self {
self.datanode_manager = Some(datanode_manager);
self
}
Expand Down Expand Up @@ -392,7 +392,7 @@ fn build_procedure_manager(

fn build_ddl_manager(
options: &MetasrvOptions,
datanode_clients: Option<DatanodeManagerRef>,
datanode_clients: Option<NodeManagerRef>,
procedure_manager: &ProcedureManagerRef,
mailbox: &MailboxRef,
table_metadata_manager: &TableMetadataManagerRef,
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
use client::client_manager::DatanodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::datanode_manager::NodeManagerRef;
use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState};
use common_meta::ddl::create_table::*;
use common_meta::ddl::test_util::columns::TestColumnDefBuilder;
Expand Down Expand Up @@ -173,7 +173,7 @@ fn test_region_request_builder() {
async fn new_datanode_manager(
region_server: &EchoRegionServer,
region_routes: &[RegionRoute],
) -> DatanodeManagerRef {
) -> NodeManagerRef {
let clients = DatanodeClients::default();

let datanodes = find_leaders(region_routes);
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub mod test_data {

use chrono::DateTime;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::datanode_manager::NodeManagerRef;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::key::TableMetadataManager;
Expand Down Expand Up @@ -188,7 +188,7 @@ pub mod test_data {
}
}

pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
pub(crate) fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext {
let kv_backend = Arc::new(MemoryKvBackend::new());

let mailbox_sequence =
Expand Down
Loading

0 comments on commit eb3d2ca

Please sign in to comment.