diff --git a/proto/catalog.proto b/proto/catalog.proto index 455908bbc894b..67e71848e3d00 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -159,6 +159,26 @@ message Sink { optional string created_at_cluster_version = 23; } +message Subscription { + uint32 id = 1; + string name = 2; + string definition = 3; + repeated common.ColumnOrder plan_pk = 4; + repeated int32 distribution_key = 5; + map properties = 6; + repeated plan_common.ColumnCatalog column_catalogs = 7; + uint32 database_id = 8; + uint32 schema_id = 9; + repeated uint32 dependent_relations = 10; + optional uint64 initialized_at_epoch = 11; + optional uint64 created_at_epoch = 12; + uint32 owner = 13; + StreamJobStatus stream_job_status = 14; + + optional string initialized_at_cluster_version = 15; + optional string created_at_cluster_version = 16; +} + message Connection { message PrivateLinkService { enum PrivateLinkProvider { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 6e66543627639..78f4f3c818e41 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -98,6 +98,26 @@ message DropSinkResponse { uint64 version = 2; } +message CreateSubscriptionRequest { + catalog.Subscription subscription = 1; + stream_plan.StreamFragmentGraph fragment_graph = 2; +} + +message CreateSubscriptionResponse { + common.Status status = 1; + uint64 version = 2; +} + +message DropSubscriptionRequest { + uint32 subscription_id = 1; + bool cascade = 2; +} + +message DropSubscriptionResponse { + common.Status status = 1; + uint64 version = 2; +} + message CreateMaterializedViewRequest { catalog.Table materialized_view = 1; stream_plan.StreamFragmentGraph fragment_graph = 2; @@ -179,6 +199,7 @@ message AlterNameRequest { uint32 source_id = 5; uint32 schema_id = 6; uint32 database_id = 7; + uint32 subscription_id = 8; } string new_name = 20; } @@ -196,6 +217,7 @@ message AlterOwnerRequest { uint32 sink_id = 4; uint32 schema_id = 5; uint32 database_id = 6; + uint32 subscription_id = 7; } uint32 owner_id = 20; } @@ -208,6 +230,7 @@ message AlterSetSchemaRequest { uint32 sink_id = 4; uint32 function_id = 5; uint32 connection_id = 6; + uint32 subscription_id = 7; } uint32 new_schema_id = 20; } @@ -399,7 +422,9 @@ service DdlService { rpc CreateSource(CreateSourceRequest) returns (CreateSourceResponse); rpc DropSource(DropSourceRequest) returns (DropSourceResponse); rpc CreateSink(CreateSinkRequest) returns (CreateSinkResponse); + rpc CreateSubscription(CreateSubscriptionRequest) returns (CreateSubscriptionResponse); rpc DropSink(DropSinkRequest) returns (DropSinkResponse); + rpc DropSubscription(DropSubscriptionRequest) returns (DropSubscriptionResponse); rpc CreateMaterializedView(CreateMaterializedViewRequest) returns (CreateMaterializedViewResponse); rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse); rpc CreateTable(CreateTableRequest) returns (CreateTableResponse); diff --git a/proto/meta.proto b/proto/meta.proto index 1db290af7b308..4ac73e3a0768d 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -373,6 +373,7 @@ message MetaSnapshot { repeated catalog.View views = 7; repeated catalog.Function functions = 15; repeated catalog.Connection connections = 17; + repeated catalog.Subscription subscriptions = 19; repeated user.UserInfo users = 8; // for streaming repeated FragmentParallelUnitMapping parallel_unit_mappings = 9; @@ -394,6 +395,7 @@ message Relation { catalog.Sink sink = 3; catalog.Index index = 4; catalog.View view = 5; + catalog.Subscription subscription = 6; } } diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index d5d0ad959714a..359a0de15979b 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -240,13 +240,6 @@ message SinkNode { SinkLogStoreType log_store_type = 3; } -message SubscriptionNode { - // log store should have a table. - catalog.Table log_store_table = 1; - //retention time with seconds - uint64 retention_seconds = 2; -} - message ProjectNode { repeated expr.ExprNode select_list = 1; // this two field is expressing a list of usize pair, which means when project receives a @@ -727,6 +720,12 @@ message OverWindowNode { OverWindowCachePolicy cache_policy = 5; } +message SubscriptionNode { + catalog.Subscription subscription_catalog = 1; + // log store should have a table. + catalog.Table log_store_table = 2; +} + message StreamNode { oneof node_body { SourceNode source = 100; @@ -865,6 +864,7 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_VALUES = 64; FRAGMENT_TYPE_FLAG_DML = 128; FRAGMENT_TYPE_FLAG_CDC_FILTER = 256; + FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512; } // The streaming context associated with a stream plan diff --git a/proto/user.proto b/proto/user.proto index 69661d46f0db3..b132df55dcc13 100644 --- a/proto/user.proto +++ b/proto/user.proto @@ -65,6 +65,7 @@ message GrantPrivilege { uint32 all_tables_schema_id = 11; uint32 all_sources_schema_id = 12; uint32 all_dml_tables_schema_id = 13; + uint32 subscription_id = 14; } repeated ActionWithGrantOption action_with_opts = 7; } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index dba69fe89d9b5..f732942d0d77b 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -127,6 +127,8 @@ impl ObserverState for FrontendObserverNode { meta_backup_manifest_id: _, hummock_write_limits: _, version, + // todo!: add subscriptions + subscriptions: _, } = snapshot; for db in databases { @@ -288,6 +290,7 @@ impl FrontendObserverNode { Operation::Update => catalog_guard.update_view(view), _ => panic!("receive an unsupported notify {:?}", resp), }, + RelationInfo::Subscription(_) => todo!(), } } } diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index c021942bcbadd..36f18c6e1b6f8 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -4,6 +4,7 @@ pub use sea_orm_migration::prelude::*; mod m20230908_072257_init; mod m20231008_020431_hummock; +mod m20240304_074901_subscription; pub struct Migrator; @@ -13,6 +14,7 @@ impl MigratorTrait for Migrator { vec![ Box::new(m20230908_072257_init::Migration), Box::new(m20231008_020431_hummock::Migration), + Box::new(m20240304_074901_subscription::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs b/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs new file mode 100644 index 0000000000000..ece549c4f922d --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs @@ -0,0 +1,66 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +use crate::{assert_not_has_tables, drop_tables}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + assert_not_has_tables!(manager, Subscription); + manager + .create_table( + MigrationTable::create() + .table(Subscription::Table) + .col( + ColumnDef::new(Subscription::SubscriptionId) + .integer() + .primary_key(), + ) + .col(ColumnDef::new(Subscription::Name).string().not_null()) + .col( + ColumnDef::new(Subscription::Columns) + .json_binary() + .not_null(), + ) + .col( + ColumnDef::new(Subscription::PlanPk) + .json_binary() + .not_null(), + ) + .col( + ColumnDef::new(Subscription::DistributionKey) + .json_binary() + .not_null(), + ) + .col( + ColumnDef::new(Subscription::Properties) + .json_binary() + .not_null(), + ) + .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .to_owned(), + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // drop tables cascade. + drop_tables!(manager, Subscription); + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Subscription { + Table, + SubscriptionId, + Name, + Columns, + PlanPk, + DistributionKey, + Properties, + Definition, +} diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 2abd66fae3fe3..c07d76827c7b6 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -44,6 +44,7 @@ pub mod schema; pub mod sink; pub mod source; pub mod streaming_job; +pub mod subscription; pub mod system_parameter; pub mod table; pub mod user; @@ -62,6 +63,7 @@ pub type SchemaId = ObjectId; pub type TableId = ObjectId; pub type SourceId = ObjectId; pub type SinkId = ObjectId; +pub type SubscriptionId = ObjectId; pub type IndexId = ObjectId; pub type ViewId = ObjectId; pub type FunctionId = ObjectId; diff --git a/src/meta/model_v2/src/object.rs b/src/meta/model_v2/src/object.rs index 4d7a1e902cdc3..6df5db623ae3c 100644 --- a/src/meta/model_v2/src/object.rs +++ b/src/meta/model_v2/src/object.rs @@ -37,6 +37,8 @@ pub enum ObjectType { Function, #[sea_orm(string_value = "CONNECTION")] Connection, + #[sea_orm(string_value = "SUBSCRIPTION")] + Subscription, } impl ObjectType { @@ -51,6 +53,7 @@ impl ObjectType { ObjectType::Index => "index", ObjectType::Function => "function", ObjectType::Connection => "connection", + ObjectType::Subscription => "subscription", } } } @@ -102,6 +105,8 @@ pub enum Relation { Schema, #[sea_orm(has_many = "super::sink::Entity")] Sink, + #[sea_orm(has_many = "super::subscription::Entity")] + Subscription, #[sea_orm(has_many = "super::source::Entity")] Source, #[sea_orm(has_many = "super::table::Entity")] @@ -164,6 +169,12 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscription.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::Source.def() diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model_v2/src/prelude.rs index d2d4e1362f93e..a8a65cb3b840d 100644 --- a/src/meta/model_v2/src/prelude.rs +++ b/src/meta/model_v2/src/prelude.rs @@ -35,6 +35,7 @@ pub use super::schema::Entity as Schema; pub use super::sink::Entity as Sink; pub use super::source::Entity as Source; pub use super::streaming_job::Entity as StreamingJob; +pub use super::subscription::Entity as Subscription; pub use super::system_parameter::Entity as SystemParameter; pub use super::table::Entity as Table; pub use super::user::Entity as User; diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs new file mode 100644 index 0000000000000..ca0d83ffca699 --- /dev/null +++ b/src/meta/model_v2/src/subscription.rs @@ -0,0 +1,66 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::catalog::PbSubscription; +use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue::Set; + +use crate::{ColumnCatalogArray, ColumnOrderArray, I32Array, Property, SubscriptionId}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "subscription")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub subscription_id: SubscriptionId, + pub name: String, + pub columns: ColumnCatalogArray, + pub plan_pk: ColumnOrderArray, + pub distribution_key: I32Array, + pub properties: Property, + pub definition: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::object::Entity", + from = "Column::SubscriptionId", + to = "super::object::Column::Oid", + on_update = "NoAction", + on_delete = "Cascade" + )] + Object, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Object.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + +impl From for ActiveModel { + fn from(pb_subscription: PbSubscription) -> Self { + Self { + subscription_id: Set(pb_subscription.id as _), + name: Set(pb_subscription.name), + columns: Set(pb_subscription.column_catalogs.into()), + plan_pk: Set(pb_subscription.plan_pk.into()), + distribution_key: Set(pb_subscription.distribution_key.into()), + properties: Set(pb_subscription.properties.into()), + definition: Set(pb_subscription.definition), + } + } +} diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 223ee2238032c..f5012e5796e69 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -311,6 +311,56 @@ impl DdlService for DdlServiceImpl { })) } + async fn create_subscription( + &self, + request: Request, + ) -> Result, Status> { + self.env.idle_manager().record_activity(); + + let req = request.into_inner(); + + let subscription = req.get_subscription()?.clone(); + let fragment_graph = req.get_fragment_graph()?.clone(); + + let stream_job = StreamingJob::Subscription(subscription); + + let command = DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + CreateType::Foreground, + None, + ); + + let version = self.ddl_controller.run_command(command).await?; + + Ok(Response::new(CreateSubscriptionResponse { + status: None, + version, + })) + } + + async fn drop_subscription( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let subscription_id = request.subscription_id; + let drop_mode = DropMode::from_request_setting(request.cascade); + + let command = DdlCommand::DropStreamingJob( + StreamingJobId::Subscription(subscription_id), + drop_mode, + None, + ); + + let version = self.ddl_controller.run_command(command).await?; + + Ok(Response::new(DropSubscriptionResponse { + status: None, + version, + })) + } + async fn create_materialized_view( &self, request: Request, diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index 06f61e5f26e50..e668ee1073547 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -73,6 +73,7 @@ impl NotificationServiceImpl { tables, sources, sinks, + subscriptions, indexes, views, functions, @@ -87,6 +88,7 @@ impl NotificationServiceImpl { tables, sources, sinks, + subscriptions, indexes, views, functions, @@ -105,6 +107,7 @@ impl NotificationServiceImpl { tables, sources, sinks, + subscriptions, indexes, views, functions, @@ -120,6 +123,7 @@ impl NotificationServiceImpl { tables, sources, sinks, + subscriptions, indexes, views, functions, @@ -220,7 +224,18 @@ impl NotificationServiceImpl { async fn frontend_subscribe(&self) -> MetaResult { let ( - (databases, schemas, tables, sources, sinks, indexes, views, functions, connections), + ( + databases, + schemas, + tables, + sources, + sinks, + subscriptions, + indexes, + views, + functions, + connections, + ), users, catalog_version, ) = self.get_catalog_snapshot().await?; @@ -239,6 +254,7 @@ impl NotificationServiceImpl { tables, indexes, views, + subscriptions, functions, connections, users, diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 4d00e07452c41..323f49b68663e 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -21,7 +21,7 @@ use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1}; use risingwave_backup::MetaSnapshotId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_pb::catalog::{ - Connection, Database, Function, Index, Schema, Sink, Source, Table, View, + Connection, Database, Function, Index, Schema, Sink, Source, Subscription, Table, View, }; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::SystemParams; @@ -123,6 +123,7 @@ impl MetaSnapshotV1Builder { .await? .ok_or_else(|| anyhow!("cluster id not found in meta store"))? .into(); + let subscription = Subscription::list_at_snapshot::(&meta_store_snapshot).await?; self.snapshot.metadata = ClusterMetadata { default_cf, @@ -142,6 +143,7 @@ impl MetaSnapshotV1Builder { connection, system_param, cluster_id, + subscription, }; Ok(()) } diff --git a/src/meta/src/backup_restore/restore_impl/v1.rs b/src/meta/src/backup_restore/restore_impl/v1.rs index a3419610ba569..303e6e6ad5f52 100644 --- a/src/meta/src/backup_restore/restore_impl/v1.rs +++ b/src/meta/src/backup_restore/restore_impl/v1.rs @@ -198,5 +198,6 @@ async fn restore_metadata( restore_metadata_model(&meta_store, &snapshot.metadata.connection).await?; restore_system_param_model(&meta_store, &[snapshot.metadata.system_param]).await?; restore_cluster_id(&meta_store, snapshot.metadata.cluster_id.into()).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.subscription).await?; Ok(()) } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 1cd068d1fd571..607266691298c 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -27,15 +27,15 @@ use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::table::TableType; use risingwave_meta_model_v2::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, - sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors, - ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, - IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, SourceId, StreamSourceInfo, - StreamingParallelism, TableId, UserId, + sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, + ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, + FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, + SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ - PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, - PbView, + PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, + PbSubscription, PbTable, PbView, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo; use risingwave_pb::meta::relation::PbRelationInfo; @@ -846,6 +846,18 @@ impl CatalogController { )), }); } + ObjectType::Subscription => { + let (subscription, obj) = Subscription::find_by_id(job_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("subscription", job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Subscription( + ObjectModel(subscription, obj.unwrap()).into(), + )), + }); + } ObjectType::Index => { let (index, obj) = Index::find_by_id(job_id) .find_also_related(Object) @@ -1361,6 +1373,44 @@ impl CatalogController { )); } } + ObjectType::Subscription => { + let subscription = Subscription::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?; + relations.push(PbRelationInfo::Subscription( + ObjectModel(subscription, obj).into(), + )); + + // internal tables. + let internal_tables: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter(table::Column::BelongsToJobId.eq(object_id)) + .into_tuple() + .all(&txn) + .await?; + + Object::update_many() + .col_expr( + object::Column::OwnerId, + SimpleExpr::Value(Value::Int(Some(new_owner))), + ) + .filter(object::Column::Oid.is_in(internal_tables.clone())) + .exec(&txn) + .await?; + + let table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::TableId.is_in(internal_tables)) + .all(&txn) + .await?; + for (table, table_obj) in table_objs { + relations.push(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )); + } + } ObjectType::View => { let view = View::find_by_id(object_id) .one(&txn) @@ -1577,6 +1627,48 @@ impl CatalogController { } } } + ObjectType::Subscription => { + let subscription = Subscription::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?; + check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn) + .await?; + relations.push(PbRelationInfo::Subscription( + ObjectModel(subscription, obj).into(), + )); + + // internal tables. + let internal_tables: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter(table::Column::BelongsToJobId.eq(object_id)) + .into_tuple() + .all(&txn) + .await?; + + if !internal_tables.is_empty() { + Object::update_many() + .col_expr( + object::Column::SchemaId, + SimpleExpr::Value(Value::Int(Some(new_schema))), + ) + .filter(object::Column::Oid.is_in(internal_tables.clone())) + .exec(&txn) + .await?; + + let table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::TableId.is_in(internal_tables)) + .all(&txn) + .await?; + for (table, table_obj) in table_objs { + relations.push(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )); + } + } + } ObjectType::View => { let view = View::find_by_id(object_id) .one(&txn) @@ -1755,7 +1847,11 @@ impl CatalogController { assert!( to_drop_objects.iter().all(|obj| matches!( obj.obj_type, - ObjectType::Table | ObjectType::Index | ObjectType::Sink | ObjectType::View + ObjectType::Table + | ObjectType::Index + | ObjectType::Sink + | ObjectType::View + | ObjectType::Subscription )), "only these objects will depends on others" ); @@ -1816,6 +1912,7 @@ impl CatalogController { .filter(|obj| { obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink + || obj.obj_type == ObjectType::Subscription || obj.obj_type == ObjectType::Index }) .map(|obj| obj.oid) @@ -1952,6 +2049,14 @@ impl CatalogController { ..Default::default() })), }), + ObjectType::Subscription => relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Subscription(PbSubscription { + id: obj.oid as _, + schema_id: obj.schema_id.unwrap() as _, + database_id: obj.database_id.unwrap() as _, + ..Default::default() + })), + }), ObjectType::View => relations.push(PbRelation { relation_info: Some(PbRelationInfo::View(PbView { id: obj.oid as _, @@ -2125,6 +2230,9 @@ impl CatalogController { ObjectType::Table => rename_relation!(Table, table, table_id, object_id), ObjectType::Source => rename_relation!(Source, source, source_id, object_id), ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id), + ObjectType::Subscription => { + rename_relation!(Subscription, subscription, subscription_id, object_id) + } ObjectType::View => rename_relation!(View, view, view_id, object_id), ObjectType::Index => { let (mut index, obj) = Index::find_by_id(object_id) @@ -2203,6 +2311,9 @@ impl CatalogController { match obj.obj_type { ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid), + ObjectType::Subscription => { + rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid) + } ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid), ObjectType::Index => { let index_table_id: Option = Index::find_by_id(obj.oid) @@ -2213,7 +2324,9 @@ impl CatalogController { .await?; rename_relation_ref!(Table, table, table_id, index_table_id.unwrap()); } - _ => bail!("only table, sink, view and index depend on other objects."), + _ => { + bail!("only table, sink, subscription, view and index depend on other objects.") + } } } txn.commit().await?; @@ -2349,6 +2462,11 @@ impl CatalogController { inner.list_sinks().await } + pub async fn list_subscriptions(&self) -> MetaResult> { + let inner = self.inner.read().await; + inner.list_subscriptions().await + } + pub async fn list_views(&self) -> MetaResult> { let inner = self.inner.read().await; inner.list_views().await @@ -2415,10 +2533,25 @@ impl CatalogController { .into_tuple() .all(&inner.db) .await?; + let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> = + Subscription::find() + .select_only() + .columns([ + subscription::Column::SubscriptionId, + subscription::Column::Name, + ]) + .columns([object::Column::DatabaseId, object::Column::SchemaId]) + .join(JoinType::InnerJoin, subscription::Relation::Object.def()) + .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) + .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating)) + .into_tuple() + .all(&inner.db) + .await?; let mut job_mapping: HashMap = creating_tables .into_iter() .chain(creating_sinks.into_iter()) + .chain(creating_subscriptions.into_iter()) .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id)) .collect(); @@ -2557,6 +2690,7 @@ impl CatalogControllerInner { let tables = self.list_tables().await?; let sources = self.list_sources().await?; let sinks = self.list_sinks().await?; + let subscriptions = self.list_subscriptions().await?; let indexes = self.list_indexes().await?; let views = self.list_views().await?; let functions = self.list_functions().await?; @@ -2571,6 +2705,7 @@ impl CatalogControllerInner { tables, sources, sinks, + subscriptions, indexes, views, functions, @@ -2744,6 +2879,21 @@ impl CatalogControllerInner { .collect()) } + /// `list_subscriptions` return all `CREATED` subscriptions. + async fn list_subscriptions(&self) -> MetaResult> { + let subscription_objs = Subscription::find() + .find_also_related(Object) + .join(JoinType::LeftJoin, object::Relation::StreamingJob.def()) + .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) + .all(&self.db) + .await?; + + Ok(subscription_objs + .into_iter() + .map(|(subscription, obj)| ObjectModel(subscription, obj.unwrap()).into()) + .collect()) + } + async fn list_views(&self) -> MetaResult> { let view_objs = View::find().find_also_related(Object).all(&self.db).await?; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index d8f915dc713c2..7de1582ceb655 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -15,14 +15,14 @@ use anyhow::anyhow; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::{ - connection, database, function, index, object, schema, sink, source, table, view, + connection, database, function, index, object, schema, sink, source, subscription, table, view, }; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{ PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex, - PbSchema, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbTable, PbView, + PbSchema, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable, PbView, }; use sea_orm::{DatabaseConnection, ModelTrait}; @@ -210,6 +210,33 @@ impl From> for PbSink { } } +impl From> for PbSubscription { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.subscription_id as _, + schema_id: value.1.schema_id.unwrap() as _, + database_id: value.1.database_id.unwrap() as _, + name: value.0.name, + plan_pk: value.0.plan_pk.0, + dependent_relations: vec![], // todo: deprecate it. + distribution_key: value.0.distribution_key.0, + owner: value.1.owner_id as _, + properties: value.0.properties.0, + definition: value.0.definition, + initialized_at_epoch: Some( + Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + ), + created_at_epoch: Some( + Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + ), + stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. + column_catalogs: value.0.columns.0, + initialized_at_cluster_version: value.1.initialized_at_cluster_version, + created_at_cluster_version: value.1.created_at_cluster_version, + } + } +} + impl From> for PbIndex { fn from(value: ObjectModel) -> Self { Self { diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 7c4360a92f285..d929f76bb8f83 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -31,9 +31,9 @@ use risingwave_meta_model_v2::prelude::{ }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, - streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, - FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism, - TableId, TableVersion, UserId, + streaming_job, subscription, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, + ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, + StreamingParallelism, TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; @@ -172,6 +172,22 @@ impl CatalogController { let sink: sink::ActiveModel = sink.clone().into(); Sink::insert(sink).exec(&txn).await?; } + StreamingJob::Subscription(subscription) => { + let job_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Subscription, + subscription.owner as _, + Some(subscription.database_id as _), + Some(subscription.schema_id as _), + create_type, + ctx, + streaming_parallelism, + ) + .await?; + subscription.id = job_id as _; + let subscription: subscription::ActiveModel = subscription.clone().into(); + subscription.insert(&txn).await?; + } StreamingJob::Table(src, table, _) => { let job_id = Self::create_streaming_job_obj( &txn, diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 6c7e61a316add..04f3757f4cd3b 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -716,6 +716,7 @@ where ObjectType::Function => PbObject::FunctionId(oid), ObjectType::Index => unreachable!("index is not supported yet"), ObjectType::Connection => unreachable!("connection is not supported yet"), + ObjectType::Subscription => PbObject::SubscriptionId(oid), }; PbGrantPrivilege { action_with_opts: vec![PbActionWithGrantOption { diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index cd2f0bbb13b94..29a56b7146b0a 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -21,11 +21,14 @@ use risingwave_common::catalog::TableOption; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{ Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, - StreamJobStatus, Table, View, + StreamJobStatus, Subscription, Table, View, }; use risingwave_pb::data::DataType; -use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId}; +use super::{ + ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, SubscriptionId, + ViewId, +}; use crate::manager::{IndexId, MetaSrvEnv, TableId}; use crate::model::MetadataModel; use crate::{MetaError, MetaResult}; @@ -36,6 +39,7 @@ pub type Catalog = ( Vec, Vec, Vec, + Vec, Vec, Vec, Vec, @@ -58,6 +62,8 @@ pub struct DatabaseManager { pub(super) sources: BTreeMap, /// Cached sink information. pub(super) sinks: BTreeMap, + /// Cached subscription information. + pub(super) subscriptions: BTreeMap, /// Cached index information. pub(super) indexes: BTreeMap, /// Cached table information. @@ -92,6 +98,7 @@ impl DatabaseManager { let views = View::list(env.meta_store_checked()).await?; let functions = Function::list(env.meta_store_checked()).await?; let connections = Connection::list(env.meta_store_checked()).await?; + let subscriptions = Subscription::list(env.meta_store_checked()).await?; let mut relation_ref_count = HashMap::new(); @@ -114,6 +121,12 @@ impl DatabaseManager { } (sink.id, sink) })); + let subscriptions = BTreeMap::from_iter(subscriptions.into_iter().map(|subscription| { + for depend_relation_id in &subscription.dependent_relations { + *relation_ref_count.entry(*depend_relation_id).or_default() += 1; + } + (subscription.id, subscription) + })); let indexes = BTreeMap::from_iter(indexes.into_iter().map(|index| (index.id, index))); let tables = BTreeMap::from_iter(tables.into_iter().map(|table| { for depend_relation_id in &table.dependent_relations { @@ -135,6 +148,7 @@ impl DatabaseManager { schemas, sources, sinks, + subscriptions, views, tables, indexes, @@ -168,6 +182,14 @@ impl DatabaseManager { }) .cloned() .collect_vec(), + self.subscriptions + .values() + .filter(|t| { + t.stream_job_status == PbStreamJobStatus::Unspecified as i32 + || t.stream_job_status == PbStreamJobStatus::Created as i32 + }) + .cloned() + .collect_vec(), self.indexes .values() .filter(|t| { @@ -226,6 +248,15 @@ impl DatabaseManager { && x.name.eq(&relation_key.2) }) { Err(MetaError::catalog_duplicated("sink", &relation_key.2)) + } else if self.subscriptions.values().any(|x| { + x.database_id == relation_key.0 + && x.schema_id == relation_key.1 + && x.name.eq(&relation_key.2) + }) { + Err(MetaError::catalog_duplicated( + "subscription", + &relation_key.2, + )) } else if self.views.values().any(|x| { x.database_id == relation_key.0 && x.schema_id == relation_key.1 @@ -298,6 +329,10 @@ impl DatabaseManager { self.sinks.get(&sink_id) } + pub fn get_subscription(&self, subscription_id: SubscriptionId) -> Option<&Subscription> { + self.subscriptions.get(&subscription_id) + } + pub fn get_all_table_options(&self) -> HashMap { self.tables .iter() @@ -333,6 +368,10 @@ impl DatabaseManager { self.sinks.values().cloned().collect_vec() } + pub fn list_subscriptions(&self) -> Vec { + self.subscriptions.values().cloned().collect_vec() + } + pub fn list_views(&self) -> Vec { self.views.values().cloned().collect_vec() } @@ -358,6 +397,7 @@ impl DatabaseManager { .keys() .copied() .chain(self.sinks.keys().copied()) + .chain(self.subscriptions.keys().copied()) .chain(self.indexes.keys().copied()) .chain(self.sources.keys().copied()) .chain( @@ -396,6 +436,10 @@ impl DatabaseManager { self.tables.values().all(|t| t.schema_id != schema_id) && self.sources.values().all(|s| s.schema_id != schema_id) && self.sinks.values().all(|s| s.schema_id != schema_id) + && self + .subscriptions + .values() + .all(|s| s.schema_id != schema_id) && self.indexes.values().all(|i| i.schema_id != schema_id) && self.views.values().all(|v| v.schema_id != schema_id) } @@ -533,6 +577,17 @@ impl DatabaseManager { } } + pub fn ensure_subscription_id(&self, subscription_id: SubscriptionId) -> MetaResult<()> { + if self.subscriptions.contains_key(&subscription_id) { + Ok(()) + } else { + Err(MetaError::catalog_id_not_found( + "subscription", + subscription_id, + )) + } + } + pub fn ensure_index_id(&self, index_id: IndexId) -> MetaResult<()> { if self.indexes.contains_key(&index_id) { Ok(()) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 5edc761914ebf..e87c33602d454 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -36,7 +36,7 @@ use risingwave_connector::source::{should_copy_to_format_encode_options, UPSTREA use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType}; use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus, - Schema, Sink, Source, StreamJobStatus, Table, View, + Schema, Sink, Source, StreamJobStatus, Subscription, Table, View, }; use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -58,6 +58,7 @@ pub type SchemaId = u32; pub type TableId = u32; pub type SourceId = u32; pub type SinkId = u32; +pub type SubscriptionId = u32; pub type RelationId = u32; pub type IndexId = u32; pub type ViewId = u32; @@ -71,6 +72,7 @@ pub enum RelationIdEnum { Index(IndexId), View(ViewId), Sink(SinkId), + Subscription(SubscriptionId), Source(SourceId), } @@ -314,6 +316,7 @@ impl CatalogManager { let mut schemas = BTreeMapTransaction::new(&mut database_core.schemas); let mut sources = BTreeMapTransaction::new(&mut database_core.sources); let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); let mut views = BTreeMapTransaction::new(&mut database_core.views); @@ -346,6 +349,7 @@ impl CatalogManager { let schemas_to_drop = drop_by_database_id!(schemas, database_id); let sources_to_drop = drop_by_database_id!(sources, database_id); let sinks_to_drop = drop_by_database_id!(sinks, database_id); + let subscriptions_to_drop = drop_by_database_id!(subscriptions, database_id); let tables_to_drop = drop_by_database_id!(tables, database_id); let indexes_to_drop = drop_by_database_id!(indexes, database_id); let views_to_drop = drop_by_database_id!(views, database_id); @@ -380,6 +384,7 @@ impl CatalogManager { schemas, sources, sinks, + subscriptions, tables, indexes, views, @@ -392,6 +397,11 @@ impl CatalogManager { .chain(schemas_to_drop.iter().map(|schema| schema.owner)) .chain(sources_to_drop.iter().map(|source| source.owner)) .chain(sinks_to_drop.iter().map(|sink| sink.owner)) + .chain( + subscriptions_to_drop + .iter() + .map(|subscription| subscription.owner), + ) .chain( tables_to_drop .iter() @@ -441,6 +451,11 @@ impl CatalogManager { .into_iter() .map(|sink| StreamingJobId::new(sink.id)), ) + .chain( + subscriptions_to_drop + .into_iter() + .map(|subscription| StreamingJobId::new(subscription.id)), + ) .collect_vec(); let source_deleted_ids = sources_to_drop .into_iter() @@ -696,6 +711,9 @@ impl CatalogManager { .await } StreamingJob::Sink(sink, _) => self.start_create_sink_procedure(sink).await, + StreamingJob::Subscription(subscription) => { + self.start_create_subscription_procedure(subscription).await + } StreamingJob::Index(index, index_table) => { self.start_create_index_procedure(index, index_table).await } @@ -1122,6 +1140,7 @@ impl CatalogManager { let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut sources = BTreeMapTransaction::new(&mut database_core.sources); let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); let mut views = BTreeMapTransaction::new(&mut database_core.views); let mut users = BTreeMapTransaction::new(&mut user_core.user_info); @@ -1145,6 +1164,7 @@ impl CatalogManager { let mut all_internal_table_ids: HashSet = HashSet::default(); let mut all_index_ids: HashSet = HashSet::default(); let mut all_sink_ids: HashSet = HashSet::default(); + let mut all_subscription_ids: HashSet = HashSet::default(); let mut all_source_ids: HashSet = HashSet::default(); let mut all_view_ids: HashSet = HashSet::default(); let mut all_cdc_source_ids: HashSet = HashSet::default(); @@ -1174,6 +1194,18 @@ impl CatalogManager { }) .collect_vec(); + let subscriptions_depend_on = subscriptions + .tree_ref() + .iter() + .filter_map(|(_, subscription)| { + if subscription.dependent_relations.contains(&relation_id) { + Some(RelationInfo::Subscription(subscription.clone())) + } else { + None + } + }) + .collect_vec(); + let views_depend_on = views .tree_ref() .iter() @@ -1190,6 +1222,7 @@ impl CatalogManager { tables_depend_on .into_iter() .chain(sinks_depend_on) + .chain(subscriptions_depend_on) .chain(views_depend_on) .collect() }; @@ -1227,6 +1260,14 @@ impl CatalogManager { bail!("sink doesn't exist"); } } + RelationIdEnum::Subscription(subscription_id) => { + let subscription = subscriptions.get(&subscription_id).cloned(); + if let Some(subscription) = subscription { + deque.push_back(RelationInfo::Subscription(subscription)); + } else { + bail!("subscription doesn't exist"); + } + } RelationIdEnum::View(view_id) => { let view = views.get(&view_id).cloned(); if let Some(view) = view { @@ -1501,6 +1542,41 @@ impl CatalogManager { } } } + RelationInfo::Subscription(subscription) => { + if !all_subscription_ids.insert(subscription.id) { + continue; + } + let table_fragments = fragment_manager + .select_table_fragments_by_table_id(&subscription.id.into()) + .await?; + + all_internal_table_ids.extend(table_fragments.internal_table_ids()); + + if let Some(ref_count) = database_core + .relation_ref_count + .get(&subscription.id) + .cloned() + { + if ref_count > 0 { + // Other relations depend on it. + match drop_mode { + DropMode::Restrict => { + return Err(MetaError::permission_denied(format!( + "Fail to delete subscription `{}` because {} other relation(s) depend on it", + subscription.name, ref_count + ))); + } + DropMode::Cascade => { + for relation_info in + relations_depend_on(subscription.id as RelationId) + { + deque.push_back(relation_info); + } + } + } + } + } + } } } @@ -1528,6 +1604,10 @@ impl CatalogManager { .iter() .map(|sink_id| sinks.remove(*sink_id).unwrap()) .collect_vec(); + let subscriptions_removed = all_subscription_ids + .iter() + .map(|subscription_id| subscriptions.remove(*subscription_id).unwrap()) + .collect_vec(); if !matches!(relation, RelationIdEnum::Sink(_)) { let table_sinks = sinks_removed @@ -1586,7 +1666,16 @@ impl CatalogManager { ) }; - commit_meta!(self, tables, indexes, sources, views, sinks, users)?; + commit_meta!( + self, + tables, + indexes, + sources, + views, + sinks, + users, + subscriptions + )?; for index in &indexes_removed { user_core.decrease_ref(index.owner); @@ -1610,6 +1699,10 @@ impl CatalogManager { user_core.decrease_ref(sink.owner); } + for subscription in &subscriptions_removed { + user_core.decrease_ref(subscription.owner); + } + for user in users_need_update { self.notify_frontend(Operation::Update, Info::User(user)) .await; @@ -1638,6 +1731,12 @@ impl CatalogManager { } } + for subscription in &subscriptions_removed { + for dependent_relation_id in &subscription.dependent_relations { + database_core.decrease_ref_count(*dependent_relation_id); + } + } + let version = self .notify_frontend( Operation::Delete, @@ -1662,6 +1761,13 @@ impl CatalogManager { .chain(sinks_removed.into_iter().map(|sink| Relation { relation_info: RelationInfo::Sink(sink).into(), })) + .chain( + subscriptions_removed + .into_iter() + .map(|subscription| Relation { + relation_info: RelationInfo::Subscription(subscription).into(), + }), + ) .collect_vec(), }), ) @@ -1671,6 +1777,7 @@ impl CatalogManager { .into_iter() .map(|id| id.into()) .chain(all_sink_ids.into_iter().map(|id| id.into())) + .chain(all_subscription_ids.into_iter().map(|id| id.into())) .chain(all_cdc_source_ids.into_iter().map(|id| id.into())) .collect_vec(); @@ -1716,6 +1823,7 @@ impl CatalogManager { vec![table], vec![], vec![], + vec![], source, ) .await @@ -1732,6 +1840,7 @@ impl CatalogManager { mut to_update_tables: Vec
, mut to_update_views: Vec, mut to_update_sinks: Vec, + mut to_update_subscriptions: Vec, to_update_source: Option, ) -> MetaResult { for table in database_mgr.tables.values() { @@ -1760,10 +1869,21 @@ impl CatalogManager { } } + for subscription in database_mgr.subscriptions.values() { + if subscription.dependent_relations.contains(&relation_id) { + let mut subscription = subscription.clone(); + subscription.definition = + alter_relation_rename_refs(&subscription.definition, from, to); + to_update_subscriptions.push(subscription); + } + } + // commit meta. let mut tables = BTreeMapTransaction::new(&mut database_mgr.tables); let mut views = BTreeMapTransaction::new(&mut database_mgr.views); let mut sinks = BTreeMapTransaction::new(&mut database_mgr.sinks); + let mut subscriptions: BTreeMapTransaction<'_, u32, risingwave_pb::catalog::Subscription> = + BTreeMapTransaction::new(&mut database_mgr.subscriptions); let mut sources = BTreeMapTransaction::new(&mut database_mgr.sources); to_update_tables.iter().for_each(|table| { tables.insert(table.id, table.clone()); @@ -1774,6 +1894,9 @@ impl CatalogManager { to_update_sinks.iter().for_each(|sink| { sinks.insert(sink.id, sink.clone()); }); + to_update_subscriptions.iter().for_each(|subscription| { + subscriptions.insert(subscription.id, subscription.clone()); + }); if let Some(source) = &to_update_source { sources.insert(source.id, source.clone()); } @@ -1784,6 +1907,7 @@ impl CatalogManager { !to_update_tables.is_empty() || !to_update_views.is_empty() || !to_update_sinks.is_empty() + || !to_update_subscriptions.is_empty() || to_update_source.is_some() ); let version = self @@ -1801,6 +1925,13 @@ impl CatalogManager { .chain(to_update_sinks.into_iter().map(|sink| Relation { relation_info: RelationInfo::Sink(sink).into(), })) + .chain( + to_update_subscriptions + .into_iter() + .map(|subscription| Relation { + relation_info: RelationInfo::Subscription(subscription).into(), + }), + ) .chain(to_update_source.into_iter().map(|source| Relation { relation_info: RelationInfo::Source(source).into(), })) @@ -1843,6 +1974,7 @@ impl CatalogManager { vec![], vec![view], vec![], + vec![], None, ) .await @@ -1881,6 +2013,47 @@ impl CatalogManager { Ok(version) } + pub async fn alter_subscription_name( + &self, + subscription_id: SubscriptionId, + subscription_name: &str, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + database_core.ensure_subscription_id(subscription_id)?; + + // 1. validate new subscription name. + let mut subscription = database_core + .subscriptions + .get(&subscription_id) + .unwrap() + .clone(); + database_core.check_relation_name_duplicated(&( + subscription.database_id, + subscription.schema_id, + subscription_name.to_string(), + ))?; + + // 2. rename subscription and its definition. + subscription.name = subscription_name.to_string(); + subscription.definition = + alter_relation_rename(&subscription.definition, subscription_name); + + // 3. commit meta. + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + subscriptions.insert(subscription_id, subscription.clone()); + commit_meta!(self, subscriptions)?; + + let version = self + .notify_frontend_relation_info( + Operation::Update, + RelationInfo::Subscription(subscription), + ) + .await; + + Ok(version) + } + pub async fn alter_source_name( &self, source_id: SourceId, @@ -1912,6 +2085,7 @@ impl CatalogManager { vec![], vec![], vec![], + vec![], Some(source), ) .await @@ -2215,6 +2389,40 @@ impl CatalogManager { user_core.increase_ref(owner_id); user_core.decrease_ref(old_owner_id); } + alter_owner_request::Object::SubscriptionId(subscription_id) => { + database_core.ensure_subscription_id(subscription_id)?; + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + let mut subscription = subscriptions.get_mut(subscription_id).unwrap(); + let old_owner_id = subscription.owner; + if old_owner_id == owner_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + subscription.owner = owner_id; + + let mut relations = vec![Relation { + relation_info: Some(RelationInfo::Subscription(subscription.clone())), + }]; + + // internal tables + let internal_table_ids = fragment_manager + .select_table_fragments_by_table_id(&(subscription_id.into())) + .await? + .internal_table_ids(); + for id in internal_table_ids { + let mut table = tables.get_mut(id).unwrap(); + assert_eq!(old_owner_id, table.owner); + table.owner = owner_id; + relations.push(Relation { + relation_info: Some(RelationInfo::Table(table.clone())), + }); + } + + relation_info = Info::RelationGroup(RelationGroup { relations }); + commit_meta!(self, subscriptions, tables)?; + user_core.increase_ref(owner_id); + user_core.decrease_ref(old_owner_id); + } }; let version = self.notify_frontend(Operation::Update, relation_info).await; @@ -2439,6 +2647,42 @@ impl CatalogManager { let version = self.notify_frontend(Operation::Update, notify_info).await; return Ok(version); } + alter_set_schema_request::Object::SubscriptionId(subscription_id) => { + database_core.ensure_subscription_id(subscription_id)?; + let Subscription { + name, schema_id, .. + } = database_core.subscriptions.get(&subscription_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + // internal tables. + let to_update_internal_table_ids = Vec::from_iter( + fragment_manager + .select_table_fragments_by_table_id(&(subscription_id.into())) + .await? + .internal_table_ids(), + ); + + database_core.check_relation_name_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + ))?; + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + let mut subscription = subscriptions.get_mut(subscription_id).unwrap(); + subscription.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Subscription(subscription.clone()))); + + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + for table_id in to_update_internal_table_ids { + let mut table = tables.get_mut(table_id).unwrap(); + table.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Table(table.clone()))); + } + + commit_meta!(self, subscriptions, tables)?; + } } let version = self @@ -2938,6 +3182,114 @@ impl CatalogManager { } } + pub async fn start_create_subscription_procedure( + &self, + subscription: &Subscription, + ) -> MetaResult<()> { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let user_core = &mut core.user; + database_core.ensure_database_id(subscription.database_id)?; + database_core.ensure_schema_id(subscription.schema_id)?; + for dependent_id in &subscription.dependent_relations { + database_core.ensure_table_view_or_source_id(dependent_id)?; + } + let key = ( + subscription.database_id, + subscription.schema_id, + subscription.name.clone(), + ); + database_core.check_relation_name_duplicated(&key)?; + #[cfg(not(test))] + user_core.ensure_user_id(subscription.owner)?; + + if database_core.has_in_progress_creation(&key) { + bail!("subscription already in creating procedure"); + } else { + database_core.mark_creating(&key); + database_core.mark_creating_streaming_job(subscription.id, key); + for &dependent_relation_id in &subscription.dependent_relations { + database_core.increase_ref_count(dependent_relation_id); + } + user_core.increase_ref(subscription.owner); + Ok(()) + } + } + + pub async fn finish_create_subscription_procedure( + &self, + mut internal_tables: Vec
, + mut subscription: Subscription, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let key = ( + subscription.database_id, + subscription.schema_id, + subscription.name.clone(), + ); + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + let mut subscriptions = BTreeMapTransaction::new(&mut database_core.subscriptions); + assert!( + !subscriptions.contains_key(&subscription.id) + && database_core.in_progress_creation_tracker.contains(&key), + "subscription must be in creating procedure" + ); + + database_core.in_progress_creation_tracker.remove(&key); + database_core + .in_progress_creation_streaming_job + .remove(&subscription.id); + + subscription.stream_job_status = PbStreamJobStatus::Created.into(); + subscriptions.insert(subscription.id, subscription.clone()); + for table in &mut internal_tables { + table.stream_job_status = PbStreamJobStatus::Created.into(); + tables.insert(table.id, table.clone()); + } + commit_meta!(self, subscriptions, tables)?; + + let version = self + .notify_frontend( + Operation::Add, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Subscription(subscription.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) + .await; + + Ok(version) + } + + pub async fn cancel_create_subscription_procedure(&self, subscription: &Subscription) { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let user_core = &mut core.user; + let key = ( + subscription.database_id, + subscription.schema_id, + subscription.name.clone(), + ); + assert!( + !database_core.subscriptions.contains_key(&subscription.id), + "subscription must be in creating procedure" + ); + + database_core.unmark_creating(&key); + database_core.unmark_creating_streaming_job(subscription.id); + for &dependent_relation_id in &subscription.dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); + } + user_core.decrease_ref(subscription.owner); + } + /// This is used for `ALTER TABLE ADD/DROP COLUMN`. pub async fn start_replace_table_procedure(&self, stream_job: &StreamingJob) -> MetaResult<()> { let StreamingJob::Table(source, table, ..) = stream_job else { @@ -3237,6 +3589,10 @@ impl CatalogManager { self.core.lock().await.database.list_sinks() } + pub async fn list_subscriptions(&self) -> Vec { + self.core.lock().await.database.list_subscriptions() + } + pub async fn list_views(&self) -> Vec { self.core.lock().await.database.list_views() } @@ -3376,6 +3732,11 @@ impl CatalogManager { self.core.lock().await.database.sinks.len() } + #[cfg_attr(coverage, coverage(off))] + pub async fn subscription_count(&self) -> usize { + self.core.lock().await.database.subscriptions.len() + } + #[cfg_attr(coverage, coverage(off))] pub async fn function_count(&self) -> usize { self.core.lock().await.database.functions.len() diff --git a/src/meta/src/manager/catalog/user.rs b/src/meta/src/manager/catalog/user.rs index 456dd116b9daf..d148e4c591819 100644 --- a/src/meta/src/manager/catalog/user.rs +++ b/src/meta/src/manager/catalog/user.rs @@ -53,6 +53,12 @@ impl UserManager { .chain(database.sources.values().map(|source| source.owner)) .chain(database.sinks.values().map(|sink| sink.owner)) .chain(database.indexes.values().map(|index| index.owner)) + .chain( + database + .subscriptions + .values() + .map(|subscriptions| subscriptions.owner), + ) .chain( database .tables diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 0fcdb2008a103..2f7a68ed081f3 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -15,7 +15,7 @@ use risingwave_common::catalog::TableVersionId; use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; -use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; +use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Subscription, Table}; use risingwave_pb::ddl_service::TableJobType; use strum::EnumDiscriminants; @@ -26,6 +26,7 @@ use crate::model::FragmentId; #[derive(Debug, Clone, EnumDiscriminants)] pub enum StreamingJob { MaterializedView(Table), + Subscription(Subscription), Sink(Sink, Option<(Table, Option)>), Table(Option, Table, TableJobType), Index(Index, Table), @@ -36,6 +37,7 @@ pub enum StreamingJob { pub enum DdlType { MaterializedView, Sink, + Subscription, Table(TableJobType), Index, Source, @@ -49,6 +51,7 @@ impl From<&StreamingJob> for DdlType { StreamingJob::Table(_, _, ty) => DdlType::Table(*ty), StreamingJob::Index(_, _) => DdlType::Index, StreamingJob::Source(_) => DdlType::Source, + StreamingJob::Subscription(_) => DdlType::Subscription, } } } @@ -89,6 +92,10 @@ impl StreamingJob { source.created_at_epoch = created_at_epoch; source.created_at_cluster_version = created_at_cluster_version; } + StreamingJob::Subscription(subscription) => { + subscription.created_at_epoch = created_at_epoch; + subscription.created_at_cluster_version = created_at_cluster_version; + } } } @@ -121,6 +128,10 @@ impl StreamingJob { source.initialized_at_epoch = initialized_at_epoch; source.initialized_at_cluster_version = initialized_at_cluster_version; } + StreamingJob::Subscription(subscription) => { + subscription.initialized_at_epoch = initialized_at_epoch; + subscription.initialized_at_cluster_version = initialized_at_cluster_version; + } } } } @@ -139,6 +150,9 @@ impl StreamingJob { StreamingJob::Source(src) => { src.id = id; } + StreamingJob::Subscription(subscription) => { + subscription.id = id; + } } } @@ -148,7 +162,7 @@ impl StreamingJob { Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { table.fragment_id = id; } - Self::Sink(_, _) | Self::Source(_) => {} + Self::Sink(_, _) | Self::Source(_) | Self::Subscription(_) => {} } } @@ -158,7 +172,10 @@ impl StreamingJob { Self::Table(_, table, ..) => { table.dml_fragment_id = id; } - Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_, _) => {} + Self::MaterializedView(_) + | Self::Index(_, _) + | Self::Sink(_, _) + | Self::Subscription(_) => {} Self::Source(_) => {} } } @@ -170,6 +187,7 @@ impl StreamingJob { Self::Table(_, table, ..) => table.id, Self::Index(index, _) => index.id, Self::Source(source) => source.id, + Self::Subscription(subscription) => subscription.id, } } @@ -180,6 +198,7 @@ impl StreamingJob { Self::Table(_, table, ..) => Some(table.id), Self::Index(_, table) => Some(table.id), Self::Source(_) => None, + Self::Subscription(_) => None, } } @@ -189,7 +208,7 @@ impl StreamingJob { Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { Some(table) } - Self::Sink(_, _) | Self::Source(_) => None, + Self::Sink(_, _) | Self::Source(_) | Self::Subscription(_) => None, } } @@ -200,6 +219,7 @@ impl StreamingJob { Self::Table(_, table, ..) => table.schema_id, Self::Index(index, _) => index.schema_id, Self::Source(source) => source.schema_id, + Self::Subscription(subscription) => subscription.schema_id, } } @@ -210,6 +230,7 @@ impl StreamingJob { Self::Table(_, table, ..) => table.database_id, Self::Index(index, _) => index.database_id, Self::Source(source) => source.database_id, + Self::Subscription(subscription) => subscription.database_id, } } @@ -220,6 +241,7 @@ impl StreamingJob { Self::Table(_, table, ..) => table.name.clone(), Self::Index(index, _) => index.name.clone(), Self::Source(source) => source.name.clone(), + Self::Subscription(subscription) => subscription.name.clone(), } } @@ -230,6 +252,7 @@ impl StreamingJob { StreamingJob::Table(_, table, ..) => table.owner, StreamingJob::Index(index, _) => index.owner, StreamingJob::Source(source) => source.owner, + StreamingJob::Subscription(subscription) => subscription.owner, } } @@ -240,6 +263,7 @@ impl StreamingJob { Self::Index(_, table) => table.definition.clone(), Self::Sink(sink, _) => sink.definition.clone(), Self::Source(source) => source.definition.clone(), + Self::Subscription(subscription) => subscription.definition.clone(), } } @@ -277,6 +301,7 @@ impl StreamingJob { vec![] } StreamingJob::Source(_) => vec![], + Self::Subscription(subscription) => subscription.dependent_relations.clone(), } } diff --git a/src/meta/src/model/catalog.rs b/src/meta/src/model/catalog.rs index 146bbc2690afc..76477aa66ab35 100644 --- a/src/meta/src/model/catalog.rs +++ b/src/meta/src/model/catalog.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_pb::catalog::{ - Connection, Database, Function, Index, Schema, Sink, Source, Table, View, + Connection, Database, Function, Index, Schema, Sink, Source, Subscription, Table, View, }; use crate::model::{MetadataModel, MetadataModelResult}; @@ -36,6 +36,8 @@ const CATALOG_TABLE_CF_NAME: &str = "cf/catalog_table"; const CATALOG_SCHEMA_CF_NAME: &str = "cf/catalog_schema"; /// Column family name for database catalog. const CATALOG_DATABASE_CF_NAME: &str = "cf/catalog_database"; +/// Column family name for database catalog. +const CATALOG_SUBSCRIPTION_CF_NAME: &str = "cf/catalog_subscription"; macro_rules! impl_model_for_catalog { ($name:ident, $cf:ident, $key_ty:ty, $key_fn:ident) => { @@ -71,6 +73,7 @@ impl_model_for_catalog!(Function, CATALOG_FUNCTION_CF_NAME, u32, get_id); impl_model_for_catalog!(Table, CATALOG_TABLE_CF_NAME, u32, get_id); impl_model_for_catalog!(Schema, CATALOG_SCHEMA_CF_NAME, u32, get_id); impl_model_for_catalog!(Database, CATALOG_DATABASE_CF_NAME, u32, get_id); +impl_model_for_catalog!(Subscription, CATALOG_SUBSCRIPTION_CF_NAME, u32, get_id); #[cfg(test)] mod tests { diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 66dff64dc186f..8b53d87160efa 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -172,6 +172,7 @@ macro_rules! for_all_metadata_models { { risingwave_pb::catalog::Table }, { risingwave_pb::catalog::Index }, { risingwave_pb::catalog::Sink }, + { risingwave_pb::catalog::Subscription }, { risingwave_pb::catalog::Source }, { risingwave_pb::catalog::View }, { crate::model::stream::TableFragments }, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 908c62ebdffe6..7d16d353643fe 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -69,7 +69,7 @@ use crate::manager::{ CatalogManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, MetadataManagerV1, NotificationVersion, RelationIdEnum, SchemaId, SinkId, SourceId, StreamingClusterInfo, - StreamingJob, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, + StreamingJob, SubscriptionId, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; @@ -101,6 +101,7 @@ pub enum StreamingJobId { Sink(SinkId), Table(Option, TableId), Index(IndexId), + Subscription(SubscriptionId), } impl StreamingJobId { @@ -109,6 +110,7 @@ impl StreamingJobId { match self { StreamingJobId::MaterializedView(id) | StreamingJobId::Sink(id) + | StreamingJobId::Subscription(id) | StreamingJobId::Table(_, id) | StreamingJobId::Index(id) => *id, } @@ -1136,6 +1138,7 @@ impl DdlController { StreamingJobId::Sink(id) => (id as _, ObjectType::Sink), StreamingJobId::Table(_, id) => (id as _, ObjectType::Table), StreamingJobId::Index(idx) => (idx as _, ObjectType::Index), + StreamingJobId::Subscription(id) => (id as _, ObjectType::Sink), }; let version = self @@ -1193,6 +1196,15 @@ impl DdlController { ) .await? } + StreamingJobId::Subscription(subscription_id) => { + mgr.catalog_manager + .drop_relation( + RelationIdEnum::Subscription(subscription_id), + mgr.fragment_manager.clone(), + drop_mode, + ) + .await? + } }; if let Some(replace_table_info) = target_replace_info { @@ -1510,6 +1522,11 @@ impl DdlController { .cancel_create_sink_procedure(sink, target_table) .await; } + StreamingJob::Subscription(subscription) => { + mgr.catalog_manager + .cancel_create_subscription_procedure(subscription) + .await; + } StreamingJob::Table(source, table, ..) => { if let Some(source) = source { mgr.catalog_manager @@ -1596,6 +1613,11 @@ impl DdlController { version } + StreamingJob::Subscription(subscription) => { + mgr.catalog_manager + .finish_create_subscription_procedure(internal_tables, subscription) + .await? + } StreamingJob::Table(source, table, ..) => { creating_internal_table_ids.push(table.id); if let Some(source) = source { @@ -1928,6 +1950,11 @@ impl DdlController { .alter_database_name(database_id, new_name) .await } + alter_name_request::Object::SubscriptionId(sink_id) => { + mgr.catalog_manager + .alter_subscription_name(sink_id, new_name) + .await + } }, MetadataManager::V2(mgr) => { let (obj_type, id) = match relation { @@ -1944,6 +1971,9 @@ impl DdlController { alter_name_request::Object::DatabaseId(id) => { (ObjectType::Database, id as ObjectId) } + alter_name_request::Object::SubscriptionId(id) => { + (ObjectType::Subscription, id as ObjectId) + } }; mgr.catalog_controller .alter_name(obj_type, id, new_name) @@ -1971,6 +2001,7 @@ impl DdlController { Object::SinkId(id) => (ObjectType::Sink, id as ObjectId), Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId), Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId), + Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId), }; mgr.catalog_controller .alter_owner(obj_type, id, owner_id as _) @@ -2010,6 +2041,9 @@ impl DdlController { alter_set_schema_request::Object::ConnectionId(id) => { (ObjectType::Connection, id as ObjectId) } + alter_set_schema_request::Object::SubscriptionId(id) => { + (ObjectType::Subscription, id as ObjectId) + } }; mgr.catalog_controller .alter_schema(obj_type, id, new_schema_id as _) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index fdf43532ca629..fbe250818e6e4 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -147,6 +147,11 @@ impl BuildingFragment { has_table = true; } + NodeBody::Subscription(subscription_node) => { + subscription_node.subscription_catalog.as_mut().unwrap().id = table_id; + + has_table = true; + } NodeBody::Dml(dml_node) => { dml_node.table_id = table_id; dml_node.table_version_id = job.table_version_id().unwrap(); @@ -654,7 +659,10 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - DdlType::MaterializedView | DdlType::Sink | DdlType::Index => { + DdlType::MaterializedView + | DdlType::Sink + | DdlType::Index + | DdlType::Subscription => { // handle MV on MV // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` diff --git a/src/storage/backup/src/meta_snapshot_v1.rs b/src/storage/backup/src/meta_snapshot_v1.rs index 4cc67d87f6561..f6c457572aaa8 100644 --- a/src/storage/backup/src/meta_snapshot_v1.rs +++ b/src/storage/backup/src/meta_snapshot_v1.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_pb::catalog::{ - Connection, Database, Function, Index, Schema, Sink, Source, Table, View, + Connection, Database, Function, Index, Schema, Sink, Source, Subscription, Table, View, }; use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats}; use risingwave_pb::meta::{SystemParams, TableFragments}; @@ -120,6 +120,7 @@ pub struct ClusterMetadata { pub connection: Vec, pub system_param: SystemParams, pub cluster_id: String, + pub subscription: Vec, } impl ClusterMetadata { @@ -144,6 +145,7 @@ impl ClusterMetadata { Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf); Self::encode_prost_message(&self.system_param, buf); Self::encode_prost_message(&self.cluster_id, buf); + Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf); Ok(()) } @@ -171,6 +173,8 @@ impl ClusterMetadata { let connection: Vec = Self::decode_prost_message_list(&mut buf)?; let system_param: SystemParams = Self::decode_prost_message(&mut buf)?; let cluster_id: String = Self::decode_prost_message(&mut buf)?; + let subscription: Vec = + Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?; Ok(Self { default_cf, @@ -190,6 +194,7 @@ impl ClusterMetadata { connection, system_param, cluster_id, + subscription, }) } @@ -228,6 +233,16 @@ impl ClusterMetadata { } Ok(result) } + + fn try_decode_prost_message_list(buf: &mut &[u8]) -> Option>> + where + T: prost::Message + Default, + { + if buf.is_empty() { + return None; + } + Some(Self::decode_prost_message_list(buf)) + } } #[cfg(test)] diff --git a/src/stream/src/executor/subscription.rs b/src/stream/src/executor/subscription.rs index dfe352909fd8e..1dd47f89c43d9 100644 --- a/src/stream/src/executor/subscription.rs +++ b/src/stream/src/executor/subscription.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::str::FromStr; use core::time::Duration; +use std::collections::HashMap; use futures::prelude::stream::StreamExt; use futures_async_stream::try_stream; -use risingwave_common::types::Timestamptz; +use risingwave_common::types::{Interval, Timestamptz}; use risingwave_common::util::epoch::Epoch; use risingwave_storage::store::LocalStateStore; use tokio::time::Instant; @@ -34,7 +36,7 @@ pub struct SubscriptionExecutor { actor_context: ActorContextRef, input: Executor, log_store: SubscriptionLogStoreWriter, - retention_seconds: u64, + retention_seconds: i64, } impl SubscriptionExecutor { @@ -44,8 +46,20 @@ impl SubscriptionExecutor { actor_context: ActorContextRef, input: Executor, log_store: SubscriptionLogStoreWriter, - retention_seconds: u64, + properties: HashMap, ) -> StreamExecutorResult { + let retention_seconds_str = properties.get("retention").ok_or_else(|| { + StreamExecutorError::serde_error("Subscription retention time not set.".to_string()) + })?; + let retention_seconds = (Interval::from_str(retention_seconds_str) + .map_err(|_| { + StreamExecutorError::serde_error( + "Retention needs to be set in Interval format".to_string(), + ) + })? + .epoch_in_micros() + / 1000000) as i64; + Ok(Self { actor_context, input, @@ -83,7 +97,7 @@ impl SubscriptionExecutor { let truncate_offset: Option = if next_truncate_time < Instant::now() { - let truncate_timestamptz = Timestamptz::from_secs(barrier.get_curr_epoch().as_timestamptz().timestamp() - (self.retention_seconds as i64)).ok_or_else(||{StreamExecutorError::from("Subscription retention time calculation error: timestamp is out of range.".to_string())})?; + let truncate_timestamptz = Timestamptz::from_secs(barrier.get_curr_epoch().as_timestamptz().timestamp() - self.retention_seconds).ok_or_else(||{StreamExecutorError::from("Subscription retention time calculation error: timestamp is out of range.".to_string())})?; let epoch = Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64); next_truncate_time = diff --git a/src/stream/src/from_proto/subscription.rs b/src/stream/src/from_proto/subscription.rs index ff0c1f8c1d084..abc30a1967435 100644 --- a/src/stream/src/from_proto/subscription.rs +++ b/src/stream/src/from_proto/subscription.rs @@ -58,14 +58,22 @@ impl ExecutorBuilder for SubscriptionExecutorBuilder { Some(vnodes.clone()), &KV_LOG_STORE_V2_INFO, ); - let log_store_identity = format!("subscription-executor[{}]", params.executor_id); + let log_store_identity = format!( + "subscription[{}]-executor[{}]", + node.subscription_catalog.as_ref().unwrap().id, + params.executor_id + ); let log_store = SubscriptionLogStoreWriter::new(table_id, local_state_store, serde, log_store_identity); let exec = SubscriptionExecutor::new( params.actor_context, input, log_store, - node.retention_seconds, + node.subscription_catalog + .as_ref() + .unwrap() + .properties + .clone(), ) .await?; Ok((params.info, exec).into())