Skip to content

Commit

Permalink
refactor: rename task suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent e69765c commit b2b0073
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 103 deletions.
2 changes: 1 addition & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow",
None => "ddl.empty",
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::{FlowMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
Expand Down Expand Up @@ -480,7 +480,7 @@ impl StartCommand {
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_metadata_manager: FlowTaskMetadataManagerRef,
flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::error::Result;
use crate::key::flow::FlowTaskMetadataManagerRef;
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct DdlContext {
/// Allocator for table metadata.
pub table_metadata_allocator: TableMetadataAllocatorRef,
/// Flow metadata manager.
pub flow_metadata_manager: FlowTaskMetadataManagerRef,
pub flow_metadata_manager: FlowMetadataManagerRef,
/// Allocator for flow metadata.
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
}
12 changes: 6 additions & 6 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,21 @@ impl CreateFlowProcedure {

async fn on_flownode_create_flow(&mut self) -> Result<Status> {
// Safety: must be allocated.
let mut create_flow_task = Vec::with_capacity(self.data.peers.len());
let mut create_flow = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())),
};
create_flow_task.push(async move {
create_flow.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer.clone()))
});
}

join_all(create_flow_task)
join_all(create_flow)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Expand All @@ -117,7 +117,7 @@ impl CreateFlowProcedure {
// TODO(weny): Support `or_replace`.
self.context
.flow_metadata_manager
.create_flow_task_metadata(flow_id, self.data.to_flow_task_info_value())
.create_flow_metadata(flow_id, self.data.to_flow_info_value())
.await?;
info!("Created flow metadata for flow {flow_id}");
Ok(Status::done_with_output(flow_id))
Expand All @@ -133,7 +133,7 @@ impl Procedure for CreateFlowProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;

let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
.with_label_values(&[state.as_ref()])
.start_timer();

Expand Down Expand Up @@ -225,7 +225,7 @@ impl CreateFlowTaskData {
}

/// Converts to [FlowTaskValue].
fn to_flow_task_info_value(&self) -> FlowTaskValue {
fn to_flow_info_value(&self) -> FlowTaskValue {
let CreateFlowTask {
catalog_name,
flow_name,
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
//!
//! To simplify the managers used in struct fields and function parameters, we define "unify"
//! table metadata manager: [TableMetadataManager]
//! and flow metadata manager: [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager).
//! and flow metadata manager: [FlowTaskMetadataManager](crate::key::flow::FlowMetadataManager).
//! It contains all the managers defined above. It's recommended to just use this manager only.
//!
//! The whole picture of flow keys will be like this:
Expand Down
Loading

0 comments on commit b2b0073

Please sign in to comment.