diff --git a/src/batch/src/executor/join/mod.rs b/src/batch/src/executor/join/mod.rs
index 6cead60aee0b4..77210e3e6e25a 100644
--- a/src/batch/src/executor/join/mod.rs
+++ b/src/batch/src/executor/join/mod.rs
@@ -30,7 +30,7 @@ use risingwave_common::array::{DataChunk, RowRef};
 use risingwave_common::row::Row;
 use risingwave_common::types::{DataType, DatumRef};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_pb::plan_common::JoinType as JoinTypePb;
+use risingwave_pb::plan_common::JoinType as PbJoinType;

 use crate::error::Result;

@@ -52,17 +52,17 @@ pub enum JoinType {
 }

 impl JoinType {
-    pub fn from_prost(prost: JoinTypePb) -> Self {
+    pub fn from_prost(prost: PbJoinType) -> Self {
         match prost {
-            JoinTypePb::Inner => JoinType::Inner,
-            JoinTypePb::LeftOuter => JoinType::LeftOuter,
-            JoinTypePb::LeftSemi => JoinType::LeftSemi,
-            JoinTypePb::LeftAnti => JoinType::LeftAnti,
-            JoinTypePb::RightOuter => JoinType::RightOuter,
-            JoinTypePb::RightSemi => JoinType::RightSemi,
-            JoinTypePb::RightAnti => JoinType::RightAnti,
-            JoinTypePb::FullOuter => JoinType::FullOuter,
-            JoinTypePb::Unspecified => unreachable!(),
+            PbJoinType::Inner => JoinType::Inner,
+            PbJoinType::LeftOuter => JoinType::LeftOuter,
+            PbJoinType::LeftSemi => JoinType::LeftSemi,
+            PbJoinType::LeftAnti => JoinType::LeftAnti,
+            PbJoinType::RightOuter => JoinType::RightOuter,
+            PbJoinType::RightSemi => JoinType::RightSemi,
+            PbJoinType::RightAnti => JoinType::RightAnti,
+            PbJoinType::FullOuter => JoinType::FullOuter,
+            PbJoinType::Unspecified => unreachable!(),
         }
     }
 }
diff --git a/src/common/src/util/scan_range.rs b/src/common/src/util/scan_range.rs
index 19e0cb50b83c6..fd056f1790444 100644
--- a/src/common/src/util/scan_range.rs
+++ b/src/common/src/util/scan_range.rs
@@ -15,8 +15,8 @@ use std::ops::{Bound, RangeBounds};

 use paste::paste;
-use risingwave_pb::batch_plan::scan_range::Bound as BoundPb;
-use risingwave_pb::batch_plan::ScanRange as ScanRangePb;
+use risingwave_pb::batch_plan::scan_range::Bound as PbBound;
+use risingwave_pb::batch_plan::ScanRange as PbScanRange;

 use super::value_encoding::serialize_datum;
 use crate::hash::table_distribution::TableDistribution;
@@ -24,20 +24,20 @@ use crate::hash::VirtualNode;
 use crate::types::{Datum, ScalarImpl};
 use crate::util::value_encoding::serialize_datum_into;

-/// See also [`ScanRangePb`]
+/// See also [`PbScanRange`]
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ScanRange {
     pub eq_conds: Vec<Datum>,
     pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
 }

-fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<BoundPb> {
+fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<PbBound> {
     match bound {
-        Bound::Included(literal) => Some(BoundPb {
+        Bound::Included(literal) => Some(PbBound {
             value: serialize_datum(Some(literal)),
             inclusive: true,
         }),
-        Bound::Excluded(literal) => Some(BoundPb {
+        Bound::Excluded(literal) => Some(PbBound {
             value: serialize_datum(Some(literal)),
             inclusive: false,
         }),
@@ -46,8 +46,8 @@ fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<BoundPb> {
 }

 impl ScanRange {
-    pub fn to_protobuf(&self) -> ScanRangePb {
-        ScanRangePb {
+    pub fn to_protobuf(&self) -> PbScanRange {
+        PbScanRange {
             eq_conds: self
                 .eq_conds
                 .iter()
diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs
index 07febf4d9b4c5..8f999988c4824 100644
--- a/src/frontend/src/optimizer/plan_node/generic/agg.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs
@@ -27,7 +27,7 @@ use risingwave_common::util::value_encoding::DatumToProtoExt;
 use risingwave_expr::aggregate::{agg_kinds, AggKind};
 use risingwave_expr::sig::{FuncBuilder, FUNCTION_REGISTRY};
 use risingwave_pb::expr::{PbAggCall, PbConstant};
-use risingwave_pb::stream_plan::{agg_call_state, AggCallState as AggCallStatePb};
+use risingwave_pb::stream_plan::{agg_call_state, AggCallState as PbAggCallState};

 use super::super::utils::TableCatalogBuilder;
 use super::{impl_distill_unit_from_fields, stream, GenericPlanNode, GenericPlanRef};
@@ -229,8 +229,8 @@ pub enum AggCallState {
 }

 impl AggCallState {
-    pub fn into_prost(self, state: &mut BuildFragmentGraphState) -> AggCallStatePb {
-        AggCallStatePb {
+    pub fn into_prost(self, state: &mut BuildFragmentGraphState) -> PbAggCallState {
+        PbAggCallState {
             inner: Some(match self {
                 AggCallState::Value => {
                     agg_call_state::Inner::ValueState(agg_call_state::ValueState {})
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 295afd0762167..71c4c44fac8ba 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -40,8 +40,8 @@ use paste::paste;
 use pretty_xmlish::{Pretty, PrettyConfig};
 use risingwave_common::catalog::Schema;
 use risingwave_common::util::recursive::{self, Recurse};
-use risingwave_pb::batch_plan::PlanNode as BatchPlanPb;
-use risingwave_pb::stream_plan::StreamNode as StreamPlanPb;
+use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
+use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
 use serde::Serialize;
 use smallvec::SmallVec;

@@ -710,7 +710,7 @@ impl dyn PlanNode {
     pub fn to_stream_prost(
         &self,
         state: &mut BuildFragmentGraphState,
-    ) -> SchedulerResult<StreamPlanPb> {
+    ) -> SchedulerResult<PbStreamPlan> {
         recursive::tracker!().recurse(|t| {
             if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                 notice_to_user(PLAN_TOO_DEEP_NOTICE);
@@ -738,7 +738,7 @@ impl dyn PlanNode {
                 .map(|plan| plan.to_stream_prost(state))
                 .try_collect()?;
             // TODO: support pk_indices and operator_id
-            Ok(StreamPlanPb {
+            Ok(PbStreamPlan {
                 input,
                 identity: self.explain_myself_to_string(),
                 node_body: node,
@@ -756,13 +756,13 @@ impl dyn PlanNode {
     }

     /// Serialize the plan node and its children to a batch plan proto.
-    pub fn to_batch_prost(&self) -> SchedulerResult<BatchPlanPb> {
+    pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
         self.to_batch_prost_identity(true)
     }

     /// Serialize the plan node and its children to a batch plan proto without the identity field
     /// (for testing).
-    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<BatchPlanPb> {
+    pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
         recursive::tracker!().recurse(|t| {
             if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
                 notice_to_user(PLAN_TOO_DEEP_NOTICE);
@@ -774,7 +774,7 @@ impl dyn PlanNode {
                 .into_iter()
                 .map(|plan| plan.to_batch_prost_identity(identity))
                 .try_collect()?;
-            Ok(BatchPlanPb {
+            Ok(PbBatchPlan {
                 children,
                 identity: if identity {
                     self.explain_myself_to_string()
diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs
index cf02daac47d83..c7faceafbe89a 100644
--- a/src/frontend/src/optimizer/property/distribution.rs
+++ b/src/frontend/src/optimizer/property/distribution.rs
@@ -53,7 +53,7 @@ use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::catalog::{FieldDisplay, Schema, TableId};
 use risingwave_common::hash::WorkerSlotId;
 use risingwave_pb::batch_plan::exchange_info::{
-    ConsistentHashInfo, Distribution as DistributionPb, DistributionMode, HashInfo,
+    ConsistentHashInfo, Distribution as PbDistribution, DistributionMode, HashInfo,
 };
 use risingwave_pb::batch_plan::ExchangeInfo;

@@ -131,7 +131,7 @@ impl Distribution {
                     !key.is_empty(),
                     "hash key should not be empty, use `Single` instead"
                 );
-                Some(DistributionPb::HashInfo(HashInfo {
+                Some(PbDistribution::HashInfo(HashInfo {
                     output_count,
                     key: key.iter().map(|num| *num as u32).collect(),
                 }))
@@ -154,7 +154,7 @@ impl Distribution {
                     .map(|(i, worker_slot_id)| (worker_slot_id, i as u32))
                     .collect();

-                Some(DistributionPb::ConsistentHashInfo(ConsistentHashInfo {
+                Some(PbDistribution::ConsistentHashInfo(ConsistentHashInfo {
                     vmap: vnode_mapping
                         .iter()
                         .map(|id| worker_slot_to_id_map[&id])
diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs
index 165bdcee6476b..5ab5f1aa85af7 100644
--- a/src/frontend/src/scheduler/distributed/query.rs
+++ b/src/frontend/src/scheduler/distributed/query.rs
@@ -24,7 +24,7 @@ use petgraph::Graph;
 use pgwire::pg_server::SessionId;
 use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::array::DataChunk;
-use risingwave_pb::batch_plan::{TaskId as TaskIdPb, TaskOutputId as TaskOutputIdPb};
+use risingwave_pb::batch_plan::{TaskId as PbTaskId, TaskOutputId as PbTaskOutputId};
 use risingwave_pb::common::HostAddress;
 use risingwave_rpc_client::ComputeClientPoolRef;
 use thiserror_ext::AsReport;
@@ -389,13 +389,13 @@ impl QueryRunner {
     /// of shutdown sender so that shutdown receiver won't be triggered.
     fn send_root_stage_info(&mut self, chunk_rx: Receiver<SchedulerResult<DataChunk>>) {
         let root_task_output_id = {
-            let root_task_id_prost = TaskIdPb {
+            let root_task_id_prost = PbTaskId {
                 query_id: self.query.query_id.clone().id,
                 stage_id: self.query.root_stage_id(),
                 task_id: ROOT_TASK_ID,
             };

-            TaskOutputIdPb {
+            PbTaskOutputId {
                 task_id: Some(root_task_id_prost),
                 output_id: ROOT_TASK_OUTPUT_ID,
             }
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 8b7e07a0aefcd..9ee0e7d27ceb5 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -40,7 +40,7 @@ use risingwave_expr::expr_context::expr_context_scope;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::{
     DistributedLookupJoinNode, ExchangeNode, ExchangeSource, MergeSortExchangeNode, PlanFragment,
-    PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb, TaskOutputId,
+    PlanNode as PbPlanNode, PlanNode, TaskId as PbTaskId, TaskOutputId,
 };
 use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
 use risingwave_pb::plan_common::ExprContext;
@@ -294,7 +294,7 @@ impl StageExecution {
             .iter()
             .map(|(task_id, status_holder)| {
                 let task_output_id = TaskOutputId {
-                    task_id: Some(TaskIdPb {
+                    task_id: Some(PbTaskId {
                         query_id: self.stage.query_id.id.clone(),
                         stage_id: self.stage.id,
                         task_id: *task_id,
@@ -364,7 +364,7 @@ impl StageRunner {
             .zip_eq_fast(workers.into_iter())
             .enumerate()
         {
-            let task_id = TaskIdPb {
+            let task_id = PbTaskId {
                 query_id: self.stage.query_id.id.clone(),
                 stage_id: self.stage.id,
                 task_id: i as u64,
@@ -389,7 +389,7 @@ impl StageRunner {
                 .chunks(chunk_size)
                 .enumerate()
             {
-                let task_id = TaskIdPb {
+                let task_id = PbTaskId {
                     query_id: self.stage.query_id.id.clone(),
                     stage_id: self.stage.id,
                     task_id: id as u64,
@@ -407,7 +407,7 @@ impl StageRunner {
             }
         } else {
             for id in 0..self.stage.parallelism.unwrap() {
-                let task_id = TaskIdPb {
+                let task_id = PbTaskId {
                     query_id: self.stage.query_id.id.clone(),
                     stage_id: self.stage.id,
                     task_id: id as u64,
@@ -439,9 +439,9 @@ impl StageRunner {
         while let Some(status_res_inner) = all_streams.next().await {
             match status_res_inner {
                 Ok(status) => {
-                    use risingwave_pb::task_service::task_info_response::TaskStatus as TaskStatusPb;
-                    match TaskStatusPb::try_from(status.task_status).unwrap() {
-                        TaskStatusPb::Running => {
+                    use risingwave_pb::task_service::task_info_response::TaskStatus as PbTaskStatus;
+                    match PbTaskStatus::try_from(status.task_status).unwrap() {
+                        PbTaskStatus::Running => {
                             running_task_cnt += 1;
                             // The task running count should always less or equal than the
                             // registered tasks number.
@@ -457,7 +457,7 @@ impl StageRunner {
                             }
                         }

-                        TaskStatusPb::Finished => {
+                        PbTaskStatus::Finished => {
                             finished_task_cnt += 1;
                             assert!(finished_task_cnt <= self.tasks.keys().len());
                             assert!(running_task_cnt >= finished_task_cnt);
@@ -469,7 +469,7 @@ impl StageRunner {
                                 break;
                             }
                         }
-                        TaskStatusPb::Aborted => {
+                        PbTaskStatus::Aborted => {
                             // Currently, the only reason that we receive an abort status is that
                             // the task's memory usage is too high so
                             // it's aborted.
@@ -488,7 +488,7 @@ impl StageRunner {
                             sent_signal_to_next = true;
                             break;
                         }
-                        TaskStatusPb::Failed => {
+                        PbTaskStatus::Failed => {
                             // Task failed, we should fail whole query
                             error!(
                                 "Task {:?} failed, reason: {:?}",
@@ -506,7 +506,7 @@ impl StageRunner {
                             sent_signal_to_next = true;
                             break;
                         }
-                        TaskStatusPb::Ping => {
+                        PbTaskStatus::Ping => {
                             debug!("Receive ping from task {:?}", status.task_id.unwrap());
                         }
                         status => {
@@ -870,7 +870,7 @@ impl StageRunner {

     async fn schedule_task(
         &self,
-        task_id: TaskIdPb,
+        task_id: PbTaskId,
         plan_fragment: PlanFragment,
         worker: Option<WorkerNode>,
         expr_context: ExprContext,
@@ -925,7 +925,7 @@ impl StageRunner {
         task_id: TaskId,
         partition: Option<PartitionInfo>,
         identity_id: Rc>,
-    ) -> PlanNodePb {
+    ) -> PbPlanNode {
         // Generate identity
         let identity = {
             let identity_type = execution_plan_node.plan_node_type;
@@ -947,7 +947,7 @@ impl StageRunner {
             let exchange_sources = child_stage.all_exchange_sources_for(task_id);

             match &execution_plan_node.node {
-                NodeBody::Exchange(exchange_node) => PlanNodePb {
+                NodeBody::Exchange(exchange_node) => PbPlanNode {
                     children: vec![],
                     identity,
                     node_body: Some(NodeBody::Exchange(ExchangeNode {
@@ -956,7 +956,7 @@ impl StageRunner {
                         input_schema: execution_plan_node.schema.clone(),
                     })),
                 },
-                NodeBody::MergeSortExchange(sort_merge_exchange_node) => PlanNodePb {
+                NodeBody::MergeSortExchange(sort_merge_exchange_node) => PbPlanNode {
                     children: vec![],
                     identity,
                     node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode {
@@ -982,7 +982,7 @@ impl StageRunner {
                         .expect("PartitionInfo should be TablePartitionInfo");
                     scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
                     scan_node.scan_ranges = partition.scan_ranges;
-                    PlanNodePb {
+                    PbPlanNode {
                         children: vec![],
                         identity,
                         node_body: Some(NodeBody::RowSeqScan(scan_node)),
@@ -998,7 +998,7 @@ impl StageRunner {
                         .into_table()
                         .expect("PartitionInfo should be TablePartitionInfo");
                     scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
-                    PlanNodePb {
+                    PbPlanNode {
                         children: vec![],
                         identity,
                         node_body: Some(NodeBody::LogRowSeqScan(scan_node)),
@@ -1020,7 +1020,7 @@ impl StageRunner {
                         .into_iter()
                         .map(|split| split.encode_to_bytes().into())
                         .collect_vec();
-                    PlanNodePb {
+                    PbPlanNode {
                         children: vec![],
                         identity,
                         node_body: Some(NodeBody::Source(source_node)),
@@ -1035,7 +1035,7 @@ impl StageRunner {
                     })
                     .collect();

-                PlanNodePb {
+                PbPlanNode {
                     children,
                     identity,
                     node_body: Some(execution_plan_node.node.clone()),
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 89104cc895f77..30ab213c4ace4 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -38,7 +38,7 @@ use risingwave_pb::batch_plan::exchange_info::DistributionMode;
 use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::{
-    ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PlanNodePb,
+    ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PbPlanNode,
     TaskOutputId,
 };
 use risingwave_pb::common::WorkerNode;
@@ -274,7 +274,7 @@ impl LocalQueryExecution {
         second_stages: &mut Option>,
         partition: Option<PartitionInfo>,
         next_executor_id: Arc,
-    ) -> SchedulerResult<PlanNodePb> {
+    ) -> SchedulerResult<PbPlanNode> {
         let identity = format!(
             "{:?}-{}",
             execution_plan_node.plan_node_type,
@@ -443,7 +443,7 @@ impl LocalQueryExecution {
                 .collect();
         }

-        Ok(PlanNodePb {
+        Ok(PbPlanNode {
            // Since all the rest plan is embedded into the exchange node,
            // there is no children any more.
            children: vec![],
@@ -467,7 +467,7 @@ impl LocalQueryExecution {
             _ => unreachable!(),
         }

-        Ok(PlanNodePb {
+        Ok(PbPlanNode {
             children: vec![],
             identity,
             node_body: Some(node_body),
@@ -487,7 +487,7 @@ impl LocalQueryExecution {
             _ => unreachable!(),
         }

-        Ok(PlanNodePb {
+        Ok(PbPlanNode {
             children: vec![],
             identity,
             node_body: Some(node_body),
@@ -512,7 +512,7 @@ impl LocalQueryExecution {
             _ => unreachable!(),
         }

-        Ok(PlanNodePb {
+        Ok(PbPlanNode {
             children: vec![],
             identity,
             node_body: Some(node_body),
@@ -545,7 +545,7 @@ impl LocalQueryExecution {
                     next_executor_id,
                 )?;

-                Ok(PlanNodePb {
+                Ok(PbPlanNode {
                     children: vec![left_child],
                     identity,
                     node_body: Some(node_body),
@@ -563,9 +563,9 @@ impl LocalQueryExecution {
                             next_executor_id.clone(),
                         )
                     })
-                    .collect::<SchedulerResult<Vec<PlanNodePb>>>()?;
+                    .collect::<SchedulerResult<Vec<PbPlanNode>>>()?;

-                Ok(PlanNodePb {
+                Ok(PbPlanNode {
                     children,
                     identity,
                     node_body: Some(execution_plan_node.node.clone()),
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 74ade3fdab836..2f17482b1e1a5 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -43,7 +43,7 @@ use risingwave_connector::source::{
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
 use risingwave_pb::common::Buffer;
-use risingwave_pb::plan_common::Field as FieldPb;
+use risingwave_pb::plan_common::Field as PbField;
 use risingwave_sqlparser::ast::AsOf;
 use serde::ser::SerializeStruct;
 use serde::Serialize;
@@ -86,7 +86,7 @@ pub struct ExecutionPlanNode {
     pub plan_node_id: PlanNodeId,
     pub plan_node_type: PlanNodeType,
     pub node: NodeBody,
-    pub schema: Vec<FieldPb>,
+    pub schema: Vec<PbField>,
     pub children: Vec<Arc<ExecutionPlanNode>>,