diff --git a/proto/meta.proto b/proto/meta.proto index 6d8e07279d0d8..ee00fe28668c6 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -90,7 +90,11 @@ message TableFragments { // so we pre-generate and store it here, this member will only be initialized when creating the Fragment // and modified when creating the mv-on-mv repeated uint32 upstream_fragment_ids = 7; + + // A template used to store each StreamNode within an Actor for TableFragments that have compression enabled. + stream_plan.StreamNode stream_node_template = 8; } + uint32 table_id = 1; State state = 2; map fragments = 3; @@ -98,6 +102,17 @@ message TableFragments { map actor_splits = 5; stream_plan.StreamEnvironment env = 6; + + enum GraphRenderType { + RENDER_UNSPECIFIED = 0; + RENDER_TEMPLATE = 1; + } + + // When we use RENDER_TEMPLATE, template compression is enabled during the conversion between PbTableFragments and model::TableFragments. + // This avoids pointless repetition in each PbActor’s StreamNode within PbFragment. + // When RENDER_TEMPLATE is activated, the stream_node_template in PbFragment should not be empty. + // Also, the nodes in the original actors field will be disregarded and rendered when converted to model::TableFragments. + GraphRenderType graph_render_type = 7; } /// Parallel unit mapping with fragment id, used for notification. diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index fd7988413aeed..ba551e97e2cc0 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -810,6 +810,14 @@ message StreamActor { common.Buffer vnode_bitmap = 8; // The SQL definition of this materialized view. Used for debugging only. string mview_definition = 9; + + message ActorIds { + repeated uint32 actor_ids = 1; + } + + // The upstream actors of this actor, grouped by fragment id. + // Content will only be present when using RENDER_TEMPLATE. 
+ map upstream_actors_by_fragment = 10; } enum FragmentTypeFlag { diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index 2f2b5cb1f0cd2..4213bfb512fba 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -45,3 +45,4 @@ pub use future_utils::{ }; #[macro_use] pub mod match_util; +pub mod table_fragments_util; diff --git a/src/common/src/util/table_fragments_util.rs b/src/common/src/util/table_fragments_util.rs new file mode 100644 index 0000000000000..0c9003b8f5c47 --- /dev/null +++ b/src/common/src/util/table_fragments_util.rs @@ -0,0 +1,155 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use risingwave_pb::meta::table_fragments::GraphRenderType; +use risingwave_pb::meta::PbTableFragments; +use risingwave_pb::stream_plan::stream_actor::PbActorIds; +use risingwave_pb::stream_plan::stream_node::NodeBody; + +use crate::util::stream_graph_visitor::visit_stream_node; + +#[easy_ext::ext(TableFragmentsRenderCompression)] +impl PbTableFragments { + // We decompress `PbTableFragments` using the `stream_node_template` of the `PbFragment`, + // by re-rendering the template, then repopulating the upstream actor ids and upstream fragment id + // contained in the `MergeNode` by `upstream_actors_by_fragment` in actor. 
+    /// Expands a template-compressed `PbTableFragments` in place.
+    ///
+    /// For every fragment, each actor receives its own copy of the fragment's
+    /// `stream_node_template`. Every `MergeNode` in that copy gets its `upstream_actor_id`
+    /// restored from the actor's `upstream_actors_by_fragment` entry keyed by the node's
+    /// `upstream_fragment_id`; a `MergeNode` without such an entry keeps the ids stored in the
+    /// template. The per-actor map and the template are cleared afterwards and
+    /// `graph_render_type` is reset to `RenderUnspecified`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `graph_render_type` is not `RenderTemplate`, if a fragment has no
+    /// `stream_node_template`, or if an actor already carries `nodes`.
+    pub fn uncompress(&mut self) {
+        assert_eq!(
+            self.graph_render_type,
+            GraphRenderType::RenderTemplate as i32
+        );
+
+        for fragment in self.fragments.values_mut() {
+            // Taking the template both checks the invariant and clears the field, so no
+            // separate assert / reset pair is needed.
+            let template = fragment.stream_node_template.take().expect(
+                "fragment.stream_node_template should not be None when render type is RenderTemplate",
+            );
+
+            for actor in &mut fragment.actors {
+                let mut nodes = template.clone();
+
+                visit_stream_node(&mut nodes, |body| {
+                    if let NodeBody::Merge(m) = body
+                        && let Some(PbActorIds { actor_ids }) = actor
+                            .upstream_actors_by_fragment
+                            .get(&m.upstream_fragment_id)
+                    {
+                        // Same element type on both sides (see `compress`), so no cast is needed.
+                        m.upstream_actor_id.clone_from(actor_ids);
+                    }
+                });
+
+                assert!(
+                    actor.nodes.is_none(),
+                    "actor.nodes should be None when render type is RenderTemplate"
+                );
+
+                actor.nodes = Some(nodes);
+                actor.upstream_actors_by_fragment.clear();
+            }
+        }
+
+        self.graph_render_type = GraphRenderType::RenderUnspecified as i32;
+    }
+
+    // We compressed the repetitive `node_body` of `PbStreamNode` in actors in Fragment in TableFragments,
+    // storing their templates in the `stream_node_template` in the `PbFragment`.
+    // We have also made special modifications to `MergeNode`s, extracting their upstream fragment id and upstream actor ids.
+    // This way, we can save some memory when conducting RPC and storage.
+    /// Compresses this `PbTableFragments` in place using one template per fragment.
+    ///
+    /// The first actor's `nodes`, with every `MergeNode::upstream_actor_id` emptied, becomes the
+    /// fragment's `stream_node_template`. Each actor's own `nodes` is then dropped after its
+    /// `MergeNode` upstream actor ids have been moved into `upstream_actors_by_fragment`, keyed
+    /// by `upstream_fragment_id`. Actors whose `nodes` is already `None` are left untouched.
+    /// Finally `graph_render_type` is set to `RenderTemplate`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `graph_render_type` is not `RenderUnspecified`; if a fragment already has a
+    /// template, has no actors, or its first actor has no `nodes`; if one actor has two
+    /// `MergeNode`s with the same `upstream_fragment_id`; or if an actor that has `nodes`
+    /// already has entries in `upstream_actors_by_fragment`.
+    pub fn compress(&mut self) {
+        assert_eq!(
+            self.graph_render_type,
+            GraphRenderType::RenderUnspecified as i32
+        );
+
+        for fragment in self.fragments.values_mut() {
+            assert!(
+                fragment.stream_node_template.is_none(),
+                "fragment.stream_node_template should be None when render type is unspecified"
+            );
+
+            // Only the node tree is needed for the template, so clone `nodes` alone instead of
+            // deep-copying the whole first actor.
+            let mut template = fragment
+                .actors
+                .first()
+                .expect("fragment has no actor")
+                .nodes
+                .clone()
+                .expect("actor.nodes should not be None when render type is unspecified");
+            visit_stream_node(&mut template, |body| {
+                if let NodeBody::Merge(m) = body {
+                    m.upstream_actor_id = vec![];
+                }
+            });
+
+            for actor in &mut fragment.actors {
+                // Taking the nodes leaves `actor.nodes` as `None`, which is the compressed form.
+                let Some(mut nodes) = actor.nodes.take() else {
+                    continue;
+                };
+
+                let mut upstream_actors_by_fragment = HashMap::new();
+                visit_stream_node(&mut nodes, |body| {
+                    if let NodeBody::Merge(m) = body {
+                        let previous = upstream_actors_by_fragment.insert(
+                            m.upstream_fragment_id,
+                            PbActorIds {
+                                actor_ids: std::mem::take(&mut m.upstream_actor_id),
+                            },
+                        );
+                        assert!(
+                            previous.is_none(),
+                            "There should only be one link between two fragments"
+                        );
+                    }
+                });
+
+                assert!(
+                    actor.upstream_actors_by_fragment.is_empty(),
+                    "actor.upstream_actors_by_fragment should be empty when render type is unspecified"
+                );
+
+                actor.upstream_actors_by_fragment = upstream_actors_by_fragment;
+            }
+
+            fragment.stream_node_template = Some(template);
+        }
+
+        self.graph_render_type = GraphRenderType::RenderTemplate as i32;
+    }
+}
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 9fe0910bee9c7..499d96e03cd15 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -355,6 +355,7 @@ impl CatalogController { _table_fragments: &PbTableFragments, _internal_tables: Vec, ) -> MetaResult<()> { + // note: If we use PbTableFragments here, we need to determine whether decompression is needed through the ‘graph_render_type’. todo!() } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index dc36c6466e57c..042106e77dc61 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,6 +19,7 @@ use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::util::stream_graph_visitor::visit_stream_node; +use risingwave_common::util::table_fragments_util::TableFragmentsRenderCompression; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob}; use risingwave_meta_model_v2::{ @@ -29,7 +30,7 @@ use risingwave_pb::common::PbParallelUnit; use risingwave_pb::ddl_service::PbTableJobType; use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; -use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState}; +use risingwave_pb::meta::table_fragments::{GraphRenderType, PbActorStatus, PbFragment, PbState}; use risingwave_pb::meta::{FragmentParallelUnitMapping, PbTableFragments}; use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; @@ -72,13 +73,7 @@ impl CatalogControllerInner { impl CatalogController { #[allow(clippy::type_complexity)] pub fn extract_fragment_and_actors_from_table_fragments( - PbTableFragments { - table_id, - fragments, - actor_status, - actor_splits, - .. 
- }: PbTableFragments, + table_fragments: PbTableFragments, ) -> MetaResult< Vec<( fragment::Model, @@ -86,6 +81,20 @@ impl CatalogController { HashMap>, )>, > { + let mut table_fragments = table_fragments; + + if table_fragments.graph_render_type == GraphRenderType::RenderTemplate as i32 { + table_fragments.uncompress() + } + + let PbTableFragments { + table_id, + fragments, + actor_status, + actor_splits, + .. + } = table_fragments; + let mut result = vec![]; let fragments: BTreeMap<_, _> = fragments.into_iter().collect(); @@ -123,6 +132,7 @@ impl CatalogController { vnode_mapping: pb_vnode_mapping, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, + .. } = pb_fragment; let state_table_ids = pb_state_table_ids.into(); @@ -170,6 +180,7 @@ impl CatalogController { upstream_actor_id: pb_upstream_actor_id, vnode_bitmap: pb_vnode_bitmap, mview_definition: _, + upstream_actors_by_fragment: _upstream_actors_by_fragment, } = actor; let splits = pb_actor_splits.get(&actor_id).cloned().map(ConnectorSplits); @@ -275,6 +286,7 @@ impl CatalogController { actor_status: pb_actor_status, actor_splits: pb_actor_splits, env, + graph_render_type: GraphRenderType::RenderUnspecified as _, }; Ok(table_fragments) @@ -388,6 +400,7 @@ impl CatalogController { upstream_actor_id: pb_upstream_actor_id, vnode_bitmap: pb_vnode_bitmap, mview_definition: "".to_string(), + upstream_actors_by_fragment: Default::default(), }) } @@ -403,6 +416,7 @@ impl CatalogController { vnode_mapping: Some(pb_vnode_mapping), state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, + stream_node_template: Default::default(), }; Ok((pb_fragment, pb_actor_status, pb_actor_splits)) @@ -1047,6 +1061,7 @@ mod tests { .cloned() .map(|bitmap| bitmap.to_protobuf()), mview_definition: "".to_string(), + upstream_actors_by_fragment: Default::default(), } }) .collect_vec(); @@ -1062,6 +1077,7 @@ mod tests { .values() .flat_map(|m| m.keys().map(|x| *x as _)) 
.collect(), + stream_node_template: None, }; let pb_actor_status = (0..actor_count) @@ -1261,6 +1277,7 @@ mod tests { upstream_actor_id: pb_upstream_actor_id, vnode_bitmap: pb_vnode_bitmap, mview_definition, + upstream_actors_by_fragment: _, }, ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter()) { @@ -1328,6 +1345,7 @@ mod tests { vnode_mapping: pb_vnode_mapping, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, + stream_node_template: _, } = pb_fragment; assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32); diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index f4fa21437fc1b..89037105fa6da 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -57,10 +57,12 @@ pub(super) mod handlers { use axum::Json; use itertools::Itertools; use risingwave_common::bail; + use risingwave_common::util::table_fragments_util::TableFragmentsRenderCompression; use risingwave_common_heap_profiling::COLLAPSED_SUFFIX; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{Sink, Source, Table, View}; use risingwave_pb::common::{WorkerNode, WorkerType}; + use risingwave_pb::meta::table_fragments::GraphRenderType; use risingwave_pb::meta::{ActorLocation, PbTableFragments}; use risingwave_pb::monitor_service::{ HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse, @@ -193,13 +195,22 @@ pub(super) mod handlers { ) -> Result>> { use crate::model::MetadataModel; - let table_fragments = TableFragments::list(&srv.meta_store) + let mut all_table_fragments = TableFragments::list(&srv.meta_store) .await .map_err(err)? 
.into_iter() .map(|x| x.to_protobuf()) .collect_vec(); - Ok(Json(table_fragments)) + + for table_fragments in &mut all_table_fragments { + if table_fragments.get_graph_render_type().unwrap_or_default() + == GraphRenderType::RenderTemplate + { + table_fragments.uncompress(); + } + } + + Ok(Json(all_table_fragments)) } async fn dump_await_tree_inner( diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 5f9b303c77a5c..921da7bd3390b 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -18,10 +18,11 @@ use std::ops::AddAssign; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::ParallelUnitId; +use risingwave_common::util::table_fragments_util::TableFragmentsRenderCompression; use risingwave_connector::source::SplitImpl; use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; -use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; +use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, GraphRenderType, State}; use risingwave_pb::meta::PbTableFragments; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ @@ -94,18 +95,33 @@ impl MetadataModel for TableFragments { } fn to_protobuf(&self) -> Self::PbType { - Self::PbType { + let mut table_fragments = PbTableFragments { table_id: self.table_id.table_id(), state: self.state as _, fragments: self.fragments.clone().into_iter().collect(), actor_status: self.actor_status.clone().into_iter().collect(), actor_splits: build_actor_connector_splits(&self.actor_splits), env: Some(self.env.to_protobuf()), - } + ..Default::default() + }; + + table_fragments.compress(); + + table_fragments } fn from_protobuf(prost: Self::PbType) -> Self { + let mut prost = prost; + let env = StreamEnvironment::from_protobuf(prost.get_env().unwrap()); + + if let GraphRenderType::RenderTemplate = prost + 
.get_graph_render_type() + .unwrap_or(GraphRenderType::RenderUnspecified) + { + prost.uncompress(); + } + Self { table_id: TableId::new(prost.table_id), state: prost.state(), diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 4a5339431f0aa..5cc44f89fee31 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -281,6 +281,7 @@ impl ActorBuilder { upstream_actor_id, vnode_bitmap: self.vnode_bitmap.map(|b| b.to_protobuf()), mview_definition, + upstream_actors_by_fragment: Default::default(), }) } } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index a47380e21950f..8a52ef52b0a80 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -836,6 +836,7 @@ impl CompleteStreamFragmentGraph { vnode_mapping: Some(distribution.into_mapping().to_protobuf()), state_table_ids, upstream_fragment_ids, + stream_node_template: None, } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 8163b19848eff..436aeeb147ff1 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -34,6 +34,7 @@ use risingwave_common::util::addr::HostAddr; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; +use risingwave_common::util::table_fragments_util::TableFragmentsRenderCompression; use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::{ @@ -77,6 +78,7 @@ use risingwave_pb::meta::scale_service_client::ScaleServiceClient; use risingwave_pb::meta::serving_service_client::ServingServiceClient; use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient; use 
risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient; +use risingwave_pb::meta::table_fragments::GraphRenderType; use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; use risingwave_pb::meta::*; @@ -844,7 +846,14 @@ impl MetaClient { pub async fn get_cluster_info(&self) -> Result { let request = GetClusterInfoRequest {}; - let resp = self.inner.get_cluster_info(request).await?; + let mut resp = self.inner.get_cluster_info(request).await?; + + for table_fragments in &mut resp.table_fragments { + if table_fragments.graph_render_type == GraphRenderType::RenderTemplate as i32 { + table_fragments.uncompress() + } + } + Ok(resp) }