diff --git a/e2e_test/sql_migration/check.slt b/e2e_test/sql_migration/check.slt index b4c97bba50bf1..41e83daa0db18 100644 --- a/e2e_test/sql_migration/check.slt +++ b/e2e_test/sql_migration/check.slt @@ -24,7 +24,6 @@ SELECT setting FROM pg_catalog.pg_settings where name = 'max_concurrent_creating query T rowsort select name, relation_type from rw_relations where relation_type != 'system table' AND relation_type != 'view'; ---- -ddl_subscription_table subscription idx1 index m_simple table mv1 materialized view diff --git a/e2e_test/sql_migration/prepare.slt b/e2e_test/sql_migration/prepare.slt index f0669a4c6b297..17e32f3561b03 100644 --- a/e2e_test/sql_migration/prepare.slt +++ b/e2e_test/sql_migration/prepare.slt @@ -56,9 +56,6 @@ create table m_simple (v1 int primary key, v2 int); statement ok create sink s_simple_1 into m_simple as select v1, v2 from t_simple; -statement ok -create subscription ddl_subscription_table from mv2 with(retention = '1D'); - statement ok insert into t1 select * from generate_series(1, 1000); diff --git a/proto/catalog.proto b/proto/catalog.proto index 67e71848e3d00..7858cd5398db2 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -177,6 +177,8 @@ message Subscription { optional string initialized_at_cluster_version = 15; optional string created_at_cluster_version = 16; + + string subscription_from_name = 17; } message Connection { diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 29461d42b9f88..db3e166c07a6f 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -38,7 +38,6 @@ use risingwave_meta::stream::TableRevision; use risingwave_meta_model_migration::{Migrator, MigratorTrait}; use risingwave_meta_model_v2::catalog_version::VersionCategory; use risingwave_meta_model_v2::compaction_status::LevelHandlers; -use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::hummock_sequence::{ COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID, }; @@ -435,7 +434,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap(); }); let mut fragment = fragment.into_active_model(); - fragment.stream_node = Set(StreamNode::from_protobuf(&stream_node)); + fragment.stream_node = Set((&stream_node).into()); Fragment::insert(fragment) .exec(&meta_store_sql.conn) .await?; @@ -683,7 +682,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an max_committed_epoch: Set(vd.max_committed_epoch as _), safe_epoch: Set(vd.safe_epoch as _), trivial_move: Set(vd.trivial_move), - full_version_delta: Set(vd.to_protobuf().into()), + full_version_delta: Set((&vd.to_protobuf()).into()), }) .collect_vec(), ) @@ -716,7 +715,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an .into_iter() .map(|cg| compaction_config::ActiveModel { compaction_group_id: Set(cg.group_id as _), - config: Set((*cg.compaction_config).clone().into()), + config: Set((&*cg.compaction_config).into()), }) .collect_vec(), ) @@ -733,7 +732,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an .into_iter() .map(|cs| compaction_status::ActiveModel { compaction_group_id: Set(cs.compaction_group_id as _), - status: Set(LevelHandlers(cs.level_handlers.iter().map_into().collect())), + status: Set(LevelHandlers::from( + cs.level_handlers.iter().map_into().collect_vec(), + )), }) .collect_vec(), ) @@ -751,7 +752,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an compaction_task::ActiveModel { id: Set(task.task_id as _), context_id: Set(context_id as _), - task: Set(task.into()), + task: Set((&task).into()), } })) .exec(&meta_store_sql.conn) diff --git a/src/meta/model_v2/migration/README.md b/src/meta/model_v2/migration/README.md index 3b438d89e31c5..527d4ba538071 100644 --- a/src/meta/model_v2/migration/README.md +++ b/src/meta/model_v2/migration/README.md @@ -1,10 +1,19 @@ # Running Migrator CLI +> **WARNING:** Migration files are used to define schema changes for the database. Each migration file contains an up and down function, +> which are used to define upgrade and downgrade operations for the schema. +> +> When you need to make schema changes to the system catalog, you need to generate a new migration file and then apply it to the database. +> Note that each migration file can only be applied once and will be recorded in a system table, so for new schema changes, you need to +> generate a new migration file. Unless you are sure the modification of the migration file has not been included in any released version yet, +> **DO NOT** modify already published migration files. + +## How to run the migrator CLI - Generate a new migration file ```sh cargo run -- generate MIGRATION_NAME ``` -- Apply all pending migrations +- Apply all pending migrations for test purposes, `DATABASE_URL` required. ```sh cargo run ``` diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 14b79b33f26d0..260344a8b4fd9 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -140,7 +140,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(User::CanCreateDb).boolean().not_null()) .col(ColumnDef::new(User::CanCreateUser).boolean().not_null()) .col(ColumnDef::new(User::CanLogin).boolean().not_null()) - .col(ColumnDef::new(User::AuthInfo).json_binary()) + .col(ColumnDef::new(User::AuthInfo).binary()) .to_owned(), ) .await?; @@ -381,11 +381,7 @@ impl MigrationTrait for Migration { .blob(BlobSize::Long) .not_null(), ) - .col( - ColumnDef::new(Fragment::VnodeMapping) - .json_binary() - .not_null(), - ) + .col(ColumnDef::new(Fragment::VnodeMapping).binary().not_null()) .col(ColumnDef::new(Fragment::StateTableIds).json_binary()) .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary()) .foreign_key( @@ -411,12 +407,12 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Actor::FragmentId).integer().not_null()) .col(ColumnDef::new(Actor::Status).string().not_null()) - .col(ColumnDef::new(Actor::Splits).json_binary()) + .col(ColumnDef::new(Actor::Splits).binary()) .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null()) .col(ColumnDef::new(Actor::WorkerId).integer().not_null()) .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary()) - .col(ColumnDef::new(Actor::VnodeBitmap).json_binary()) - .col(ColumnDef::new(Actor::ExprContext).json_binary().not_null()) + .col(ColumnDef::new(Actor::VnodeBitmap).binary()) + .col(ColumnDef::new(Actor::ExprContext).binary().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_actor_fragment_id") @@ -458,7 +454,7 @@ impl MigrationTrait for Migration { .json_binary() .not_null(), ) - .col(ColumnDef::new(ActorDispatcher::HashMapping).json_binary()) + .col(ColumnDef::new(ActorDispatcher::HashMapping).binary()) .col( ColumnDef::new(ActorDispatcher::DispatcherId) .integer() @@ -499,7 +495,7 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Connection::Name).string().not_null()) - .col(ColumnDef::new(Connection::Info).json_binary().not_null()) + .col(ColumnDef::new(Connection::Info).binary().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_connection_object_id") @@ -518,7 +514,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Source::SourceId).integer().primary_key()) .col(ColumnDef::new(Source::Name).string().not_null()) .col(ColumnDef::new(Source::RowIdIndex).integer()) - .col(ColumnDef::new(Source::Columns).json_binary().not_null()) + .col(ColumnDef::new(Source::Columns).binary().not_null()) .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null()) .col( ColumnDef::new(Source::WithProperties) @@ -526,12 +522,8 @@ impl MigrationTrait for Migration { .not_null(), ) .col(ColumnDef::new(Source::Definition).text().not_null()) - .col(ColumnDef::new(Source::SourceInfo).json_binary()) - .col( - ColumnDef::new(Source::WatermarkDescs) - .json_binary() - .not_null(), - ) + .col(ColumnDef::new(Source::SourceInfo).binary()) + .col(ColumnDef::new(Source::WatermarkDescs).binary().not_null()) .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer()) .col(ColumnDef::new(Source::ConnectionId).integer()) .col(ColumnDef::new(Source::Version).big_integer().not_null()) @@ -570,8 +562,8 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer()) .col(ColumnDef::new(Table::TableType).string().not_null()) .col(ColumnDef::new(Table::BelongsToJobId).integer()) - .col(ColumnDef::new(Table::Columns).json_binary().not_null()) - .col(ColumnDef::new(Table::Pk).json_binary().not_null()) + .col(ColumnDef::new(Table::Columns).binary().not_null()) + .col(ColumnDef::new(Table::Pk).binary().not_null()) .col( ColumnDef::new(Table::DistributionKey) .json_binary() @@ -601,14 +593,14 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null()) .col(ColumnDef::new(Table::DmlFragmentId).integer()) - .col(ColumnDef::new(Table::Cardinality).json_binary()) + .col(ColumnDef::new(Table::Cardinality).binary()) .col( ColumnDef::new(Table::CleanedByWatermark) .boolean() .not_null(), ) .col(ColumnDef::new(Table::Description).string()) - .col(ColumnDef::new(Table::Version).json_binary()) + .col(ColumnDef::new(Table::Version).binary()) .col(ColumnDef::new(Table::RetentionSeconds).integer()) .col( ColumnDef::new(Table::IncomingSinks) @@ -650,7 +642,8 @@ impl MigrationTrait for Migration { &mut ForeignKey::create() .name("FK_table_optional_associated_source_id") .from(Table::Table, Table::OptionalAssociatedSourceId) - .to(Source::Table, Source::SourceId) + .to(Object::Table, Object::Oid) + .on_delete(ForeignKeyAction::Cascade) .to_owned(), ) .to_owned(), @@ -662,8 +655,8 @@ impl MigrationTrait for Migration { .table(Sink::Table) .col(ColumnDef::new(Sink::SinkId).integer().primary_key()) .col(ColumnDef::new(Sink::Name).string().not_null()) - .col(ColumnDef::new(Sink::Columns).json_binary().not_null()) - .col(ColumnDef::new(Sink::PlanPk).json_binary().not_null()) + .col(ColumnDef::new(Sink::Columns).binary().not_null()) + .col(ColumnDef::new(Sink::PlanPk).binary().not_null()) .col( ColumnDef::new(Sink::DistributionKey) .json_binary() @@ -676,7 +669,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Sink::ConnectionId).integer()) .col(ColumnDef::new(Sink::DbName).string().not_null()) .col(ColumnDef::new(Sink::SinkFromName).string().not_null()) - .col(ColumnDef::new(Sink::SinkFormatDesc).json_binary()) + .col(ColumnDef::new(Sink::SinkFormatDesc).binary()) .col(ColumnDef::new(Sink::TargetTable).integer()) .foreign_key( &mut ForeignKey::create() @@ -711,7 +704,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(View::Name).string().not_null()) .col(ColumnDef::new(View::Properties).json_binary().not_null()) .col(ColumnDef::new(View::Definition).text().not_null()) - .col(ColumnDef::new(View::Columns).json_binary().not_null()) + .col(ColumnDef::new(View::Columns).binary().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_view_object_id") @@ -731,7 +724,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Index::Name).string().not_null()) .col(ColumnDef::new(Index::IndexTableId).integer().not_null()) .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null()) - .col(ColumnDef::new(Index::IndexItems).json_binary().not_null()) + .col(ColumnDef::new(Index::IndexItems).binary().not_null()) .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null()) .foreign_key( &mut ForeignKey::create() @@ -767,12 +760,8 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Function::FunctionId).integer().primary_key()) .col(ColumnDef::new(Function::Name).string().not_null()) .col(ColumnDef::new(Function::ArgNames).string().not_null()) - .col(ColumnDef::new(Function::ArgTypes).json_binary().not_null()) - .col( - ColumnDef::new(Function::ReturnType) - .json_binary() - .not_null(), - ) + .col(ColumnDef::new(Function::ArgTypes).binary().not_null()) + .col(ColumnDef::new(Function::ReturnType).binary().not_null()) .col(ColumnDef::new(Function::Language).string().not_null()) .col(ColumnDef::new(Function::Link).string()) .col(ColumnDef::new(Function::Identifier).string()) diff --git a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs index b90e088da1f14..99f4d701c6445 100644 --- a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs +++ b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs @@ -30,11 +30,7 @@ impl MigrationTrait for Migration { .not_null() .primary_key(), ) - .col( - ColumnDef::new(CompactionTask::Task) - .json_binary() - .not_null(), - ) + .col(ColumnDef::new(CompactionTask::Task).binary().not_null()) .col( ColumnDef::new(CompactionTask::ContextId) .integer() @@ -54,7 +50,7 @@ impl MigrationTrait for Migration { .not_null() .primary_key(), ) - .col(ColumnDef::new(CompactionConfig::Config).json_binary()) + .col(ColumnDef::new(CompactionConfig::Config).binary()) .to_owned(), ) .await?; @@ -69,7 +65,7 @@ impl MigrationTrait for Migration { .not_null() .primary_key(), ) - .col(ColumnDef::new(CompactionStatus::Status).json_binary()) + .col(ColumnDef::new(CompactionStatus::Status).binary()) .to_owned(), ) .await?; @@ -142,7 +138,7 @@ impl MigrationTrait for Migration { .boolean() .not_null(), ) - .col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).json_binary()) + .col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).binary()) .to_owned(), ) .await?; diff --git a/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs b/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs index ece549c4f922d..50564c9b211a6 100644 --- a/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs +++ b/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs @@ -19,16 +19,8 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Subscription::Name).string().not_null()) - .col( - ColumnDef::new(Subscription::Columns) - .json_binary() - .not_null(), - ) - .col( - ColumnDef::new(Subscription::PlanPk) - .json_binary() - .not_null(), - ) + .col(ColumnDef::new(Subscription::Columns).binary().not_null()) + .col(ColumnDef::new(Subscription::PlanPk).binary().not_null()) .col( ColumnDef::new(Subscription::DistributionKey) .json_binary() @@ -40,6 +32,11 @@ impl MigrationTrait for Migration { .not_null(), ) .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::SubscriptionFromName) + .string() + .not_null(), + ) .to_owned(), ) .await?; @@ -63,4 +60,5 @@ enum Subscription { DistributionKey, Properties, Definition, + SubscriptionFromName, } diff --git a/src/meta/model_v2/src/actor_dispatcher.rs b/src/meta/model_v2/src/actor_dispatcher.rs index fa5ea0daa8af9..1d6a50665b12a 100644 --- a/src/meta/model_v2/src/actor_dispatcher.rs +++ b/src/meta/model_v2/src/actor_dispatcher.rs @@ -61,7 +61,7 @@ impl From<(u32, PbDispatcher)> for Model { dispatcher_type: dispatcher.r#type().into(), dist_key_indices: dispatcher.dist_key_indices.into(), output_indices: dispatcher.output_indices.into(), - hash_mapping: dispatcher.hash_mapping.map(ActorMapping), + hash_mapping: dispatcher.hash_mapping.as_ref().map(ActorMapping::from), dispatcher_id: dispatcher.dispatcher_id as _, downstream_actor_ids: dispatcher.downstream_actor_id.into(), } @@ -74,7 +74,7 @@ impl From for PbDispatcher { r#type: PbDispatcherType::from(model.dispatcher_type) as _, dist_key_indices: model.dist_key_indices.into_u32_array(), output_indices: model.output_indices.into_u32_array(), - hash_mapping: model.hash_mapping.map(|mapping| mapping.into_inner()), + hash_mapping: model.hash_mapping.map(|mapping| mapping.to_protobuf()), dispatcher_id: model.dispatcher_id as _, downstream_actor_id: model.downstream_actor_ids.into_u32_array(), } diff --git a/src/meta/model_v2/src/compaction_config.rs b/src/meta/model_v2/src/compaction_config.rs index 39ce6bcef2d44..7d2612679717d 100644 --- a/src/meta/model_v2/src/compaction_config.rs +++ b/src/meta/model_v2/src/compaction_config.rs @@ -14,7 +14,6 @@ use risingwave_pb::hummock::CompactionConfig as PbCompactionConfig; use sea_orm::entity::prelude::*; -use sea_orm::FromJsonQueryResult; use serde::{Deserialize, Serialize}; use crate::CompactionGroupId; @@ -32,4 +31,4 @@ pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} -crate::derive_from_json_struct!(CompactionConfig, PbCompactionConfig); +crate::derive_from_blob!(CompactionConfig, PbCompactionConfig); diff --git a/src/meta/model_v2/src/compaction_status.rs b/src/meta/model_v2/src/compaction_status.rs index 0e2afb84f9e93..79fbf8006258f 100644 --- a/src/meta/model_v2/src/compaction_status.rs +++ b/src/meta/model_v2/src/compaction_status.rs @@ -14,8 +14,6 @@ use risingwave_pb::hummock::LevelHandler as PbLevelHandler; use sea_orm::entity::prelude::*; -use sea_orm::FromJsonQueryResult; -use serde::{Deserialize, Serialize}; use crate::CompactionGroupId; @@ -32,4 +30,4 @@ pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} -crate::derive_from_json_struct!(LevelHandlers, Vec); +crate::derive_array_from_blob!(LevelHandlers, PbLevelHandler, PbLevelHandlerArray); diff --git a/src/meta/model_v2/src/compaction_task.rs b/src/meta/model_v2/src/compaction_task.rs index 80d509d2299f0..074fe9af450ea 100644 --- a/src/meta/model_v2/src/compaction_task.rs +++ b/src/meta/model_v2/src/compaction_task.rs @@ -14,7 +14,6 @@ use risingwave_pb::hummock::{CompactTask as PbCompactTask, CompactTaskAssignment}; use sea_orm::entity::prelude::*; -use sea_orm::FromJsonQueryResult; use serde::{Deserialize, Serialize}; use crate::{CompactionTaskId, WorkerId}; @@ -33,12 +32,12 @@ pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} -crate::derive_from_json_struct!(CompactionTask, PbCompactTask); +crate::derive_from_blob!(CompactionTask, PbCompactTask); impl From for CompactTaskAssignment { fn from(value: Model) -> Self { Self { - compact_task: Some(value.task.0), + compact_task: Some(value.task.to_protobuf()), context_id: value.context_id as _, } } diff --git a/src/meta/model_v2/src/connection.rs b/src/meta/model_v2/src/connection.rs index fbd602434f15c..0e513e7061fd6 100644 --- a/src/meta/model_v2/src/connection.rs +++ b/src/meta/model_v2/src/connection.rs @@ -73,7 +73,7 @@ impl From for ActiveModel { Self { connection_id: Set(conn.id as _), name: Set(conn.name), - info: Set(PrivateLinkService(private_link_srv)), + info: Set(PrivateLinkService::from(&private_link_srv)), } } } diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs index 27a5898064595..af1d529a05980 100644 --- a/src/meta/model_v2/src/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Formatter; - use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; -use risingwave_pb::stream_plan::PbStreamNode; use sea_orm::entity::prelude::*; -use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId}; +use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "fragment")] @@ -34,34 +31,6 @@ pub struct Model { pub upstream_fragment_id: I32Array, } -/// This is a workaround to avoid stack overflow when deserializing the `StreamNode` field from sql -/// backend if we store it as Json. We'd better fix it before using it in production, because it's less -/// readable and maintainable. -#[derive(Clone, PartialEq, Eq, DeriveValueType)] -pub struct StreamNode(#[sea_orm] Vec); - -impl StreamNode { - pub fn to_protobuf(&self) -> PbStreamNode { - prost::Message::decode(self.0.as_slice()).unwrap() - } - - pub fn from_protobuf(val: &PbStreamNode) -> Self { - Self(prost::Message::encode_to_vec(val)) - } -} - -impl std::fmt::Debug for StreamNode { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.to_protobuf().fmt(f) - } -} - -impl Default for StreamNode { - fn default() -> Self { - Self::from_protobuf(&PbStreamNode::default()) - } -} - #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum DistributionType { diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model_v2/src/function.rs index eaf368aa15d1e..e0d79f3bc5571 100644 --- a/src/meta/model_v2/src/function.rs +++ b/src/meta/model_v2/src/function.rs @@ -95,8 +95,8 @@ impl From for ActiveModel { function_id: Set(function.id as _), name: Set(function.name), arg_names: Set(function.arg_names.join(",")), - arg_types: Set(DataTypeArray(function.arg_types)), - return_type: Set(DataType(function.return_type.unwrap())), + arg_types: Set(DataTypeArray::from(function.arg_types)), + return_type: Set(DataType::from(&function.return_type.unwrap())), language: Set(function.language), link: Set(function.link), identifier: Set(function.identifier), diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs index f071034df4341..ae69f5eca9a1d 100644 --- a/src/meta/model_v2/src/hummock_version_delta.rs +++ b/src/meta/model_v2/src/hummock_version_delta.rs @@ -14,7 +14,6 @@ use risingwave_pb::hummock::PbHummockVersionDelta; use sea_orm::entity::prelude::*; -use sea_orm::FromJsonQueryResult; use serde::{Deserialize, Serialize}; use crate::{Epoch, HummockVersionId}; @@ -36,11 +35,11 @@ pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} -crate::derive_from_json_struct!(FullVersionDelta, PbHummockVersionDelta); +crate::derive_from_blob!(FullVersionDelta, PbHummockVersionDelta); impl From for PbHummockVersionDelta { fn from(value: Model) -> Self { - let ret = value.full_version_delta.into_inner(); + let ret = value.full_version_delta.to_protobuf(); assert_eq!(value.id, ret.id as i64); assert_eq!(value.prev_id, ret.prev_id as i64); assert_eq!(value.max_committed_epoch, ret.max_committed_epoch as i64); diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 974cdf4fd2671..a2eb99c16aed0 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -16,6 +16,8 @@ use std::collections::{BTreeMap, HashMap}; use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus}; use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState; +use risingwave_pb::stream_plan::PbStreamNode; +use sea_orm::entity::prelude::*; use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult}; use serde::{Deserialize, Serialize}; @@ -164,7 +166,95 @@ macro_rules! derive_from_json_struct { }; } -pub(crate) use derive_from_json_struct; +/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct. +macro_rules! derive_from_blob { + ($struct_name:ident, $field_type:ty) => { + #[derive(Clone, PartialEq, Eq, Serialize, Deserialize, DeriveValueType)] + pub struct $struct_name(#[sea_orm] Vec); + + impl $struct_name { + pub fn to_protobuf(&self) -> $field_type { + prost::Message::decode(self.0.as_slice()).unwrap() + } + + fn from_protobuf(val: &$field_type) -> Self { + Self(prost::Message::encode_to_vec(val)) + } + } + + impl sea_orm::sea_query::Nullable for $struct_name { + fn null() -> Value { + Value::Bytes(None) + } + } + + impl From<&$field_type> for $struct_name { + fn from(value: &$field_type) -> Self { + Self::from_protobuf(value) + } + } + + impl std::fmt::Debug for $struct_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.to_protobuf().fmt(f) + } + } + + impl Default for $struct_name { + fn default() -> Self { + Self::from_protobuf(&<$field_type>::default()) + } + } + }; +} + +/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array. +macro_rules! derive_array_from_blob { + ($struct_name:ident, $field_type:ty, $field_array_name:ident) => { + #[derive(Clone, PartialEq, Eq, DeriveValueType)] + pub struct $struct_name(#[sea_orm] Vec); + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct $field_array_name { + #[prost(message, repeated, tag = "1")] + inner: Vec<$field_type>, + } + impl Eq for $field_array_name {} + + impl $struct_name { + pub fn to_protobuf(&self) -> Vec<$field_type> { + let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap(); + data.inner + } + + fn from_protobuf(val: Vec<$field_type>) -> Self { + Self(prost::Message::encode_to_vec(&$field_array_name { + inner: val, + })) + } + } + + impl From> for $struct_name { + fn from(value: Vec<$field_type>) -> Self { + Self::from_protobuf(value) + } + } + + impl std::fmt::Debug for $struct_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.to_protobuf().fmt(f) + } + } + + impl Default for $struct_name { + fn default() -> Self { + Self(vec![]) + } + } + }; +} + +pub(crate) use {derive_array_from_blob, derive_from_blob, derive_from_json_struct}; derive_from_json_struct!(I32Array, Vec); @@ -192,38 +282,57 @@ impl From>> for ActorUpstreamActors { } } -derive_from_json_struct!(DataType, risingwave_pb::data::DataType); -derive_from_json_struct!(DataTypeArray, Vec); -derive_from_json_struct!(FieldArray, Vec); +derive_from_blob!(StreamNode, PbStreamNode); +derive_from_blob!(DataType, risingwave_pb::data::PbDataType); +derive_array_from_blob!( + DataTypeArray, + risingwave_pb::data::PbDataType, + PbDataTypeArray +); +derive_array_from_blob!( + FieldArray, + risingwave_pb::plan_common::PbField, + PbFieldArray +); derive_from_json_struct!(Property, HashMap); -derive_from_json_struct!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog); -derive_from_json_struct!( +derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog); +derive_array_from_blob!( ColumnCatalogArray, - Vec + risingwave_pb::plan_common::PbColumnCatalog, + PbColumnCatalogArray ); -derive_from_json_struct!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo); -derive_from_json_struct!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc); -derive_from_json_struct!( +derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo); +derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc); +derive_array_from_blob!( WatermarkDescArray, - Vec + risingwave_pb::catalog::PbWatermarkDesc, + PbWatermarkDescArray +); +derive_array_from_blob!( + ExprNodeArray, + risingwave_pb::expr::PbExprNode, + PbExprNodeArray +); +derive_array_from_blob!( + ColumnOrderArray, + risingwave_pb::common::PbColumnOrder, + PbColumnOrderArray ); -derive_from_json_struct!(ExprNodeArray, Vec); -derive_from_json_struct!(ColumnOrderArray, Vec); -derive_from_json_struct!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc); -derive_from_json_struct!(Cardinality, risingwave_pb::plan_common::PbCardinality); -derive_from_json_struct!(TableVersion, risingwave_pb::catalog::table::PbTableVersion); -derive_from_json_struct!( +derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc); +derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality); +derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion); +derive_from_blob!( PrivateLinkService, risingwave_pb::catalog::connection::PbPrivateLinkService ); -derive_from_json_struct!(AuthInfo, risingwave_pb::user::PbAuthInfo); +derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo); -derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits); -derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer); -derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping); -derive_from_json_struct!(ExprContext, risingwave_pb::plan_common::PbExprContext); +derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits); +derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer); +derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping); +derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext); -derive_from_json_struct!( +derive_from_blob!( FragmentVnodeMapping, risingwave_pb::common::ParallelUnitMapping ); diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index f5fa59c85ff5f..ab7e869daee68 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -124,7 +124,7 @@ impl From for ActiveModel { connection_id: Set(pb_sink.connection_id.map(|x| x as _)), db_name: Set(pb_sink.db_name), sink_from_name: Set(pb_sink.sink_from_name), - sink_format_desc: Set(pb_sink.format_desc.map(|x| x.into())), + sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())), target_table: Set(pb_sink.target_table.map(|x| x as _)), } } diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index b28dcd30ab131..2b0e511e4afe6 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -91,12 +91,12 @@ impl From for ActiveModel { source_id: Set(source.id as _), name: Set(source.name), row_id_index: Set(source.row_id_index.map(|x| x as _)), - columns: Set(ColumnCatalogArray(source.columns)), + columns: Set(ColumnCatalogArray::from(source.columns)), pk_column_ids: Set(I32Array(source.pk_column_ids)), with_properties: Set(Property(source.with_properties)), definition: Set(source.definition), - source_info: Set(source.info.map(StreamSourceInfo)), - watermark_descs: Set(WatermarkDescArray(source.watermark_descs)), + source_info: Set(source.info.as_ref().map(StreamSourceInfo::from)), + watermark_descs: Set(WatermarkDescArray::from(source.watermark_descs)), optional_associated_table_id: Set(optional_associated_table_id), connection_id: Set(source.connection_id.map(|id| id as _)), version: Set(source.version as _), diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs index ca0d83ffca699..096c63078a2a4 100644 --- a/src/meta/model_v2/src/subscription.rs +++ b/src/meta/model_v2/src/subscription.rs @@ -29,6 +29,7 @@ pub struct Model { pub distribution_key: I32Array, pub properties: Property, pub definition: String, + pub subscription_from_name: String, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -61,6 +62,7 @@ impl From for ActiveModel { distribution_key: Set(pb_subscription.distribution_key.into()), properties: Set(pb_subscription.properties.into()), definition: Set(pb_subscription.definition), + subscription_from_name: Set(pb_subscription.subscription_from_name), } } } diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 0039b9cccc2e0..b59dc8d7dc1d5 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -228,10 +228,10 @@ impl From for ActiveModel { watermark_indices: Set(pb_table.watermark_indices.into()), dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()), dml_fragment_id, - cardinality: Set(pb_table.cardinality.map(|x| x.into())), + cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())), cleaned_by_watermark: Set(pb_table.cleaned_by_watermark), description: Set(pb_table.description), - version: Set(pb_table.version.map(|v| v.into())), + version: Set(pb_table.version.as_ref().map(|v| v.into())), retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), incoming_sinks: Set(pb_table.incoming_sinks.into()), } diff --git a/src/meta/model_v2/src/user.rs b/src/meta/model_v2/src/user.rs index c92e219a9b619..f238683a83b02 100644 --- a/src/meta/model_v2/src/user.rs +++ b/src/meta/model_v2/src/user.rs @@ -61,7 +61,7 @@ impl From for ActiveModel { can_create_db: Set(user.can_create_db), can_create_user: Set(user.can_create_user), can_login: Set(user.can_login), - auth_info: Set(user.auth_info.map(AuthInfo)), + auth_info: Set(user.auth_info.as_ref().map(AuthInfo::from)), } } } @@ -75,7 +75,7 @@ impl From for PbUserInfo { can_create_db: val.can_create_db, can_create_user: val.can_create_user, can_login: val.can_login, - auth_info: val.auth_info.map(|x| x.into_inner()), + auth_info: val.auth_info.map(|x| x.to_protobuf()), grant_privileges: vec![], // fill in later } } diff --git a/src/meta/model_v2/src/view.rs b/src/meta/model_v2/src/view.rs index 8b0f52f022ab6..0e32cd2275150 100644 --- a/src/meta/model_v2/src/view.rs +++ b/src/meta/model_v2/src/view.rs @@ -56,7 +56,7 @@ impl From for ActiveModel { name: Set(view.name), properties: Set(Property(view.properties)), definition: Set(view.sql), - columns: Set(FieldArray(view.columns)), + columns: Set(view.columns.into()), } } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 95cf53b3e4fe1..18ca8cb5dad67 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -22,7 +22,6 @@ use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMA use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; -use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::table::TableType; @@ -31,7 +30,7 @@ use risingwave_meta_model_v2::{ sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, - SinkId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, + SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, TableId, UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -763,7 +762,7 @@ impl CatalogController { ) .col_expr( fragment::Column::StreamNode, - StreamNode::from_protobuf(&pb_stream_node).into(), + StreamNode::from(&pb_stream_node).into(), ) .filter(fragment::Column::FragmentId.eq(fragment_id)) .exec(txn) @@ -1820,15 +1819,16 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?; let table = if let Some(col_idx) = comment.column_index { - let mut columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId) + let columns: ColumnCatalogArray = Table::find_by_id(comment.table_id as TableId) .select_only() .column(table::Column::Columns) .into_tuple() .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("table", comment.table_id))?; - let column = columns - .0 + let mut pb_columns = columns.to_protobuf(); + + let column = pb_columns .get_mut(col_idx as usize) .ok_or_else(|| MetaError::catalog_id_not_found("column", col_idx))?; let column_desc = column.column_desc.as_mut().ok_or_else(|| { @@ -1841,7 +1841,7 @@ impl CatalogController { column_desc.description = comment.description; table::ActiveModel { table_id: Set(comment.table_id as _), - columns: Set(columns), + columns: Set(pb_columns.into()), ..Default::default() } .update(&txn) @@ -1982,7 +1982,7 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?; if let Some(source_info) = source_info - && source_info.into_inner().cdc_source_job + && source_info.to_protobuf().cdc_source_job { to_drop_streaming_jobs.push(object_id); } @@ -3171,8 +3171,8 @@ mod tests { .one(&mgr.inner.read().await.db) .await? .unwrap(); - assert_eq!(function.return_type.0, test_data_type); - assert_eq!(function.arg_types.into_inner().len(), 1); + assert_eq!(function.return_type.to_protobuf(), test_data_type); + assert_eq!(function.arg_types.to_protobuf().len(), 1); assert_eq!(function.language, "python"); mgr.drop_function(function.function_id).await?; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 2e9f6a480299f..01a38619e94f5 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -20,12 +20,11 @@ use itertools::Itertools; use risingwave_common::bail; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, - StreamingParallelism, TableId, VnodeBitmap, WorkerId, + StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::subscribe_response::{ @@ -76,7 +75,7 @@ impl CatalogControllerInner { Ok(fragment_mappings.into_iter().map(|(fragment_id, mapping)| { FragmentParallelUnitMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.into_inner()), + mapping: Some(mapping.to_protobuf()), } })) } @@ -230,7 +229,7 @@ impl CatalogController { expr_context: pb_expr_context, } = actor; - let splits = pb_actor_splits.get(&actor_id).cloned().map(ConnectorSplits); + let splits = pb_actor_splits.get(&actor_id).map(ConnectorSplits::from); let status = pb_actor_status.get(&actor_id).cloned().ok_or_else(|| { anyhow::anyhow!( "actor {} in fragment {} has no actor_status", @@ -266,8 +265,8 @@ impl CatalogController { parallel_unit_id, worker_id, upstream_actor_ids: upstream_actors.into(), - vnode_bitmap: pb_vnode_bitmap.map(VnodeBitmap), - expr_context: ExprContext(pb_expr_context), + vnode_bitmap: pb_vnode_bitmap.as_ref().map(VnodeBitmap::from), + expr_context: ExprContext::from(&pb_expr_context), }); actor_dispatchers.insert( actor_id as ActorId, @@ -280,9 +279,12 @@ impl CatalogController { let upstream_fragment_id = pb_upstream_fragment_ids.into(); - let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping).unwrap(); + let vnode_mapping = pb_vnode_mapping + .as_ref() + .map(FragmentVnodeMapping::from) + .unwrap(); - let stream_node = StreamNode::from_protobuf(&stream_node); + let stream_node = StreamNode::from(&stream_node); let distribution_type = PbFragmentDistributionType::try_from(pb_distribution_type) .unwrap() @@ -416,8 +418,8 @@ impl CatalogController { Some(nodes) }; - let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.into_inner()); - let pb_expr_context = Some(expr_context.into_inner()); + let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.to_protobuf()); + let pb_expr_context = Some(expr_context.to_protobuf()); let pb_upstream_actor_id = upstream_fragment_actors .values() @@ -444,7 +446,7 @@ impl CatalogController { ); if let Some(splits) = splits { - pb_actor_splits.insert(actor_id as _, splits.into_inner()); + pb_actor_splits.insert(actor_id as _, splits.to_protobuf()); } pb_actors.push(PbStreamActor { @@ -460,7 +462,7 @@ impl CatalogController { } let pb_upstream_fragment_ids = upstream_fragment_id.into_u32_array(); - let pb_vnode_mapping = vnode_mapping.into_inner(); + let pb_vnode_mapping = vnode_mapping.to_protobuf(); let pb_state_table_ids = state_table_ids.into_u32_array(); let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _; let pb_fragment = PbFragment { @@ -909,7 +911,7 @@ impl CatalogController { .await?; } - let mut fragment_mapping: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() + let fragment_mapping: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() .select_only() .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) .join(JoinType::InnerJoin, fragment::Relation::Actor.def()) @@ -918,15 +920,17 @@ impl CatalogController { .all(&txn) .await?; // TODO: we'd better not store vnode mapping in fragment table and derive it from actors. - for (fragment_id, vnode_mapping) in &mut fragment_mapping { - vnode_mapping.0.data.iter_mut().for_each(|id| { + + for (fragment_id, vnode_mapping) in &fragment_mapping { + let mut pb_vnode_mapping = vnode_mapping.to_protobuf(); + pb_vnode_mapping.data.iter_mut().for_each(|id| { if let Some(new_id) = plan.get(&(*id as i32)) { *id = new_id.id; } }); fragment::ActiveModel { fragment_id: Set(*fragment_id), - vnode_mapping: Set(vnode_mapping.clone()), + vnode_mapping: Set(FragmentVnodeMapping::from(&pb_vnode_mapping)), ..Default::default() } .update(&txn) @@ -941,7 +945,7 @@ impl CatalogController { .into_iter() .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.into_inner()), + mapping: Some(mapping.to_protobuf()), }) .collect(), ) @@ -1054,13 +1058,13 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?; let mut actor_splits = actor_splits - .map(|splits| splits.0.splits) + .map(|splits| splits.to_protobuf().splits) .unwrap_or_default(); actor_splits.extend(splits.iter().map(Into::into)); Actor::update(actor::ActiveModel { actor_id: Set(*actor_id as _), - splits: Set(Some(ConnectorSplits(PbConnectorSplits { + splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits { splits: actor_splits, }))), ..Default::default() @@ -1330,10 +1334,11 @@ mod tests { use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; - use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode}; + use risingwave_meta_model_v2::fragment::DistributionType; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits, - ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, TableId, VnodeBitmap, + ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, + VnodeBitmap, }; use risingwave_pb::common::ParallelUnit; use risingwave_pb::meta::table_fragments::actor_status::PbActorState; @@ -1545,9 +1550,9 @@ mod tests { let vnode_bitmap = actor_vnode_bitmaps .remove(¶llel_unit_id) - .map(|m| VnodeBitmap(m.to_protobuf())); + .map(|m| VnodeBitmap::from(&m.to_protobuf())); - let actor_splits = Some(ConnectorSplits(PbConnectorSplits { + let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits { splits: vec![PbConnectorSplit { split_type: "dummy".to_string(), ..Default::default() @@ -1566,7 +1571,7 @@ mod tests { worker_id: 0, upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids), vnode_bitmap, - expr_context: ExprContext(PbExprContext { + expr_context: ExprContext::from(&PbExprContext { time_zone: String::from("America/New_York"), }), } @@ -1604,8 +1609,8 @@ mod tests { job_id: TEST_JOB_ID, fragment_type_mask: 0, distribution_type: DistributionType::Hash, - stream_node: StreamNode::from_protobuf(&stream_node), - vnode_mapping: FragmentVnodeMapping(parallel_unit_mapping.to_protobuf()), + stream_node: StreamNode::from(&stream_node), + vnode_mapping: FragmentVnodeMapping::from(¶llel_unit_mapping.to_protobuf()), state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]), upstream_fragment_id: I32Array::default(), }; @@ -1690,10 +1695,8 @@ mod tests { assert_eq!(actor_dispatcher, pb_dispatcher); assert_eq!( - vnode_bitmap, - pb_vnode_bitmap - .as_ref() - .map(|bitmap| VnodeBitmap(bitmap.clone())) + vnode_bitmap.map(|bitmap| bitmap.to_protobuf()), + pb_vnode_bitmap, ); assert_eq!(mview_definition, ""); @@ -1714,13 +1717,10 @@ mod tests { assert_eq!( splits, - pb_actor_splits - .get(&pb_actor_id) - .cloned() - .map(ConnectorSplits) + pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from) ); - assert_eq!(Some(expr_context.into_inner()), pb_expr_context); + assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context); } } @@ -1742,8 +1742,8 @@ mod tests { PbFragmentDistributionType::from(fragment.distribution_type) as i32 ); assert_eq!( - pb_vnode_mapping.map(FragmentVnodeMapping).unwrap(), - fragment.vnode_mapping + pb_vnode_mapping.unwrap(), + fragment.vnode_mapping.to_protobuf() ); assert_eq!( diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 7de1582ceb655..4c334cdd70423 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -98,8 +98,8 @@ impl From> for PbTable { schema_id: value.1.schema_id.unwrap() as _, database_id: value.1.database_id.unwrap() as _, name: value.0.name, - columns: value.0.columns.0, - pk: value.0.pk.0, + columns: value.0.columns.to_protobuf(), + pk: value.0.pk.to_protobuf(), dependent_relations: vec![], // todo: deprecate it. table_type: PbTableType::from(value.0.table_type) as _, distribution_key: value.0.distribution_key.0, @@ -118,7 +118,10 @@ impl From> for PbTable { watermark_indices: value.0.watermark_indices.0, dist_key_in_pk: value.0.dist_key_in_pk.0, dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32), - cardinality: value.0.cardinality.map(|cardinality| cardinality.0), + cardinality: value + .0 + .cardinality + .map(|cardinality| cardinality.to_protobuf()), initialized_at_epoch: Some( Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, ), @@ -128,7 +131,7 @@ impl From> for PbTable { cleaned_by_watermark: value.0.cleaned_by_watermark, stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. create_type: PbCreateType::Foreground as _, - version: value.0.version.map(|v| v.into_inner()), + version: value.0.version.map(|v| v.to_protobuf()), optional_associated_source_id: value .0 .optional_associated_source_id @@ -150,12 +153,12 @@ impl From> for PbSource { database_id: value.1.database_id.unwrap() as _, name: value.0.name, row_id_index: value.0.row_id_index.map(|id| id as _), - columns: value.0.columns.0, + columns: value.0.columns.to_protobuf(), pk_column_ids: value.0.pk_column_ids.0, with_properties: value.0.with_properties.0, owner: value.1.owner_id as _, - info: value.0.source_info.map(|info| info.0), - watermark_descs: value.0.watermark_descs.0, + info: value.0.source_info.map(|info| info.to_protobuf()), + watermark_descs: value.0.watermark_descs.to_protobuf(), definition: value.0.definition, connection_id: value.0.connection_id.map(|id| id as _), // todo: using the timestamp from the database directly. @@ -183,8 +186,8 @@ impl From> for PbSink { schema_id: value.1.schema_id.unwrap() as _, database_id: value.1.database_id.unwrap() as _, name: value.0.name, - columns: value.0.columns.0, - plan_pk: value.0.plan_pk.0, + columns: value.0.columns.to_protobuf(), + plan_pk: value.0.plan_pk.to_protobuf(), dependent_relations: vec![], // todo: deprecate it. distribution_key: value.0.distribution_key.0, downstream_pk: value.0.downstream_pk.0, @@ -202,7 +205,7 @@ impl From> for PbSink { db_name: value.0.db_name, sink_from_name: value.0.sink_from_name, stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. - format_desc: value.0.sink_format_desc.map(|desc| desc.0), + format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()), target_table: value.0.target_table.map(|id| id as _), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, @@ -217,7 +220,7 @@ impl From> for PbSubscription { schema_id: value.1.schema_id.unwrap() as _, database_id: value.1.database_id.unwrap() as _, name: value.0.name, - plan_pk: value.0.plan_pk.0, + plan_pk: value.0.plan_pk.to_protobuf(), dependent_relations: vec![], // todo: deprecate it. distribution_key: value.0.distribution_key.0, owner: value.1.owner_id as _, @@ -230,7 +233,8 @@ impl From> for PbSubscription { Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, ), stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. - column_catalogs: value.0.columns.0, + column_catalogs: value.0.columns.to_protobuf(), + subscription_from_name: value.0.subscription_from_name, initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, } @@ -247,7 +251,7 @@ impl From> for PbIndex { owner: value.1.owner_id as _, index_table_id: value.0.index_table_id as _, primary_table_id: value.0.primary_table_id as _, - index_item: value.0.index_items.0, + index_item: value.0.index_items.to_protobuf(), index_columns_len: value.0.index_columns_len as _, initialized_at_epoch: Some( Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, @@ -273,7 +277,7 @@ impl From> for PbView { properties: value.0.properties.0, sql: value.0.definition, dependent_relations: vec![], // todo: deprecate it. - columns: value.0.columns.0, + columns: value.0.columns.to_protobuf(), } } } @@ -286,7 +290,9 @@ impl From> for PbConnection { database_id: value.1.database_id.unwrap() as _, name: value.0.name, owner: value.1.owner_id as _, - info: Some(PbConnectionInfo::PrivateLinkService(value.0.info.0)), + info: Some(PbConnectionInfo::PrivateLinkService( + value.0.info.to_protobuf(), + )), } } } @@ -305,8 +311,8 @@ impl From> for PbFunction { .split(',') .map(|s| s.to_string()) .collect(), - arg_types: value.0.arg_types.into_inner(), - return_type: Some(value.0.return_type.into_inner()), + arg_types: value.0.arg_types.to_protobuf(), + return_type: Some(value.0.return_type.to_protobuf()), language: value.0.language, link: value.0.link, identifier: value.0.identifier, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 61b38cf1bca53..100c2baf35abf 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -23,7 +23,6 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; -use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::{ Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source, @@ -33,7 +32,7 @@ use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, streaming_job, subscription, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, - StreamingParallelism, TableId, TableVersion, UserId, + StreamNode, StreamingParallelism, TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; @@ -481,7 +480,7 @@ impl CatalogController { for splits in split_assignment.values() { for (actor_id, splits) in splits { let splits = splits.iter().map(PbConnectorSplit::from).collect_vec(); - let connector_splits = PbConnectorSplits { splits }; + let connector_splits = &PbConnectorSplits { splits }; actor::ActiveModel { actor_id: Set(*actor_id as _), splits: Set(Some(connector_splits.into())), @@ -542,7 +541,7 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?; let original_version = original_version.expect("version for table should exist"); - if version.version != original_version.inner_ref().version + 1 { + if version.version != original_version.to_protobuf().version + 1 { return Err(MetaError::permission_denied("table version is stale")); } @@ -734,7 +733,7 @@ impl CatalogController { } fragment::ActiveModel { fragment_id: Set(fragment_id), - stream_node: Set(StreamNode::from_protobuf(&stream_node)), + stream_node: Set(StreamNode::from(&stream_node)), upstream_fragment_id: Set(upstream_fragment_id), ..Default::default() } @@ -767,14 +766,14 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; - for (index_id, mut nodes) in index_items { - nodes - .0 + for (index_id, nodes) in index_items { + let mut pb_nodes = nodes.to_protobuf(); + pb_nodes .iter_mut() .for_each(|x| expr_rewriter.rewrite_expr(x)); let index = index::ActiveModel { index_id: Set(index_id), - index_items: Set(nodes), + index_items: Set(pb_nodes.into()), ..Default::default() } .update(&txn) @@ -839,7 +838,7 @@ impl CatalogController { if let Some(table_id) = source.optional_associated_table_id { vec![table_id] } else if let Some(source_info) = &source.source_info - && source_info.inner_ref().cdc_source_job + && source_info.to_protobuf().cdc_source_job { vec![source_id] } else { @@ -899,7 +898,7 @@ impl CatalogController { for (id, _, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), - stream_node: Set(StreamNode::from_protobuf(&stream_node)), + stream_node: Set(StreamNode::from(&stream_node)), ..Default::default() } .update(&txn) @@ -960,7 +959,7 @@ impl CatalogController { for (id, _, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), - stream_node: Set(StreamNode::from_protobuf(&stream_node)), + stream_node: Set(StreamNode::from(&stream_node)), ..Default::default() } .update(&txn) @@ -1107,12 +1106,12 @@ impl CatalogController { actor_id: Set(actor_id as _), fragment_id: Set(fragment_id as _), status: Set(ActorStatus::Running), - splits: Set(splits.map(|splits| PbConnectorSplits { splits }.into())), + splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())), parallel_unit_id: Set(parallel_unit.id as _), worker_id: Set(parallel_unit.worker_node_id as _), upstream_actor_ids: Set(actor_upstreams), - vnode_bitmap: Set(vnode_bitmap.map(|bitmap| bitmap.into())), - expr_context: Set(expr_context.unwrap().into()), + vnode_bitmap: Set(vnode_bitmap.as_ref().map(|bitmap| bitmap.into())), + expr_context: Set(expr_context.as_ref().unwrap().into()), }); for PbDispatcher { @@ -1132,7 +1131,7 @@ impl CatalogController { .into()), dist_key_indices: Set(dist_key_indices.into()), output_indices: Set(output_indices.into()), - hash_mapping: Set(hash_mapping.map(|mapping| mapping.into())), + hash_mapping: Set(hash_mapping.as_ref().map(|mapping| mapping.into())), dispatcher_id: Set(dispatcher_id as _), downstream_actor_ids: Set(downstream_actor_id.into()), }) @@ -1157,7 +1156,7 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?; let mut actor = actor.into_active_model(); - actor.vnode_bitmap = Set(Some(bitmap.to_protobuf().into())); + actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into())); actor.update(&txn).await?; } @@ -1174,7 +1173,7 @@ impl CatalogController { for actor in &fragment_actors { actor_to_parallel_unit.insert(actor.actor_id as u32, actor.parallel_unit_id as _); if let Some(vnode_bitmap) = &actor.vnode_bitmap { - let bitmap = Bitmap::from(vnode_bitmap.inner_ref()); + let bitmap = Bitmap::from(&vnode_bitmap.to_protobuf()); actor_to_vnode_bitmap.insert(actor.actor_id as u32, bitmap); } } @@ -1191,7 +1190,7 @@ impl CatalogController { .to_protobuf(); let mut fragment = fragment.into_active_model(); - fragment.vnode_mapping = Set(vnode_mapping.clone().into()); + fragment.vnode_mapping = Set((&vnode_mapping).into()); fragment.update(&txn).await?; fragment_mapping_to_notify.push(FragmentParallelUnitMapping { @@ -1236,7 +1235,7 @@ impl CatalogController { if dispatcher.dispatcher_type.as_ref() == &DispatcherType::Hash { dispatcher.hash_mapping = Set(upstream_dispatcher_mapping.as_ref().map(|m| { - risingwave_meta_model_v2::ActorMapping::from(m.to_protobuf()) + risingwave_meta_model_v2::ActorMapping::from(&m.to_protobuf()) })); } else { debug_assert!(upstream_dispatcher_mapping.is_none()); diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index d42fc79721807..09ae874d87aa9 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -119,7 +119,7 @@ impl CatalogController { PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db), PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user), PbUpdateField::AuthInfo => { - user.auth_info = Set(update_user.auth_info.clone().map(AuthInfo)) + user.auth_info = Set(update_user.auth_info.as_ref().map(AuthInfo::from)) } PbUpdateField::Rename => user.name = Set(update_user.name.clone()), }); diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index dad880bbad774..173634ab03cc7 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -18,14 +18,14 @@ use anyhow::anyhow; use itertools::Itertools; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode}; +use risingwave_meta_model_v2::fragment::DistributionType; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, - SchemaId, SourceId, UserId, + SchemaId, SourceId, StreamNode, UserId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; use risingwave_pb::meta::PbFragmentParallelUnitMapping; @@ -368,7 +368,10 @@ where .eq(pb_function.database_id as DatabaseId) .and(object::Column::SchemaId.eq(pb_function.schema_id as SchemaId)) .and(function::Column::Name.eq(&pb_function.name)) - .and(function::Column::ArgTypes.eq(DataTypeArray(pb_function.arg_types.clone()))), + .and( + function::Column::ArgTypes + .eq(DataTypeArray::from(pb_function.arg_types.clone())), + ), ) .count(db) .await?; @@ -802,7 +805,7 @@ where .into_iter() .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.into_inner()), + mapping: Some(mapping.to_protobuf()), }) .collect()) } @@ -831,7 +834,7 @@ where .into_iter() .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.into_inner()), + mapping: Some(mapping.to_protobuf()), }) .collect()) } diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index b7d136776a952..5cd1eb08402ec 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -51,7 +51,7 @@ impl Transactional for CompactionGroup { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = compaction_config::ActiveModel { compaction_group_id: Set(self.group_id as _), - config: Set(CompactionConfig((*self.compaction_config).to_owned())), + config: Set(CompactionConfig::from(&(*self.compaction_config))), }; compaction_config::Entity::insert(m) .on_conflict( @@ -77,8 +77,8 @@ impl Transactional for CompactStatus { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = compaction_status::ActiveModel { compaction_group_id: Set(self.compaction_group_id as _), - status: Set(LevelHandlers( - self.level_handlers.iter().map_into().collect(), + status: Set(LevelHandlers::from( + self.level_handlers.iter().map_into().collect_vec(), )), }; compaction_status::Entity::insert(m) @@ -107,7 +107,7 @@ impl Transactional for CompactTaskAssignment { let m = compaction_task::ActiveModel { id: Set(task.task_id as _), context_id: Set(self.context_id as _), - task: Set(CompactionTask(task)), + task: Set(CompactionTask::from(&task)), }; compaction_task::Entity::insert(m) .on_conflict( @@ -220,7 +220,7 @@ impl Transactional for HummockVersionDelta { max_committed_epoch: Set(self.max_committed_epoch as _), safe_epoch: Set(self.safe_epoch as _), trivial_move: Set(self.trivial_move), - full_version_delta: Set(FullVersionDelta(self.to_protobuf())), + full_version_delta: Set(FullVersionDelta::from(&self.to_protobuf())), }; hummock_version_delta::Entity::insert(m) .on_conflict( @@ -249,7 +249,7 @@ impl Transactional for HummockVersionDelta { impl From for CompactionGroup { fn from(value: compaction_config::Model) -> Self { - Self::new(value.compaction_group_id as _, value.config.0) + Self::new(value.compaction_group_id as _, value.config.to_protobuf()) } } @@ -257,7 +257,7 @@ impl From for CompactStatus { fn from(value: compaction_status::Model) -> Self { Self { compaction_group_id: value.compaction_group_id as _, - level_handlers: value.status.0.iter().map_into().collect(), + level_handlers: value.status.to_protobuf().iter().map_into().collect(), } } } diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 0ad4b78cdefd7..9602612334ad8 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -399,7 +399,7 @@ impl DdlController { // delete vpc endpoints. for conn in connections { let _ = self - .delete_vpc_endpoint_v2(conn.into_inner()) + .delete_vpc_endpoint_v2(conn.to_protobuf()) .await .inspect_err(|err| { tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index e48d1ae8bcb17..5eac8fec19159 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -607,7 +607,7 @@ impl SourceManager { ( actor_id as ActorId, splits - .into_inner() + .to_protobuf() .splits .iter() .map(|split| SplitImpl::try_from(split).unwrap())