Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add compression capabilities for StreamNode in PbTableFragments. #13598

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
15 changes: 15 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,29 @@ message TableFragments {
// so we pre-generate and store it here, this member will only be initialized when creating the Fragment
// and modified when creating the mv-on-mv
repeated uint32 upstream_fragment_ids = 7;

// A template used to store each StreamNode within an Actor for TableFragments that have compression enabled.
stream_plan.StreamNode stream_node_template = 8;
}

uint32 table_id = 1;
State state = 2;
map<uint32, Fragment> fragments = 3;
map<uint32, ActorStatus> actor_status = 4;
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamEnvironment env = 6;

enum GraphRenderType {
RENDER_UNSPECIFIED = 0;
RENDER_TEMPLATE = 1;
}

// When we use RENDER_TEMPLATE, template compression is enabled during the conversion between PbTableFragments and model::TableFragments.
// This avoids pointless repetition in each PbActor’s StreamNode within PbFragment.
// When RENDER_TEMPLATE is activated, the stream_node_template in PbFragment should not be empty.
// Also, the nodes in the original actors field will be disregarded and rendered when converted to model::TableFragments.
GraphRenderType graph_render_type = 7;
}

/// Parallel unit mapping with fragment id, used for notification.
Expand Down
8 changes: 8 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,14 @@ message StreamActor {
common.Buffer vnode_bitmap = 8;
// The SQL definition of this materialized view. Used for debugging only.
string mview_definition = 9;

message ActorIds {
repeated uint32 actor_ids = 1;
}

// The upstream actors of this actor, grouped by fragment id.
// Content will only be present when using RENDER_TEMPLATE.
map<uint32, ActorIds> upstream_actors_by_fragment = 10;
}

enum FragmentTypeFlag {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ pub use future_utils::{
};
#[macro_use]
pub mod match_util;
pub mod table_fragments_util;
155 changes: 155 additions & 0 deletions src/common/src/util/table_fragments_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_pb::meta::table_fragments::GraphRenderType;
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::stream_plan::stream_actor::PbActorIds;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use crate::util::stream_graph_visitor::visit_stream_node;

#[easy_ext::ext(TableFragmentsRenderCompression)]
impl PbTableFragments {
// We decompress `PbTableFragments` using the `stream_node_template` of the `PbFragment`,
// by re-rendering the template, then repopulating the upstream actor ids and upstream fragment id
// contained in the `MergeNode` by `upstream_actors_by_fragment` in actor.
pub fn ensure_uncompressed(&mut self) {
assert_eq!(
self.graph_render_type,
GraphRenderType::RenderTemplate as i32
);

for fragment in self.fragments.values_mut() {
assert!(
fragment.stream_node_template.is_some(),
"fragment.stream_node_template should not be None when render type is RenderTemplate"
);

for actor in &mut fragment.actors {
let pb_nodes = {
let mut nodes = fragment.stream_node_template.clone().unwrap();

visit_stream_node(&mut nodes, |body| {
if let NodeBody::Merge(m) = body
&& let Some(PbActorIds{ actor_ids: upstream_actor_ids }) = actor.upstream_actors_by_fragment.get(&(m.upstream_fragment_id as _))
{
m.upstream_actor_id = upstream_actor_ids.iter().map(|id| *id as _).collect();
}
});

Some(nodes)
};

assert!(
actor.nodes.is_none(),
"actor.nodes should be None when render type is RenderTemplate"
);

actor.nodes = pb_nodes;
actor.upstream_actors_by_fragment.clear();
}

assert!(
fragment.stream_node_template.is_some(),
"fragment.stream_node_template should not be None when render type is RenderTemplate"
);
fragment.stream_node_template = None;
}

self.graph_render_type = GraphRenderType::RenderUnspecified as i32;
}

// We compressed the repetitive `node_body` of `PbStreamNode` in actors in Fragment in TableFragments,
// storing their templates in the `stream_node_template` in the `PbFragment`.
// We have also made special modifications to `MergeNode`s, extracting their upstream fragment id and upstream actor ids.
// This way, we can save some memory when conducting RPC and storage.
pub fn ensure_compressed(&mut self) {
assert_eq!(
self.graph_render_type,
GraphRenderType::RenderUnspecified as i32
);

for fragment in self.fragments.values_mut() {
assert!(
fragment.stream_node_template.is_none(),
"fragment.stream_node_template should be None when render type is unspecific"
);

let stream_node = {
let actor_template = fragment
.actors
.first()
.cloned()
.expect("fragment has no actor");

let mut stream_node = actor_template.nodes.unwrap();
visit_stream_node(&mut stream_node, |body| {
if let NodeBody::Merge(m) = body {
m.upstream_actor_id = vec![];
}
});

stream_node
};

for actor in &mut fragment.actors {
let Some(node) = actor.nodes.as_mut() else {
continue;
};

let mut upstream_actors_by_fragment = HashMap::new();
visit_stream_node(node, |body| {
if let NodeBody::Merge(m) = body {
let upstream_actor_ids = std::mem::take(&mut m.upstream_actor_id);
assert!(
upstream_actors_by_fragment
.insert(
m.upstream_fragment_id,
PbActorIds {
actor_ids: upstream_actor_ids
}
)
.is_none(),
"There should only be one link between two fragments"
);
}
});

assert!(
actor.nodes.is_some(),
"actor.nodes should not be None when render type is unspecified"
);

actor.nodes = None;

assert!(
actor.upstream_actors_by_fragment.is_empty(),
"actor.upstream_actors_by_fragment should be empty when render type is unspecified"
);

actor.upstream_actors_by_fragment = upstream_actors_by_fragment;
}

assert!(
fragment.stream_node_template.is_none(),
"fragment.stream_node_template should be None when render type is unspecified"
);
fragment.stream_node_template = Some(stream_node);
}

self.graph_render_type = GraphRenderType::RenderTemplate as _;
}
}
1 change: 1 addition & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ impl CatalogController {
_table_fragments: &PbTableFragments,
_internal_tables: Vec<PbTable>,
) -> MetaResult<()> {
// note: If we use PbTableFragments here, we need to determine whether decompression is needed through the ‘graph_render_type’.
todo!()
}

Expand Down
34 changes: 26 additions & 8 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::util::table_fragments_util::TableFragmentsRenderCompression;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob};
use risingwave_meta_model_v2::{
Expand All @@ -29,7 +30,7 @@ use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState};
use risingwave_pb::meta::table_fragments::{GraphRenderType, PbActorStatus, PbFragment, PbState};
use risingwave_pb::meta::{FragmentParallelUnitMapping, PbTableFragments};
use risingwave_pb::source::PbConnectorSplits;
use risingwave_pb::stream_plan::stream_node::NodeBody;
Expand Down Expand Up @@ -72,20 +73,28 @@ impl CatalogControllerInner {
impl CatalogController {
#[allow(clippy::type_complexity)]
pub fn extract_fragment_and_actors_from_table_fragments(
PbTableFragments {
table_id,
fragments,
actor_status,
actor_splits,
..
}: PbTableFragments,
table_fragments: PbTableFragments,
) -> MetaResult<
Vec<(
fragment::Model,
Vec<actor::Model>,
HashMap<ActorId, Vec<actor_dispatcher::Model>>,
)>,
> {
let mut table_fragments = table_fragments;

if table_fragments.graph_render_type == GraphRenderType::RenderTemplate as i32 {
table_fragments.ensure_uncompressed()
}

let PbTableFragments {
table_id,
fragments,
actor_status,
actor_splits,
..
} = table_fragments;

let mut result = vec![];

let fragments: BTreeMap<_, _> = fragments.into_iter().collect();
Expand Down Expand Up @@ -123,6 +132,7 @@ impl CatalogController {
vnode_mapping: pb_vnode_mapping,
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
..
} = pb_fragment;

let state_table_ids = pb_state_table_ids.into();
Expand Down Expand Up @@ -170,6 +180,7 @@ impl CatalogController {
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition: _,
upstream_actors_by_fragment: _upstream_actors_by_fragment,
} = actor;

let splits = pb_actor_splits.get(&actor_id).cloned().map(ConnectorSplits);
Expand Down Expand Up @@ -275,6 +286,7 @@ impl CatalogController {
actor_status: pb_actor_status,
actor_splits: pb_actor_splits,
env,
graph_render_type: GraphRenderType::RenderUnspecified as _,
};

Ok(table_fragments)
Expand Down Expand Up @@ -388,6 +400,7 @@ impl CatalogController {
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition: "".to_string(),
upstream_actors_by_fragment: Default::default(),
})
}

Expand All @@ -403,6 +416,7 @@ impl CatalogController {
vnode_mapping: Some(pb_vnode_mapping),
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
stream_node_template: Default::default(),
};

Ok((pb_fragment, pb_actor_status, pb_actor_splits))
Expand Down Expand Up @@ -1047,6 +1061,7 @@ mod tests {
.cloned()
.map(|bitmap| bitmap.to_protobuf()),
mview_definition: "".to_string(),
upstream_actors_by_fragment: Default::default(),
}
})
.collect_vec();
Expand All @@ -1062,6 +1077,7 @@ mod tests {
.values()
.flat_map(|m| m.keys().map(|x| *x as _))
.collect(),
stream_node_template: None,
};

let pb_actor_status = (0..actor_count)
Expand Down Expand Up @@ -1261,6 +1277,7 @@ mod tests {
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition,
upstream_actors_by_fragment: _,
},
) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
{
Expand Down Expand Up @@ -1328,6 +1345,7 @@ mod tests {
vnode_mapping: pb_vnode_mapping,
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
stream_node_template: _,
} = pb_fragment;

assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
Expand Down
15 changes: 13 additions & 2 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ pub(super) mod handlers {
use axum::Json;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::util::table_fragments_util::TableFragmentsRenderCompression;
use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{Sink, Source, Table, View};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::table_fragments::GraphRenderType;
use risingwave_pb::meta::{ActorLocation, PbTableFragments};
use risingwave_pb::monitor_service::{
HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
Expand Down Expand Up @@ -193,13 +195,22 @@ pub(super) mod handlers {
) -> Result<Json<Vec<PbTableFragments>>> {
use crate::model::MetadataModel;

let table_fragments = TableFragments::list(&srv.meta_store)
let mut all_table_fragments = TableFragments::list(&srv.meta_store)
.await
.map_err(err)?
.into_iter()
.map(|x| x.to_protobuf())
.collect_vec();
Ok(Json(table_fragments))

for table_fragments in &mut all_table_fragments {
if table_fragments.get_graph_render_type().unwrap_or_default()
== GraphRenderType::RenderTemplate
{
table_fragments.ensure_uncompressed();
}
}

Ok(Json(all_table_fragments))
}

async fn dump_await_tree_inner(
Expand Down
Loading
Loading