From 0676c0c4c01f72c1567cb6e45a050485cb9dc8db Mon Sep 17 00:00:00 2001 From: August Date: Wed, 7 Feb 2024 01:03:33 +0800 Subject: [PATCH] change stream node db type from json to binary --- Cargo.lock | 1 + src/meta/model_v2/Cargo.toml | 1 + src/meta/model_v2/src/fragment.rs | 33 +++++++++++++++++++++++- src/meta/model_v2/src/lib.rs | 2 -- src/meta/src/controller/fragment.rs | 22 ++++++++-------- src/meta/src/controller/streaming_job.rs | 30 ++++++++++++++------- src/meta/src/controller/utils.rs | 6 ++--- 7 files changed, 68 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 134b1c9ae9f82..1805bf412044f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9549,6 +9549,7 @@ dependencies = [ name = "risingwave_meta_model_v2" version = "1.7.0-alpha" dependencies = [ + "prost 0.12.1", "risingwave_common", "risingwave_hummock_sdk", "risingwave_pb", diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model_v2/Cargo.toml index 33a4b2a38e98c..9d75bf22b5c6e 100644 --- a/src/meta/model_v2/Cargo.toml +++ b/src/meta/model_v2/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +prost = { workspace = true } risingwave_common = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs index af1d529a05980..27a5898064595 100644 --- a/src/meta/model_v2/src/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Formatter; + use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; +use risingwave_pb::stream_plan::PbStreamNode; use sea_orm::entity::prelude::*; -use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode}; +use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "fragment")] @@ -31,6 +34,34 @@ pub struct Model { pub upstream_fragment_id: I32Array, } +/// This is a workaround to avoid stack overflow when deserializing the `StreamNode` field from sql +/// backend if we store it as Json. We'd better fix it before using it in production, because it's less +/// readable and maintainable. +#[derive(Clone, PartialEq, Eq, DeriveValueType)] +pub struct StreamNode(#[sea_orm] Vec); + +impl StreamNode { + pub fn to_protobuf(&self) -> PbStreamNode { + prost::Message::decode(self.0.as_slice()).unwrap() + } + + pub fn from_protobuf(val: &PbStreamNode) -> Self { + Self(prost::Message::encode_to_vec(val)) + } +} + +impl std::fmt::Debug for StreamNode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.to_protobuf().fmt(f) + } +} + +impl Default for StreamNode { + fn default() -> Self { + Self::from_protobuf(&PbStreamNode::default()) + } +} + #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum DistributionType { diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 8c85cad2a6597..2abd66fae3fe3 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -216,8 +216,6 @@ derive_from_json_struct!( ); derive_from_json_struct!(AuthInfo, risingwave_pb::user::PbAuthInfo); -derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode); - derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits); derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer); derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 4f7fb469aec97..833e642a83e74 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -20,11 +20,12 @@ use itertools::Itertools; use risingwave_common::bail; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; +use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, - StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, + StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::subscribe_response::{ @@ -281,7 +282,7 @@ impl CatalogController { let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping).unwrap(); - let stream_node = StreamNode(stream_node); + let stream_node = StreamNode::from_protobuf(&stream_node); let distribution_type = PbFragmentDistributionType::try_from(pb_distribution_type) .unwrap() @@ -367,7 +368,7 @@ impl CatalogController { upstream_fragment_id, } = fragment; - let stream_node_template = stream_node.into_inner(); + let stream_node_template = stream_node.to_protobuf(); let mut pb_actors = vec![]; @@ -1186,7 +1187,7 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = find_stream_source(stream_node.inner_ref()) { + if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { source_fragment_ids .entry(source.source_id as SourceId) .or_insert_with(BTreeSet::new) @@ -1216,7 +1217,7 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = find_stream_source(stream_node.inner_ref()) { + if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { source_fragment_ids .entry(source.source_id as SourceId) .or_insert_with(BTreeSet::new) @@ -1278,11 +1279,10 @@ mod tests { use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; - use risingwave_meta_model_v2::fragment::DistributionType; + use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode}; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits, - ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, - VnodeBitmap, + ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, TableId, VnodeBitmap, }; use risingwave_pb::common::ParallelUnit; use risingwave_pb::meta::table_fragments::actor_status::PbActorState; @@ -1448,13 +1448,13 @@ mod tests { actors: Vec, upstream_actor_ids: &HashMap>>, ) { - let stream_node_template = fragment.stream_node.clone(); + let stream_node_template = fragment.stream_node.to_protobuf(); for PbStreamActor { nodes, actor_id, .. } in actors { - let mut template_node = stream_node_template.clone().into_inner(); + let mut template_node = stream_node_template.clone(); let nodes = nodes.unwrap(); let actor_upstream_actor_ids = upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap(); @@ -1553,7 +1553,7 @@ mod tests { job_id: TEST_JOB_ID, fragment_type_mask: 0, distribution_type: DistributionType::Hash, - stream_node: StreamNode(stream_node), + stream_node: StreamNode::from_protobuf(&stream_node), vnode_mapping: FragmentVnodeMapping(parallel_unit_mapping.to_protobuf()), state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]), upstream_fragment_id: I32Array::default(), diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 01c07c82878f6..8ed85cc5147c8 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -22,6 +22,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; +use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::{ Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source, @@ -30,8 +31,8 @@ use risingwave_meta_model_v2::prelude::{ use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, - FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, - StreamingParallelism, TableId, TableVersion, UserId, + FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism, + TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; @@ -648,8 +649,9 @@ impl CatalogController { .into_tuple::<(FragmentId, StreamNode, I32Array)>() .one(&txn) .await? + .map(|(id, node, upstream)| (id, node.to_protobuf(), upstream)) .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; - visit_stream_node(&mut stream_node.0, |body| { + visit_stream_node(&mut stream_node, |body| { if let PbNodeBody::Merge(m) = body && let Some((new_fragment_id, new_actor_ids)) = fragment_replace_map.get(&m.upstream_fragment_id) @@ -665,7 +667,7 @@ impl CatalogController { } fragment::ActiveModel { fragment_id: Set(fragment_id), - stream_node: Set(stream_node), + stream_node: Set(StreamNode::from_protobuf(&stream_node)), upstream_fragment_id: Set(upstream_fragment_id), ..Default::default() } @@ -789,7 +791,7 @@ impl CatalogController { ))); } - let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, @@ -800,11 +802,15 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + let mut fragments = fragments + .into_iter() + .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) + .collect_vec(); fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { - visit_stream_node(&mut stream_node.0, |node| { + visit_stream_node(stream_node, |node| { if let PbNodeBody::Source(node) = node { if let Some(node_inner) = &mut node.source_inner && node_inner.source_id == source_id as u32 @@ -826,7 +832,7 @@ impl CatalogController { for (id, _, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), - stream_node: Set(stream_node), + stream_node: Set(StreamNode::from_protobuf(&stream_node)), ..Default::default() } .update(&txn) @@ -849,7 +855,7 @@ impl CatalogController { let inner = self.inner.read().await; let txn = inner.db.begin().await?; - let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, @@ -860,11 +866,15 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + let mut fragments = fragments + .into_iter() + .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) + .collect_vec(); fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 { - visit_stream_node(&mut stream_node.0, |node| { + visit_stream_node(stream_node, |node| { if let PbNodeBody::StreamScan(node) = node { node.rate_limit = rate_limit; found = true; @@ -883,7 +893,7 @@ impl CatalogController { for (id, _, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), - stream_node: Set(stream_node), + stream_node: Set(StreamNode::from_protobuf(&stream_node)), ..Default::default() } .update(&txn) diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 53ac3c9616e28..ff19892d516b5 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -18,14 +18,14 @@ use anyhow::anyhow; use itertools::Itertools; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::fragment::DistributionType; +use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode}; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, - SchemaId, SourceId, StreamNode, UserId, + SchemaId, SourceId, UserId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; use risingwave_pb::meta::PbFragmentParallelUnitMapping; @@ -761,7 +761,7 @@ where let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = find_stream_source(stream_node.inner_ref()) { + if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { source_fragment_ids .entry(source.source_id as SourceId) .or_insert_with(BTreeSet::new)