From 05b84b449c8c129f3ed0662de15ae1f80abb37e1 Mon Sep 17 00:00:00 2001 From: August Date: Wed, 7 Feb 2024 16:55:52 +0800 Subject: [PATCH] fix(sql-backend): change stream node type from json to binary to fix stack overflow (#15040) Signed-off-by: dependabot[bot] Signed-off-by: Bugen Zhao Co-authored-by: yezizp2012 Co-authored-by: Li0k Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] Co-authored-by: Richard Chien Co-authored-by: Bugen Zhao Co-authored-by: Dylan Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> --- Cargo.lock | 37 ++++++----- src/meta/Cargo.toml | 6 +- src/meta/model_v2/Cargo.toml | 7 +- src/meta/model_v2/migration/Cargo.toml | 2 +- .../migration/src/m20230908_072257_init.rs | 2 +- src/meta/model_v2/src/fragment.rs | 33 +++++++++- src/meta/model_v2/src/lib.rs | 2 - src/meta/model_v2/src/prelude.rs | 1 + src/meta/node/Cargo.toml | 2 +- src/meta/service/Cargo.toml | 2 +- src/meta/src/controller/catalog.rs | 14 ++-- src/meta/src/controller/fragment.rs | 22 +++---- src/meta/src/controller/streaming_job.rs | 66 +++++++++++-------- src/meta/src/controller/system_param.rs | 5 +- src/meta/src/controller/utils.rs | 6 +- src/meta/src/hummock/manager/sequence.rs | 3 +- src/meta/src/manager/notification_version.rs | 6 +- 17 files changed, 129 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c5f82c7fc8074..ed7677d652954 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9542,6 +9542,7 @@ dependencies = [ name = "risingwave_meta_model_v2" version = "1.7.0-alpha" dependencies = [ + "prost 0.12.1", "risingwave_common", "risingwave_hummock_sdk", "risingwave_pb", @@ -10450,9 +10451,9 @@ dependencies = [ [[package]] name = "sea-orm" -version = "0.12.2" +version = "0.12.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61f6c7daef05dde3476d97001e11fca7a52b655aa3bf4fd610ab2da1176a2ed5" +checksum = "6632f499b80cc6aaa781b302e4c9fae663e0e3dcf2640e9d80034d5b10731efe" dependencies = [ "async-stream", "async-trait", @@ -10478,9 +10479,9 @@ dependencies = [ [[package]] name = "sea-orm-cli" -version = "0.12.2" +version = "0.12.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e3f0ff2fa5672e2e7314d107c6498a18e469beeb340a0ed84e3075fce73c2cd" +checksum = "465ea2308d4716837e9af4a2cff8e14c28135867a580bb93e9e03d408a3a6afb" dependencies = [ "chrono", "clap", @@ -10495,9 +10496,9 @@ dependencies = [ [[package]] name = "sea-orm-macros" -version = "0.12.2" +version = "0.12.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd90e73d5f5b184bad525767da29fbfec132b4e62ebd6f60d2f2737ec6468f62" +checksum = "ec13bfb4c4aef208f68dbea970dd40d13830c868aa8dcb4e106b956e6bb4f2fa" dependencies = [ "heck 0.4.1", "proc-macro2", @@ -10509,9 +10510,9 @@ dependencies = [ [[package]] name = "sea-orm-migration" -version = "0.12.2" +version = "0.12.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21f673fcefb3a7e7b89a12b6c0e854ec0be14367635ac3435369c8ad7f11e09e" +checksum = "ac734b6e5610c2764056cc8495fbc293cd1c8ebe084fdfb74c3b0cdaaff9bb92" dependencies = [ "async-trait", "clap", @@ -10526,9 +10527,9 @@ dependencies = [ [[package]] name = "sea-query" -version = "0.30.1" +version = "0.30.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28c05a5bf6403834be253489bbe95fa9b1e5486bc843b61f60d26b5c9c1e244b" +checksum = "4166a1e072292d46dc91f31617c2a1cdaf55a8be4b5c9f4bf2ba248e3ac4999b" dependencies = [ "bigdecimal 0.3.1", "chrono", @@ -10573,9 +10574,9 @@ dependencies = [ [[package]] name = "sea-schema" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cd9561232bd1b82ea748b581f15909d11de0db6563ddcf28c5d908aee8282f1" +checksum = "30d148608012d25222442d1ebbfafd1228dbc5221baf4ec35596494e27a2394e" dependencies = [ "futures", "sea-query", @@ -10686,9 +10687,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] @@ -10735,9 +10736,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", @@ -10757,9 +10758,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 013ce2200f0d1..20a82883e81ce 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -61,15 +61,15 @@ risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } rw_futures_util = { workspace = true } scopeguard = "1.2.0" -sea-orm = { version = "0.12.0", features = [ +sea-orm = { version = "0.12.14", features = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "macros", ] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" +serde = { version = "1.0.196", features = ["derive"] } +serde_json = "1.0.113" strum = { version = "0.25", features = ["derive"] } sync-point = { path = "../utils/sync-point" } thiserror = "1" diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model_v2/Cargo.toml index 8cc1407983f36..9d75bf22b5c6e 100644 --- a/src/meta/model_v2/Cargo.toml +++ b/src/meta/model_v2/Cargo.toml @@ -14,15 +14,16 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +prost = { workspace = true } risingwave_common = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } -sea-orm = { version = "0.12.0", features = [ +sea-orm = { version = "0.12.14", features = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "macros", ] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" +serde = { version = "1.0.196", features = ["derive"] } +serde_json = "1.0.113" diff --git a/src/meta/model_v2/migration/Cargo.toml b/src/meta/model_v2/migration/Cargo.toml index 4745125140a22..b99efe2b7b0a7 100644 --- a/src/meta/model_v2/migration/Cargo.toml +++ b/src/meta/model_v2/migration/Cargo.toml @@ -18,5 +18,5 @@ async-std = { version = "1", features = ["attributes", "tokio1"] } uuid = { version = "1", features = ["v4"] } [dependencies.sea-orm-migration] -version = "0.12.0" +version = "0.12.14" features = ["sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "with-uuid"] diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index cf4bd9d375598..2da479531abf0 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -361,7 +361,7 @@ impl MigrationTrait for Migration { .string() .not_null(), ) - .col(ColumnDef::new(Fragment::StreamNode).json().not_null()) + .col(ColumnDef::new(Fragment::StreamNode).binary().not_null()) .col(ColumnDef::new(Fragment::VnodeMapping).json().not_null()) .col(ColumnDef::new(Fragment::StateTableIds).json()) .col(ColumnDef::new(Fragment::UpstreamFragmentId).json()) diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs index af1d529a05980..27a5898064595 100644 --- a/src/meta/model_v2/src/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Formatter; + use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; +use risingwave_pb::stream_plan::PbStreamNode; use sea_orm::entity::prelude::*; -use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode}; +use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "fragment")] @@ -31,6 +34,34 @@ pub struct Model { pub upstream_fragment_id: I32Array, } +/// This is a workaround to avoid stack overflow when deserializing the `StreamNode` field from sql +/// backend if we store it as Json. We'd better fix it before using it in production, because it's less +/// readable and maintainable. +#[derive(Clone, PartialEq, Eq, DeriveValueType)] +pub struct StreamNode(#[sea_orm] Vec); + +impl StreamNode { + pub fn to_protobuf(&self) -> PbStreamNode { + prost::Message::decode(self.0.as_slice()).unwrap() + } + + pub fn from_protobuf(val: &PbStreamNode) -> Self { + Self(prost::Message::encode_to_vec(val)) + } +} + +impl std::fmt::Debug for StreamNode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.to_protobuf().fmt(f) + } +} + +impl Default for StreamNode { + fn default() -> Self { + Self::from_protobuf(&PbStreamNode::default()) + } +} + #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum DistributionType { diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 8c85cad2a6597..2abd66fae3fe3 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -216,8 +216,6 @@ derive_from_json_struct!( ); derive_from_json_struct!(AuthInfo, risingwave_pb::user::PbAuthInfo); -derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode); - derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits); derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer); derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping); diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model_v2/src/prelude.rs index 6a5316b0e422c..d2d4e1362f93e 100644 --- a/src/meta/model_v2/src/prelude.rs +++ b/src/meta/model_v2/src/prelude.rs @@ -25,6 +25,7 @@ pub use super::fragment::Entity as Fragment; pub use super::function::Entity as Function; pub use super::hummock_pinned_snapshot::Entity as HummockPinnedSnapshot; pub use super::hummock_pinned_version::Entity as HummockPinnedVersion; +pub use super::hummock_sequence::Entity as HummockSequence; pub use super::hummock_version_delta::Entity as HummockVersionDelta; pub use super::hummock_version_stats::Entity as HummockVersionStats; pub use super::index::Entity as Index; diff --git a/src/meta/node/Cargo.toml b/src/meta/node/Cargo.toml index a799c99b98c88..0538fa19280e7 100644 --- a/src/meta/node/Cargo.toml +++ b/src/meta/node/Cargo.toml @@ -32,7 +32,7 @@ risingwave_meta_model_migration = { workspace = true } risingwave_meta_service = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } -sea-orm = { version = "0.12.0", features = [ +sea-orm = { version = "0.12.14", features = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 2ba993d7eab55..a91e570ccd6a8 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -27,7 +27,7 @@ risingwave_hummock_sdk = { workspace = true } risingwave_meta = { workspace = true } risingwave_meta_model_v2 = { workspace = true } risingwave_pb = { workspace = true } -sea-orm = { version = "0.12.0", features = [ +sea-orm = { version = "0.12.14", features = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 2ca8e9b98be87..76eab54f631a0 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -656,7 +656,7 @@ impl CatalogController { .await?; pb_source.id = source_obj.oid as _; let source: source::ActiveModel = pb_source.clone().into(); - source.insert(&txn).await?; + Source::insert(source).exec(&txn).await?; if let Some(src_manager) = source_manager_ref { let ret = src_manager.register_source(&pb_source).await; @@ -698,7 +698,7 @@ impl CatalogController { .await?; pb_function.id = function_obj.oid as _; let function: function::ActiveModel = pb_function.clone().into(); - function.insert(&txn).await?; + Function::insert(function).exec(&txn).await?; txn.commit().await?; let version = self @@ -774,7 +774,7 @@ impl CatalogController { .await?; pb_connection.id = conn_obj.oid as _; let connection: connection::ActiveModel = pb_connection.clone().into(); - connection.insert(&txn).await?; + Connection::insert(connection).exec(&txn).await?; txn.commit().await?; @@ -869,17 +869,17 @@ impl CatalogController { .await?; pb_view.id = view_obj.oid as _; let view: view::ActiveModel = pb_view.clone().into(); - view.insert(&txn).await?; + View::insert(view).exec(&txn).await?; // todo: change `dependent_relations` to `dependent_objects`, which should includes connection and function as well. // todo: shall we need to check existence of them Or let database handle it by FOREIGN KEY constraint. for obj_id in &pb_view.dependent_relations { - object_dependency::ActiveModel { + ObjectDependency::insert(object_dependency::ActiveModel { oid: Set(*obj_id as _), used_by: Set(view_obj.oid), ..Default::default() - } - .insert(&txn) + }) + .exec(&txn) .await?; } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 4f7fb469aec97..833e642a83e74 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -20,11 +20,12 @@ use itertools::Itertools; use risingwave_common::bail; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; +use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, - StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, + StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::subscribe_response::{ @@ -281,7 +282,7 @@ impl CatalogController { let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping).unwrap(); - let stream_node = StreamNode(stream_node); + let stream_node = StreamNode::from_protobuf(&stream_node); let distribution_type = PbFragmentDistributionType::try_from(pb_distribution_type) .unwrap() @@ -367,7 +368,7 @@ impl CatalogController { upstream_fragment_id, } = fragment; - let stream_node_template = stream_node.into_inner(); + let stream_node_template = stream_node.to_protobuf(); let mut pb_actors = vec![]; @@ -1186,7 +1187,7 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = find_stream_source(stream_node.inner_ref()) { + if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { source_fragment_ids .entry(source.source_id as SourceId) .or_insert_with(BTreeSet::new) @@ -1216,7 +1217,7 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = find_stream_source(stream_node.inner_ref()) { + if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { source_fragment_ids .entry(source.source_id as SourceId) .or_insert_with(BTreeSet::new) @@ -1278,11 +1279,10 @@ mod tests { use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; - use risingwave_meta_model_v2::fragment::DistributionType; + use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode}; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits, - ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, - VnodeBitmap, + ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, TableId, VnodeBitmap, }; use risingwave_pb::common::ParallelUnit; use risingwave_pb::meta::table_fragments::actor_status::PbActorState; @@ -1448,13 +1448,13 @@ mod tests { actors: Vec, upstream_actor_ids: &HashMap>>, ) { - let stream_node_template = fragment.stream_node.clone(); + let stream_node_template = fragment.stream_node.to_protobuf(); for PbStreamActor { nodes, actor_id, .. } in actors { - let mut template_node = stream_node_template.clone().into_inner(); + let mut template_node = stream_node_template.clone(); let nodes = nodes.unwrap(); let actor_upstream_actor_ids = upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap(); @@ -1553,7 +1553,7 @@ mod tests { job_id: TEST_JOB_ID, fragment_type_mask: 0, distribution_type: DistributionType::Hash, - stream_node: StreamNode(stream_node), + stream_node: StreamNode::from_protobuf(&stream_node), vnode_mapping: FragmentVnodeMapping(parallel_unit_mapping.to_protobuf()), state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]), upstream_fragment_id: I32Array::default(), diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 8afc5ab6c836c..f4004e25b500c 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -22,16 +22,17 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; +use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::{ - Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source, + Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source, StreamingJob as StreamingJobModel, Table, }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, - FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, - StreamingParallelism, TableId, TableVersion, UserId, + FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism, + TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; @@ -137,7 +138,7 @@ impl CatalogController { .await?; table.id = job_id as _; let table: table::ActiveModel = table.clone().into(); - table.insert(&txn).await?; + Table::insert(table).exec(&txn).await?; } StreamingJob::Sink(sink, _) => { let job_id = Self::create_streaming_job_obj( @@ -153,7 +154,7 @@ impl CatalogController { .await?; sink.id = job_id as _; let sink: sink::ActiveModel = sink.clone().into(); - sink.insert(&txn).await?; + Sink::insert(sink).exec(&txn).await?; } StreamingJob::Table(src, table, _) => { let job_id = Self::create_streaming_job_obj( @@ -184,10 +185,10 @@ impl CatalogController { PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _), ); let source: source::ActiveModel = src.clone().into(); - source.insert(&txn).await?; + Source::insert(source).exec(&txn).await?; } let table: table::ActiveModel = table.clone().into(); - table.insert(&txn).await?; + Table::insert(table).exec(&txn).await?; } StreamingJob::Index(index, table) => { ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?; @@ -207,18 +208,18 @@ impl CatalogController { index.index_table_id = job_id as _; table.id = job_id as _; - object_dependency::ActiveModel { + ObjectDependency::insert(object_dependency::ActiveModel { oid: Set(index.primary_table_id as _), used_by: Set(table.id as _), ..Default::default() - } - .insert(&txn) + }) + .exec(&txn) .await?; let table: table::ActiveModel = table.clone().into(); - table.insert(&txn).await?; + Table::insert(table).exec(&txn).await?; let index: index::ActiveModel = index.clone().into(); - index.insert(&txn).await?; + Index::insert(index).exec(&txn).await?; } StreamingJob::Source(src) => { let job_id = Self::create_streaming_job_obj( @@ -234,7 +235,7 @@ impl CatalogController { .await?; src.id = job_id as _; let source: source::ActiveModel = src.clone().into(); - source.insert(&txn).await?; + Source::insert(source).exec(&txn).await?; } } @@ -280,7 +281,7 @@ impl CatalogController { table.table_id = Set(table_id as _); table.belongs_to_job_id = Set(Some(job_id as _)); table.fragment_id = NotSet; - table.insert(&txn).await?; + Table::insert(table).exec(&txn).await?; } txn.commit().await?; @@ -304,15 +305,17 @@ impl CatalogController { .map(|(fragment, actors, actor_dispatchers)| (fragment, (actors, actor_dispatchers))) .unzip(); for fragment in fragments { + let fragment_id = fragment.fragment_id; + let state_table_ids = fragment.state_table_ids.inner_ref().clone(); let fragment = fragment.into_active_model(); - let fragment = fragment.insert(&txn).await?; + Fragment::insert(fragment).exec(&txn).await?; // Update fragment id for all state tables. if !for_replace { - for state_table_id in fragment.state_table_ids.into_inner() { + for state_table_id in state_table_ids { table::ActiveModel { table_id: Set(state_table_id as _), - fragment_id: Set(Some(fragment.fragment_id as _)), + fragment_id: Set(Some(fragment_id)), ..Default::default() } .update(&txn) @@ -325,13 +328,13 @@ impl CatalogController { for (actors, actor_dispatchers) in actor_with_dispatchers { for actor in actors { let actor = actor.into_active_model(); - actor.insert(&txn).await?; + Actor::insert(actor).exec(&txn).await?; } for (_, actor_dispatchers) in actor_dispatchers { for actor_dispatcher in actor_dispatchers { let mut actor_dispatcher = actor_dispatcher.into_active_model(); actor_dispatcher.id = NotSet; - actor_dispatcher.insert(&txn).await?; + ActorDispatcher::insert(actor_dispatcher).exec(&txn).await?; } } } @@ -632,8 +635,9 @@ impl CatalogController { .into_tuple::<(FragmentId, StreamNode, I32Array)>() .one(&txn) .await? + .map(|(id, node, upstream)| (id, node.to_protobuf(), upstream)) .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; - visit_stream_node(&mut stream_node.0, |body| { + visit_stream_node(&mut stream_node, |body| { if let PbNodeBody::Merge(m) = body && let Some((new_fragment_id, new_actor_ids)) = fragment_replace_map.get(&m.upstream_fragment_id) @@ -649,7 +653,7 @@ impl CatalogController { } fragment::ActiveModel { fragment_id: Set(fragment_id), - stream_node: Set(stream_node), + stream_node: Set(StreamNode::from_protobuf(&stream_node)), upstream_fragment_id: Set(upstream_fragment_id), ..Default::default() } @@ -773,7 +777,7 @@ impl CatalogController { ))); } - let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, @@ -784,11 +788,15 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + let mut fragments = fragments + .into_iter() + .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) + .collect_vec(); fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { - visit_stream_node(&mut stream_node.0, |node| { + visit_stream_node(stream_node, |node| { if let PbNodeBody::Source(node) = node { if let Some(node_inner) = &mut node.source_inner && node_inner.source_id == source_id as u32 @@ -810,7 +818,7 @@ impl CatalogController { for (id, _, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), - stream_node: Set(stream_node), + stream_node: Set(StreamNode::from_protobuf(&stream_node)), ..Default::default() } .update(&txn) @@ -833,7 +841,7 @@ impl CatalogController { let inner = self.inner.read().await; let txn = inner.db.begin().await?; - let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, @@ -844,11 +852,15 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + let mut fragments = fragments + .into_iter() + .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) + .collect_vec(); fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 { - visit_stream_node(&mut stream_node.0, |node| { + visit_stream_node(stream_node, |node| { if let PbNodeBody::StreamScan(node) = node { node.rate_limit = rate_limit; found = true; @@ -867,7 +879,7 @@ impl CatalogController { for (id, _, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), - stream_node: Set(stream_node), + stream_node: Set(StreamNode::from_protobuf(&stream_node)), ..Default::default() } .update(&txn) diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index 6802ad17e1769..4b2e598a2c221 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -173,10 +173,7 @@ impl SystemParamsController { // delete all params first and then insert all params. It follows the same logic // as the old code, we'd better change it to another way later to keep consistency. SystemParameter::delete_many().exec(&txn).await?; - - for model in models { - model.insert(&txn).await?; - } + SystemParameter::insert_many(models).exec(&txn).await?; txn.commit().await?; Ok(()) } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 53ac3c9616e28..ff19892d516b5 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -18,14 +18,14 @@ use anyhow::anyhow; use itertools::Itertools; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::fragment::DistributionType; +use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode}; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, - SchemaId, SourceId, StreamNode, UserId, + SchemaId, SourceId, UserId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; use risingwave_pb::meta::PbFragmentParallelUnitMapping; @@ -761,7 +761,7 @@ where let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = find_stream_source(stream_node.inner_ref()) { + if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { source_fragment_ids .entry(source.source_id as SourceId) .or_insert_with(BTreeSet::new) diff --git a/src/meta/src/hummock/manager/sequence.rs b/src/meta/src/hummock/manager/sequence.rs index c1cdda952c6ed..ab154376404d2 100644 --- a/src/meta/src/hummock/manager/sequence.rs +++ b/src/meta/src/hummock/manager/sequence.rs @@ -18,6 +18,7 @@ use std::sync::LazyLock; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_meta_model_v2::hummock_sequence; +use risingwave_meta_model_v2::prelude::HummockSequence; use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait}; use tokio::sync::Mutex; @@ -71,7 +72,7 @@ impl SequenceGenerator { name: ActiveValue::set(ident.into()), seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()), }; - active_model.insert(&txn).await?; + HummockSequence::insert(active_model).exec(&txn).await?; init } Some(model) => { diff --git a/src/meta/src/manager/notification_version.rs b/src/meta/src/manager/notification_version.rs index 87ccd3ff7f548..18c8c7cc9d20f 100644 --- a/src/meta/src/manager/notification_version.rs +++ b/src/meta/src/manager/notification_version.rs @@ -41,11 +41,11 @@ impl NotificationVersionGenerator { .await?; let current_version = model.as_ref().map(|m| m.version).unwrap_or(1) as u64; if model.is_none() { - catalog_version::ActiveModel { + CatalogVersion::insert(catalog_version::ActiveModel { name: Set(VersionCategory::Notification), version: Set(1), - } - .insert(&txn) + }) + .exec(&txn) .await?; txn.commit().await?; }