Skip to content

Commit

Permalink
Refactor imports in scale.rs across controller & stream
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 23, 2024
1 parent b0b5204 commit e52067b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 37 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/controller/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use risingwave_meta_model_v2::actor_dispatcher::DispatcherType;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment};
use risingwave_meta_model_v2::{actor, actor_dispatcher, fragment, ActorId, FragmentId, ObjectId};
use sea_orm::{
ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter,
QuerySelect, QueryTrait, RelationTrait, Statement, TransactionTrait,
ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect,
QueryTrait, RelationTrait, Statement, TransactionTrait,
};

use crate::controller::catalog::CatalogController;
Expand Down
47 changes: 12 additions & 35 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,11 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType};

use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_meta_model_v2::{
actor, fragment, ObjectId, StreamingParallelism, StreamingParallelism,
};
use risingwave_pb::common::{
ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, PbParallelUnit, WorkerNode, WorkerType,
};
use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism};
use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::{ActorState, PbActorState};
use risingwave_pb::meta::table_fragments::actor_status::{ActorState};
use risingwave_pb::meta::table_fragments::fragment::{
FragmentDistributionType, PbFragmentDistributionType,
};
Expand Down Expand Up @@ -592,7 +584,6 @@ impl ScaleController {
fragment_id,
status,
splits: _,
parallel_unit_id,
worker_id,
upstream_actor_ids,
vnode_bitmap,
Expand Down Expand Up @@ -627,16 +618,7 @@ impl ScaleController {
.or_default()
.push(actor_info);

actor_status.insert(
actor_id as _,
ActorStatus {
parallel_unit: Some(PbParallelUnit {
id: parallel_unit_id as _,
worker_node_id: worker_id as _,
}),
state: PbActorState::from(status) as i32,
},
);
actor_status.insert(actor_id as _, worker_id as WorkerId);

expr_contexts.insert(actor_id as u32, expr_context);
}
Expand All @@ -649,7 +631,6 @@ impl ScaleController {
fragment_type_mask,
distribution_type,
stream_node,
vnode_mapping,
state_table_ids,
upstream_fragment_id,
},
Expand All @@ -671,7 +652,6 @@ impl ScaleController {
fragment_id: fragment_id as _,
fragment_type_mask: fragment_type_mask as _,
distribution_type: distribution_type.into(),
vnode_mapping: Some(vnode_mapping.to_protobuf()),
state_table_ids: state_table_ids.into_u32_array(),
upstream_fragment_ids: upstream_fragment_id.into_u32_array(),
actor_template: PbStreamActor {
Expand Down Expand Up @@ -2122,23 +2102,20 @@ impl ScaleController {
println!("table frag id map {:?}", table_fragment_id_map);

for (actor_id, actor) in actors {
actor_status.insert(
actor_id as ActorId,
ActorStatus {
parallel_unit: Some(ParallelUnit {
id: actor.parallel_unit_id as ParallelUnitId,
worker_node_id: actor.worker_id as WorkerId,
}),
state: PbActorState::from(actor.status) as i32,
},
);
// actor_status.insert(
// actor_id as ActorId,
// ActorStatus {
// location: PbActorLocation::from_worker(actor.worker_id),
// state: PbActorState::from(actor.status) as i32,
// },
// );
fragment_actor_id_map
.entry(actor.fragment_id as FragmentId)
.or_default()
.insert(actor_id as ActorId);
}

println!("actor status {:#?}", actor_status);
// println!("actor status {:#?}", actor_status);
println!("frag actor {:#?}", fragment_actor_id_map);
}
}
Expand Down

0 comments on commit e52067b

Please sign in to comment.