chore: rename XxxPb to PbXxx (#17534)
Signed-off-by: Richard Chien <[email protected]>
stdrc authored Jul 2, 2024
1 parent 7b26b20 commit 65331e6
Showing 9 changed files with 66 additions and 66 deletions.
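Every hunk below applies the same mechanical rename: a protobuf type imported under an alias keeps its `Pb` marker, but the marker moves from suffix to prefix. A minimal before/after sketch of the import style, using the `JoinType` alias from the first file changed:

    // Before: alias suffixed with `Pb`.
    use risingwave_pb::plan_common::JoinType as JoinTypePb;

    // After: alias prefixed with `Pb`, the convention this commit standardizes on.
    use risingwave_pb::plan_common::JoinType as PbJoinType;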
22 changes: 11 additions & 11 deletions src/batch/src/executor/join/mod.rs
@@ -30,7 +30,7 @@ use risingwave_common::array::{DataChunk, RowRef};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_pb::plan_common::JoinType as JoinTypePb;
+use risingwave_pb::plan_common::JoinType as PbJoinType;

use crate::error::Result;

@@ -52,17 +52,17 @@ pub enum JoinType {
}

impl JoinType {
-pub fn from_prost(prost: JoinTypePb) -> Self {
+pub fn from_prost(prost: PbJoinType) -> Self {
match prost {
-JoinTypePb::Inner => JoinType::Inner,
-JoinTypePb::LeftOuter => JoinType::LeftOuter,
-JoinTypePb::LeftSemi => JoinType::LeftSemi,
-JoinTypePb::LeftAnti => JoinType::LeftAnti,
-JoinTypePb::RightOuter => JoinType::RightOuter,
-JoinTypePb::RightSemi => JoinType::RightSemi,
-JoinTypePb::RightAnti => JoinType::RightAnti,
-JoinTypePb::FullOuter => JoinType::FullOuter,
-JoinTypePb::Unspecified => unreachable!(),
+PbJoinType::Inner => JoinType::Inner,
+PbJoinType::LeftOuter => JoinType::LeftOuter,
+PbJoinType::LeftSemi => JoinType::LeftSemi,
+PbJoinType::LeftAnti => JoinType::LeftAnti,
+PbJoinType::RightOuter => JoinType::RightOuter,
+PbJoinType::RightSemi => JoinType::RightSemi,
+PbJoinType::RightAnti => JoinType::RightAnti,
+PbJoinType::FullOuter => JoinType::FullOuter,
+PbJoinType::Unspecified => unreachable!(),
}
}
}
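As a quick illustration (not part of this commit), a sketch of a call site after the rename; the surrounding plan-handling code is assumed:

    use risingwave_pb::plan_common::JoinType as PbJoinType;

    // Convert the protobuf join type into the batch executor's own enum;
    // `PbJoinType::Unspecified` would hit the `unreachable!()` arm above.
    let join_type = JoinType::from_prost(PbJoinType::LeftOuter);
    assert!(matches!(join_type, JoinType::LeftOuter));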
16 changes: 8 additions & 8 deletions src/common/src/util/scan_range.rs
@@ -15,29 +15,29 @@
use std::ops::{Bound, RangeBounds};

use paste::paste;
-use risingwave_pb::batch_plan::scan_range::Bound as BoundPb;
-use risingwave_pb::batch_plan::ScanRange as ScanRangePb;
+use risingwave_pb::batch_plan::scan_range::Bound as PbBound;
+use risingwave_pb::batch_plan::ScanRange as PbScanRange;

use super::value_encoding::serialize_datum;
use crate::hash::table_distribution::TableDistribution;
use crate::hash::VirtualNode;
use crate::types::{Datum, ScalarImpl};
use crate::util::value_encoding::serialize_datum_into;

-/// See also [`ScanRangePb`]
+/// See also [`PbScanRange`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScanRange {
pub eq_conds: Vec<Datum>,
pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
}

-fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<BoundPb> {
+fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<PbBound> {
match bound {
-Bound::Included(literal) => Some(BoundPb {
+Bound::Included(literal) => Some(PbBound {
value: serialize_datum(Some(literal)),
inclusive: true,
}),
-Bound::Excluded(literal) => Some(BoundPb {
+Bound::Excluded(literal) => Some(PbBound {
value: serialize_datum(Some(literal)),
inclusive: false,
}),
@@ -46,8 +46,8 @@ fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<BoundPb> {
}

impl ScanRange {
-pub fn to_protobuf(&self) -> ScanRangePb {
-ScanRangePb {
+pub fn to_protobuf(&self) -> PbScanRange {
+PbScanRange {
eq_conds: self
.eq_conds
.iter()
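By way of example only, a sketch of the renamed types at a call site, assuming `Datum` is `Option<ScalarImpl>` and that an `Int32` scalar variant exists (neither is shown in this diff):

    use std::ops::Bound;

    // One equality condition plus a lower-bounded range, serialized to protobuf.
    let scan_range = ScanRange {
        eq_conds: vec![Some(ScalarImpl::Int32(42))],
        range: (Bound::Included(ScalarImpl::Int32(0)), Bound::Unbounded),
    };
    let pb: PbScanRange = scan_range.to_protobuf();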
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
@@ -27,7 +27,7 @@ use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_expr::aggregate::{agg_kinds, AggKind};
use risingwave_expr::sig::{FuncBuilder, FUNCTION_REGISTRY};
use risingwave_pb::expr::{PbAggCall, PbConstant};
-use risingwave_pb::stream_plan::{agg_call_state, AggCallState as AggCallStatePb};
+use risingwave_pb::stream_plan::{agg_call_state, AggCallState as PbAggCallState};

use super::super::utils::TableCatalogBuilder;
use super::{impl_distill_unit_from_fields, stream, GenericPlanNode, GenericPlanRef};
@@ -229,8 +229,8 @@ pub enum AggCallState {
}

impl AggCallState {
-pub fn into_prost(self, state: &mut BuildFragmentGraphState) -> AggCallStatePb {
-AggCallStatePb {
+pub fn into_prost(self, state: &mut BuildFragmentGraphState) -> PbAggCallState {
+PbAggCallState {
inner: Some(match self {
AggCallState::Value => {
agg_call_state::Inner::ValueState(agg_call_state::ValueState {})
14 changes: 7 additions & 7 deletions src/frontend/src/optimizer/plan_node/mod.rs
@@ -40,8 +40,8 @@ use paste::paste;
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::Schema;
use risingwave_common::util::recursive::{self, Recurse};
-use risingwave_pb::batch_plan::PlanNode as BatchPlanPb;
-use risingwave_pb::stream_plan::StreamNode as StreamPlanPb;
+use risingwave_pb::batch_plan::PlanNode as PbBatchPlan;
+use risingwave_pb::stream_plan::StreamNode as PbStreamPlan;
use serde::Serialize;
use smallvec::SmallVec;

@@ -710,7 +710,7 @@ impl dyn PlanNode {
pub fn to_stream_prost(
&self,
state: &mut BuildFragmentGraphState,
-) -> SchedulerResult<StreamPlanPb> {
+) -> SchedulerResult<PbStreamPlan> {
recursive::tracker!().recurse(|t| {
if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
@@ -738,7 +738,7 @@
.map(|plan| plan.to_stream_prost(state))
.try_collect()?;
// TODO: support pk_indices and operator_id
-Ok(StreamPlanPb {
+Ok(PbStreamPlan {
input,
identity: self.explain_myself_to_string(),
node_body: node,
@@ -756,13 +756,13 @@
}

/// Serialize the plan node and its children to a batch plan proto.
-pub fn to_batch_prost(&self) -> SchedulerResult<BatchPlanPb> {
+pub fn to_batch_prost(&self) -> SchedulerResult<PbBatchPlan> {
self.to_batch_prost_identity(true)
}

/// Serialize the plan node and its children to a batch plan proto without the identity field
/// (for testing).
-pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<BatchPlanPb> {
+pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<PbBatchPlan> {
recursive::tracker!().recurse(|t| {
if t.depth_reaches(PLAN_DEPTH_THRESHOLD) {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
@@ -774,7 +774,7 @@
.into_iter()
.map(|plan| plan.to_batch_prost_identity(identity))
.try_collect()?;
-Ok(BatchPlanPb {
+Ok(PbBatchPlan {
children,
identity: if identity {
self.explain_myself_to_string()
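A hedged sketch of the serialization entry points under their new aliases, assuming a `plan: &dyn PlanNode` and a `state: BuildFragmentGraphState` from elsewhere in the frontend:

    // Only the alias names differ from before; signatures and behavior are unchanged.
    let stream_proto: PbStreamPlan = plan.to_stream_prost(&mut state)?;
    let batch_proto: PbBatchPlan = plan.to_batch_prost()?;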
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/property/distribution.rs
@@ -53,7 +53,7 @@ use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::catalog::{FieldDisplay, Schema, TableId};
use risingwave_common::hash::WorkerSlotId;
use risingwave_pb::batch_plan::exchange_info::{
-ConsistentHashInfo, Distribution as DistributionPb, DistributionMode, HashInfo,
+ConsistentHashInfo, Distribution as PbDistribution, DistributionMode, HashInfo,
};
use risingwave_pb::batch_plan::ExchangeInfo;

@@ -131,7 +131,7 @@ impl Distribution {
!key.is_empty(),
"hash key should not be empty, use `Single` instead"
);
-Some(DistributionPb::HashInfo(HashInfo {
+Some(PbDistribution::HashInfo(HashInfo {
output_count,
key: key.iter().map(|num| *num as u32).collect(),
}))
@@ -154,7 +154,7 @@
.map(|(i, worker_slot_id)| (worker_slot_id, i as u32))
.collect();

-Some(DistributionPb::ConsistentHashInfo(ConsistentHashInfo {
+Some(PbDistribution::ConsistentHashInfo(ConsistentHashInfo {
vmap: vnode_mapping
.iter()
.map(|id| worker_slot_to_id_map[&id])
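For illustration, a sketch that constructs the renamed variant directly, using only the fields visible in this hunk (the values are placeholders):

    // Hash-distribute rows on columns 0 and 1 across four downstream outputs.
    let dist = PbDistribution::HashInfo(HashInfo {
        output_count: 4,
        key: vec![0, 1],
    });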
6 changes: 3 additions & 3 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -24,7 +24,7 @@ use petgraph::Graph;
use pgwire::pg_server::SessionId;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
-use risingwave_pb::batch_plan::{TaskId as TaskIdPb, TaskOutputId as TaskOutputIdPb};
+use risingwave_pb::batch_plan::{TaskId as PbTaskId, TaskOutputId as PbTaskOutputId};
use risingwave_pb::common::HostAddress;
use risingwave_rpc_client::ComputeClientPoolRef;
use thiserror_ext::AsReport;
@@ -389,13 +389,13 @@ impl QueryRunner {
/// of shutdown sender so that shutdown receiver won't be triggered.
fn send_root_stage_info(&mut self, chunk_rx: Receiver<SchedulerResult<DataChunk>>) {
let root_task_output_id = {
-let root_task_id_prost = TaskIdPb {
+let root_task_id_prost = PbTaskId {
query_id: self.query.query_id.clone().id,
stage_id: self.query.root_stage_id(),
task_id: ROOT_TASK_ID,
};

-TaskOutputIdPb {
+PbTaskOutputId {
task_id: Some(root_task_id_prost),
output_id: ROOT_TASK_OUTPUT_ID,
}
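The same pattern as `send_root_stage_info` above, sketched with placeholder values:

    // Identify the root task's output so results can be pulled from it.
    let root_task_id = PbTaskId {
        query_id: "query-1".to_string(),
        stage_id: 0,
        task_id: 0,
    };
    let root_output_id = PbTaskOutputId {
        task_id: Some(root_task_id),
        output_id: 0,
    };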
40 changes: 20 additions & 20 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -40,7 +40,7 @@ use risingwave_expr::expr_context::expr_context_scope;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
DistributedLookupJoinNode, ExchangeNode, ExchangeSource, MergeSortExchangeNode, PlanFragment,
-PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb, TaskOutputId,
+PlanNode as PbPlanNode, PlanNode, TaskId as PbTaskId, TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
use risingwave_pb::plan_common::ExprContext;
@@ -294,7 +294,7 @@ impl StageExecution {
.iter()
.map(|(task_id, status_holder)| {
let task_output_id = TaskOutputId {
-task_id: Some(TaskIdPb {
+task_id: Some(PbTaskId {
query_id: self.stage.query_id.id.clone(),
stage_id: self.stage.id,
task_id: *task_id,
@@ -364,7 +364,7 @@ impl StageRunner {
.zip_eq_fast(workers.into_iter())
.enumerate()
{
-let task_id = TaskIdPb {
+let task_id = PbTaskId {
query_id: self.stage.query_id.id.clone(),
stage_id: self.stage.id,
task_id: i as u64,
@@ -389,7 +389,7 @@
.chunks(chunk_size)
.enumerate()
{
-let task_id = TaskIdPb {
+let task_id = PbTaskId {
query_id: self.stage.query_id.id.clone(),
stage_id: self.stage.id,
task_id: id as u64,
@@ -407,7 +407,7 @@
}
} else {
for id in 0..self.stage.parallelism.unwrap() {
-let task_id = TaskIdPb {
+let task_id = PbTaskId {
query_id: self.stage.query_id.id.clone(),
stage_id: self.stage.id,
task_id: id as u64,
@@ -439,9 +439,9 @@ impl StageRunner {
while let Some(status_res_inner) = all_streams.next().await {
match status_res_inner {
Ok(status) => {
-use risingwave_pb::task_service::task_info_response::TaskStatus as TaskStatusPb;
-match TaskStatusPb::try_from(status.task_status).unwrap() {
-TaskStatusPb::Running => {
+use risingwave_pb::task_service::task_info_response::TaskStatus as PbTaskStatus;
+match PbTaskStatus::try_from(status.task_status).unwrap() {
+PbTaskStatus::Running => {
running_task_cnt += 1;
// The task running count should always less or equal than the
// registered tasks number.
@@ -457,7 +457,7 @@
}
}

-TaskStatusPb::Finished => {
+PbTaskStatus::Finished => {
finished_task_cnt += 1;
assert!(finished_task_cnt <= self.tasks.keys().len());
assert!(running_task_cnt >= finished_task_cnt);
@@ -469,7 +469,7 @@
break;
}
}
-TaskStatusPb::Aborted => {
+PbTaskStatus::Aborted => {
// Currently, the only reason that we receive an abort status is that
// the task's memory usage is too high so
// it's aborted.
@@ -488,7 +488,7 @@
sent_signal_to_next = true;
break;
}
-TaskStatusPb::Failed => {
+PbTaskStatus::Failed => {
// Task failed, we should fail whole query
error!(
"Task {:?} failed, reason: {:?}",
@@ -506,7 +506,7 @@
sent_signal_to_next = true;
break;
}
-TaskStatusPb::Ping => {
+PbTaskStatus::Ping => {
debug!("Receive ping from task {:?}", status.task_id.unwrap());
}
status => {
@@ -870,7 +870,7 @@

async fn schedule_task(
&self,
-task_id: TaskIdPb,
+task_id: PbTaskId,
plan_fragment: PlanFragment,
worker: Option<WorkerNode>,
expr_context: ExprContext,
@@ -925,7 +925,7 @@
task_id: TaskId,
partition: Option<PartitionInfo>,
identity_id: Rc<RefCell<u64>>,
-) -> PlanNodePb {
+) -> PbPlanNode {
// Generate identity
let identity = {
let identity_type = execution_plan_node.plan_node_type;
@@ -947,7 +947,7 @@ impl StageRunner {
let exchange_sources = child_stage.all_exchange_sources_for(task_id);

match &execution_plan_node.node {
-NodeBody::Exchange(exchange_node) => PlanNodePb {
+NodeBody::Exchange(exchange_node) => PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::Exchange(ExchangeNode {
@@ -956,7 +956,7 @@ impl StageRunner {
input_schema: execution_plan_node.schema.clone(),
})),
},
-NodeBody::MergeSortExchange(sort_merge_exchange_node) => PlanNodePb {
+NodeBody::MergeSortExchange(sort_merge_exchange_node) => PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode {
@@ -982,7 +982,7 @@
.expect("PartitionInfo should be TablePartitionInfo");
scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
scan_node.scan_ranges = partition.scan_ranges;
-PlanNodePb {
+PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::RowSeqScan(scan_node)),
@@ -998,7 +998,7 @@
.into_table()
.expect("PartitionInfo should be TablePartitionInfo");
scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
-PlanNodePb {
+PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::LogRowSeqScan(scan_node)),
@@ -1020,7 +1020,7 @@
.into_iter()
.map(|split| split.encode_to_bytes().into())
.collect_vec();
-PlanNodePb {
+PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::Source(source_node)),
@@ -1035,7 +1035,7 @@
})
.collect();

-PlanNodePb {
+PbPlanNode {
children,
identity,
node_body: Some(execution_plan_node.node.clone()),