From 48111f5f1fae1574a0f7e44300c40b67b0a1063d Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 17 Jul 2024 16:20:38 +0800 Subject: [PATCH] Add Copy to DistributionType, fix worker_id cast, rm Bitmap import --- src/meta/model_v2/src/fragment.rs | 2 +- src/meta/service/src/stream_service.rs | 2 +- src/meta/src/controller/streaming_job.rs | 31 +++++++++++++++--------- src/meta/src/rpc/metrics.rs | 7 +++--- 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs index a189cb03c7747..dd332f5fc76a7 100644 --- a/src/meta/model_v2/src/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -31,7 +31,7 @@ pub struct Model { pub upstream_fragment_id: I32Array, } -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum DistributionType { #[sea_orm(string_value = "SINGLE")] diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index bd18d617c88e0..15a2157a0642d 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -388,7 +388,7 @@ impl StreamManagerService for StreamServiceImpl { actor_id: actor_location.actor_id as _, fragment_id: actor_location.fragment_id as _, state: PbActorState::from(actor_location.status) as _, - worker_id: actor_location.worker_id, + worker_id: actor_location.worker_id as _, }) .collect_vec() } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index cd2a56982d11c..8de621abc97b1 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; use itertools::Itertools; -use risingwave_common::bitmap::Bitmap; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::{bail, current_cluster_version}; @@ -65,7 +64,8 @@ use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ build_relation_group, check_relation_name_duplicate, check_sink_into_table_cycle, - ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings, PartialObject, + ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings, + rebuild_fragment_mapping_from_actors, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; @@ -1386,7 +1386,7 @@ impl CatalogController { let txn = inner.db.begin().await?; - let fragment_mapping_to_notify = vec![]; + let mut fragment_mapping_to_notify = vec![]; // for assert only let mut assert_dispatcher_update_checker = HashSet::new(); @@ -1533,15 +1533,24 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; - let fragment_actors = fragment.find_related(Actor).all(&txn).await?; + let job_actors = fragment + .find_related(Actor) + .all(&txn) + .await? + .into_iter() + .map(|actor| { + ( + fragment_id, + fragment.distribution_type, + actor.actor_id, + actor.vnode_bitmap, + actor.worker_id, + actor.status, + ) + }) + .collect_vec(); - let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment_actors.len()); - for actor in &fragment_actors { - if let Some(vnode_bitmap) = &actor.vnode_bitmap { - let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf()); - actor_to_vnode_bitmap.insert(actor.actor_id as u32, bitmap); - } - } + fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors)); // for downstream and upstream let removed_actor_ids: HashSet<_> = removed_actors diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index 754e1802ae089..6fe0df07bb359 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -29,6 +29,7 @@ use risingwave_common::metrics::LabelGuardedIntGaugeVec; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::register_guarded_int_gauge_vec_with_registry; use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics; +use risingwave_meta_model_v2::WorkerId; use risingwave_object_store::object::object_metrics::{ ObjectStoreMetrics, GLOBAL_OBJECT_STORE_METRICS, }; @@ -818,14 +819,14 @@ pub async fn refresh_fragment_info_metrics_v2( } }; - let worker_addr_mapping: HashMap = worker_nodes + let worker_addr_mapping: HashMap = worker_nodes .into_iter() .map(|worker_node| { let addr = match worker_node.host { Some(host) => format!("{}:{}", host.host, host.port), None => "".to_owned(), }; - (worker_node.id, addr) + (worker_node.id as WorkerId, addr) }) .collect(); let table_compaction_group_id_mapping = hummock_manager @@ -842,7 +843,7 @@ pub async fn refresh_fragment_info_metrics_v2( let fragment_id_str = actor_location.fragment_id.to_string(); // Report a dummy gauge metrics with (fragment id, actor id, node // address) as its label - if let Some(address) = worker_addr_mapping.get(&{ actor_location.worker_id }) { + if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) { meta_metrics .actor_info .with_label_values(&[&actor_id_str, &fragment_id_str, address])