Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_merge_metrics
  • Loading branch information
Li0k committed Sep 18, 2024
2 parents 5274d34 + ff479f6 commit 1c044be
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 403 deletions.
208 changes: 112 additions & 96 deletions dashboard/package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"fabric": "^5.2.1",
"framer-motion": "^6.5.1",
"lodash": "^4.17.21",
"next": "^14.1.1",
"next": "^14.2.12",
"nuqs": "^1.14.1",
"react": "^18.2.0",
"react-dom": "^18.2.0",
Expand Down
25 changes: 8 additions & 17 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,6 @@ message GroupConstruct {
CompatibilityVersion version = 6;
}

message GroupMetaChange {
option deprecated = true;
repeated uint32 table_ids_add = 1 [deprecated = true];
repeated uint32 table_ids_remove = 2 [deprecated = true];
}

message GroupTableChange {
option deprecated = true;
repeated uint32 table_ids = 1 [deprecated = true];
uint64 target_group_id = 2;
uint64 origin_group_id = 3;
uint64 new_sst_start_id = 4;
CompatibilityVersion version = 5;
}

message GroupDestroy {}

message GroupMerge {
Expand All @@ -110,12 +95,14 @@ message GroupMerge {
}

message GroupDelta {
reserved 4;
reserved "group_meta_change";
reserved 5;
reserved "group_table_change";
oneof delta_type {
IntraLevelDelta intra_level = 1;
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4 [deprecated = true];
GroupTableChange group_table_change = 5 [deprecated = true];
GroupMerge group_merge = 6;
}
}
Expand Down Expand Up @@ -358,6 +345,10 @@ message CompactTask {
TRACK_SST_OBJECT_ID_FAILED = 12;
NO_AVAIL_CPU_RESOURCE_CANCELED = 13;
HEARTBEAT_PROGRESS_CANCELED = 14;

// for serverless compaction
SERVERLESS_SEND_FAIL_CANCELED = 15;
SERVERLESS_TABLE_NOT_FOUND_CANCELED = 16;
}
// SSTs to be compacted, which will be removed from LSM after compaction
repeated InputLevel input_ssts = 1;
Expand Down
14 changes: 5 additions & 9 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -463,22 +463,18 @@ message AsOfJoinNode {
catalog.Table left_table = 4;
// Used for internal table states.
catalog.Table right_table = 5;
// Used for internal table states.
catalog.Table left_degree_table = 6;
// Used for internal table states.
catalog.Table right_degree_table = 7;
// The output indices of current node
repeated uint32 output_indices = 8;
repeated uint32 output_indices = 6;
// Left deduped input pk indices. The pk of the left_table and
// The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices]
// left_inequality_key is not used but for forward compatibility.
repeated uint32 left_deduped_input_pk_indices = 9;
repeated uint32 left_deduped_input_pk_indices = 7;
// Right deduped input pk indices.
// The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices]
// right_inequality_key is not used but for forward compatibility.
repeated uint32 right_deduped_input_pk_indices = 10;
repeated bool null_safe = 11;
optional plan_common.AsOfJoinDesc asof_desc = 12;
repeated uint32 right_deduped_input_pk_indices = 8;
repeated bool null_safe = 9;
optional plan_common.AsOfJoinDesc asof_desc = 10;
}

message TemporalJoinNode {
Expand Down
30 changes: 23 additions & 7 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ impl ScaleController {

let upstream_fragment = ctx.fragment_map.get(upstream_fragment_id).unwrap();

// build actor group map
for upstream_actor in &upstream_fragment.actors {
for dispatcher in &upstream_actor.dispatcher {
if let DispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
Expand Down Expand Up @@ -1099,6 +1100,14 @@ impl ScaleController {
.cloned()
.unwrap_or_default();

// Question: Is it possible to have Hash Distribution Fragment but the Actor's bitmap remains unchanged?
if upstream_fragment.distribution_type() == FragmentDistributionType::Single {
assert!(
upstream_fragment_bitmap.is_empty(),
"single fragment should have no bitmap updates"
);
}

let upstream_fragment_actor_map = fragment_actors_after_reschedule
.get(upstream_fragment_id)
.cloned()
Expand Down Expand Up @@ -1159,19 +1168,26 @@ impl ScaleController {
.get(&worker_id)
.unwrap()
.clone();

assert_eq!(actor_ids.len(), upstream_actor_ids.len());

for (actor_id, upstream_actor_id) in actor_ids
.into_iter()
.zip_eq_debug(upstream_actor_ids.into_iter())
{
let bitmap = upstream_fragment_bitmap
.get(&upstream_actor_id)
.cloned()
.unwrap();

// Copy the bitmap
fragment_bitmap.insert(actor_id, bitmap);
match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
None => {
// single fragment should have no bitmap updates (same as upstream)
assert_eq!(
upstream_fragment.distribution_type(),
FragmentDistributionType::Single
);
}
Some(bitmap) => {
// Copy the bitmap
fragment_bitmap.insert(actor_id, bitmap);
}
}

no_shuffle_upstream_actor_map
.entry(actor_id as ActorId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, GroupMetaChange,
GroupTableChange, PbLevelType,
CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, PbLevelType,
};
use tracing::warn;

Expand All @@ -49,8 +48,6 @@ pub struct GroupDeltasSummary {
pub insert_table_infos: Vec<SstableInfo>,
pub group_construct: Option<GroupConstruct>,
pub group_destroy: Option<CompactionGroupId>,
pub group_meta_changes: Vec<GroupMetaChange>,
pub group_table_change: Option<GroupTableChange>,
pub new_vnode_partition_count: u32,
pub group_merge: Option<GroupMerge>,
}
Expand All @@ -66,8 +63,6 @@ pub fn summarize_group_deltas(
let mut insert_table_infos = vec![];
let mut group_construct = None;
let mut group_destroy = None;
let mut group_meta_changes = vec![];
let mut group_table_change = None;
let mut new_vnode_partition_count = 0;
let mut group_merge = None;

Expand All @@ -93,12 +88,6 @@ pub fn summarize_group_deltas(
assert!(group_destroy.is_none());
group_destroy = Some(compaction_group_id);
}
GroupDelta::GroupMetaChange(meta_delta) => {
group_meta_changes.push(meta_delta.clone());
}
GroupDelta::GroupTableChange(meta_delta) => {
group_table_change = Some(meta_delta.clone());
}
GroupDelta::GroupMerge(merge_delta) => {
assert!(group_merge.is_none());
group_merge = Some(*merge_delta);
Expand All @@ -118,8 +107,6 @@ pub fn summarize_group_deltas(
insert_table_infos,
group_construct,
group_destroy,
group_meta_changes,
group_table_change,
new_vnode_partition_count,
group_merge,
}
Expand Down Expand Up @@ -404,8 +391,7 @@ impl HummockVersion {
}
}
return;
}
if !self.levels.contains_key(&parent_group_id) {
} else if !self.levels.contains_key(&parent_group_id) {
warn!(parent_group_id, "non-existing parent group id to init from");
return;
}
Expand Down Expand Up @@ -617,38 +603,6 @@ impl HummockVersion {
member_table_ids,
group_construct.get_new_sst_start_id(),
);
} else if let Some(group_change) = &summary.group_table_change {
// TODO: may deprecate this branch? This enum variant is not created anywhere
assert!(
group_change.version <= CompatibilityVersion::NoTrivialSplit as _,
"DeltaType::GroupTableChange is not used anymore after CompatibilityVersion::NoMemberTableIds is added"
);
#[expect(deprecated)]
// for backward-compatibility of previous hummock version delta
self.init_with_parent_group(
group_change.origin_group_id,
group_change.target_group_id,
BTreeSet::from_iter(group_change.table_ids.clone()),
group_change.new_sst_start_id,
);

let levels = self
.levels
.get_mut(&group_change.origin_group_id)
.expect("compaction group should exist");
#[expect(deprecated)]
// for backward-compatibility of previous hummock version delta
let mut moving_tables = levels
.member_table_ids
.extract_if(|t| group_change.table_ids.contains(t))
.collect_vec();
#[expect(deprecated)]
// for backward-compatibility of previous hummock version delta
self.levels
.get_mut(compaction_group_id)
.expect("compaction group should exist")
.member_table_ids
.append(&mut moving_tables);
} else if let Some(group_merge) = &summary.group_merge {
tracing::info!(
"group_merge left {:?} right {:?}",
Expand All @@ -662,16 +616,6 @@ impl HummockVersion {
let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
panic!("compaction group {} does not exist", compaction_group_id)
});
#[expect(deprecated)] // for backward-compatibility of previous hummock version delta
for group_meta_delta in &summary.group_meta_changes {
levels
.member_table_ids
.extend(group_meta_delta.table_ids_add.clone());
levels
.member_table_ids
.retain(|t| !group_meta_delta.table_ids_remove.contains(t));
levels.member_table_ids.sort();
}

assert!(
visible_table_committed_epoch <= version_delta.visible_table_committed_epoch(),
Expand Down
33 changes: 2 additions & 31 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use risingwave_pb::hummock::group_delta::PbDeltaType;
use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
use risingwave_pb::hummock::{
CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge,
PbGroupMetaChange, PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta,
PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, StateTableInfo, StateTableInfoDelta,
PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo,
StateTableInfo, StateTableInfoDelta,
};
use tracing::warn;

Expand Down Expand Up @@ -924,11 +924,6 @@ pub enum GroupDeltaCommon<T> {
IntraLevel(IntraLevelDeltaCommon<T>),
GroupConstruct(PbGroupConstruct),
GroupDestroy(PbGroupDestroy),
GroupMetaChange(PbGroupMetaChange),

#[allow(dead_code)]
GroupTableChange(PbGroupTableChange),

GroupMerge(PbGroupMerge),
}

Expand All @@ -949,12 +944,6 @@ where
Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
GroupDeltaCommon::GroupDestroy(pb_group_destroy)
}
Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => {
GroupDeltaCommon::GroupMetaChange(pb_group_meta_change)
}
Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => {
GroupDeltaCommon::GroupTableChange(pb_group_table_change)
}
Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
GroupDeltaCommon::GroupMerge(pb_group_merge)
}
Expand All @@ -978,12 +967,6 @@ where
GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)),
},
GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)),
},
GroupDeltaCommon::GroupTableChange(pb_group_table_change) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change)),
},
GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)),
},
Expand All @@ -1006,12 +989,6 @@ where
GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
},
GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change.clone())),
},
GroupDeltaCommon::GroupTableChange(pb_group_table_change) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change.clone())),
},
GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta {
delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)),
},
Expand All @@ -1034,12 +1011,6 @@ where
Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
GroupDeltaCommon::GroupDestroy(*pb_group_destroy)
}
Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => {
GroupDeltaCommon::GroupMetaChange(pb_group_meta_change.clone())
}
Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => {
GroupDeltaCommon::GroupTableChange(pb_group_table_change.clone())
}
Some(PbDeltaType::GroupMerge(pb_group_merge)) => {
GroupDeltaCommon::GroupMerge(*pb_group_merge)
}
Expand Down
Loading

0 comments on commit 1c044be

Please sign in to comment.