Skip to content

Commit

Permalink
Add Copy to DistributionType, fix worker_id cast, rm Bitmap import
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 19, 2024
1 parent 738662a commit 48111f5
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct Model {
pub upstream_fragment_id: I32Array,
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum DistributionType {
#[sea_orm(string_value = "SINGLE")]
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl StreamManagerService for StreamServiceImpl {
actor_id: actor_location.actor_id as _,
fragment_id: actor_location.fragment_id as _,
state: PbActorState::from(actor_location.status) as _,
worker_id: actor_location.worker_id,
worker_id: actor_location.worker_id as _,
})
.collect_vec()
}
Expand Down
31 changes: 20 additions & 11 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::num::NonZeroUsize;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::{bail, current_cluster_version};
Expand Down Expand Up @@ -65,7 +64,8 @@ use crate::controller::catalog::CatalogController;
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
build_relation_group, check_relation_name_duplicate, check_sink_into_table_cycle,
ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings, PartialObject,
ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings,
rebuild_fragment_mapping_from_actors, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{NotificationVersion, SinkId, StreamingJob};
Expand Down Expand Up @@ -1386,7 +1386,7 @@ impl CatalogController {

let txn = inner.db.begin().await?;

let fragment_mapping_to_notify = vec![];
let mut fragment_mapping_to_notify = vec![];

// for assert only
let mut assert_dispatcher_update_checker = HashSet::new();
Expand Down Expand Up @@ -1533,15 +1533,24 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

let fragment_actors = fragment.find_related(Actor).all(&txn).await?;
let job_actors = fragment
.find_related(Actor)
.all(&txn)
.await?
.into_iter()
.map(|actor| {
(
fragment_id,
fragment.distribution_type,
actor.actor_id,
actor.vnode_bitmap,
actor.worker_id,
actor.status,
)
})
.collect_vec();

let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment_actors.len());
for actor in &fragment_actors {
if let Some(vnode_bitmap) = &actor.vnode_bitmap {
let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf());
actor_to_vnode_bitmap.insert(actor.actor_id as u32, bitmap);
}
}
fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));

// for downstream and upstream
let removed_actor_ids: HashSet<_> = removed_actors
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::metrics::LabelGuardedIntGaugeVec;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::register_guarded_int_gauge_vec_with_registry;
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model_v2::WorkerId;
use risingwave_object_store::object::object_metrics::{
ObjectStoreMetrics, GLOBAL_OBJECT_STORE_METRICS,
};
Expand Down Expand Up @@ -818,14 +819,14 @@ pub async fn refresh_fragment_info_metrics_v2(
}
};

let worker_addr_mapping: HashMap<u32, String> = worker_nodes
let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
.into_iter()
.map(|worker_node| {
let addr = match worker_node.host {
Some(host) => format!("{}:{}", host.host, host.port),
None => "".to_owned(),
};
(worker_node.id, addr)
(worker_node.id as WorkerId, addr)
})
.collect();
let table_compaction_group_id_mapping = hummock_manager
Expand All @@ -842,7 +843,7 @@ pub async fn refresh_fragment_info_metrics_v2(
let fragment_id_str = actor_location.fragment_id.to_string();
// Report a dummy gauge metrics with (fragment id, actor id, node
// address) as its label
if let Some(address) = worker_addr_mapping.get(&{ actor_location.worker_id }) {
if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
meta_metrics
.actor_info
.with_label_values(&[&actor_id_str, &fragment_id_str, address])
Expand Down

0 comments on commit 48111f5

Please sign in to comment.