From e52067bff2e7f46f0acaa6cfc92fd350119605a7 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 23 Jul 2024 17:39:43 +0800 Subject: [PATCH] Refactor imports in scale.rs across controller & stream --- src/meta/src/controller/scale.rs | 4 +-- src/meta/src/stream/scale.rs | 47 ++++++++------------------------ 2 files changed, 14 insertions(+), 37 deletions(-) diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs index cc22bde4c0468..bcb3c116462e1 100644 --- a/src/meta/src/controller/scale.rs +++ b/src/meta/src/controller/scale.rs @@ -23,8 +23,8 @@ use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment}; use risingwave_meta_model_v2::{actor, actor_dispatcher, fragment, ActorId, FragmentId, ObjectId}; use sea_orm::{ - ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter, - QuerySelect, QueryTrait, RelationTrait, Statement, TransactionTrait, + ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect, + QueryTrait, RelationTrait, Statement, TransactionTrait, }; use crate::controller::catalog::CatalogController; diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 796aaf475420d..04d88925b98b3 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -30,19 +30,11 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; -use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType}; - use risingwave_common::util::value_encoding::DatumToProtoExt; -use risingwave_meta_model_v2::{ - actor, fragment, ObjectId, StreamingParallelism, StreamingParallelism, -}; -use risingwave_pb::common::{ - ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, PbParallelUnit, WorkerNode, WorkerType, -}; -use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy}; +use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism}; +use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::table_fragments::actor_status::{ActorState, PbActorState}; +use risingwave_pb::meta::table_fragments::actor_status::{ActorState}; use risingwave_pb::meta::table_fragments::fragment::{ FragmentDistributionType, PbFragmentDistributionType, }; @@ -592,7 +584,6 @@ impl ScaleController { fragment_id, status, splits: _, - parallel_unit_id, worker_id, upstream_actor_ids, vnode_bitmap, @@ -627,16 +618,7 @@ impl ScaleController { .or_default() .push(actor_info); - actor_status.insert( - actor_id as _, - ActorStatus { - parallel_unit: Some(PbParallelUnit { - id: parallel_unit_id as _, - worker_node_id: worker_id as _, - }), - state: PbActorState::from(status) as i32, - }, - ); + actor_status.insert(actor_id as _, worker_id as WorkerId); expr_contexts.insert(actor_id as u32, expr_context); } @@ -649,7 +631,6 @@ impl ScaleController { fragment_type_mask, distribution_type, stream_node, - vnode_mapping, state_table_ids, upstream_fragment_id, }, @@ -671,7 +652,6 @@ impl ScaleController { fragment_id: fragment_id as _, fragment_type_mask: fragment_type_mask as _, distribution_type: distribution_type.into(), - vnode_mapping: Some(vnode_mapping.to_protobuf()), state_table_ids: state_table_ids.into_u32_array(), upstream_fragment_ids: upstream_fragment_id.into_u32_array(), actor_template: PbStreamActor { @@ -2122,23 +2102,20 @@ impl ScaleController { println!("table frag id map {:?}", table_fragment_id_map); for (actor_id, actor) in actors { - actor_status.insert( - actor_id as ActorId, - ActorStatus { - parallel_unit: Some(ParallelUnit { - id: actor.parallel_unit_id as ParallelUnitId, - worker_node_id: actor.worker_id as WorkerId, - }), - state: PbActorState::from(actor.status) as i32, - }, - ); + // actor_status.insert( + // actor_id as ActorId, + // ActorStatus { + // location: PbActorLocation::from_worker(actor.worker_id), + // state: PbActorState::from(actor.status) as i32, + // }, + // ); fragment_actor_id_map .entry(actor.fragment_id as FragmentId) .or_default() .insert(actor_id as ActorId); } - println!("actor status {:#?}", actor_status); + // println!("actor status {:#?}", actor_status); println!("frag actor {:#?}", fragment_actor_id_map); } }