diff --git a/e2e_test/ddl/alter_owner.slt b/e2e_test/ddl/alter_owner.slt index e5a6773956201..63a4f6bb2f4d2 100644 --- a/e2e_test/ddl/alter_owner.slt +++ b/e2e_test/ddl/alter_owner.slt @@ -136,6 +136,29 @@ WHERE ---- sink user1 +statement ok +CREATE SUBSCRIPTION subscription FROM mv WITH ( + retention = '1D' +); + +statement ok +ALTER SUBSCRIPTION subscription OWNER TO user1; + +query TT +SELECT + pg_class.relname AS rel_name, + pg_roles.rolname AS owner +FROM + pg_class + JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace + JOIN pg_roles ON pg_roles.oid = pg_class.relowner +WHERE + pg_namespace.nspname NOT LIKE 'pg_%' + AND pg_namespace.nspname != 'information_schema' + AND pg_class.relname = 'subscription'; +---- +sink user1 + statement ok CREATE DATABASE d; @@ -181,6 +204,9 @@ DROP DATABASE d; statement ok DROP SINK sink; +statement ok +DROP SUBSCRIPTION subscription; + statement ok DROP SOURCE src; diff --git a/e2e_test/ddl/alter_parallelism.slt b/e2e_test/ddl/alter_parallelism.slt index 025496ca1c571..08864e561845f 100644 --- a/e2e_test/ddl/alter_parallelism.slt +++ b/e2e_test/ddl/alter_parallelism.slt @@ -13,6 +13,9 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali statement ok create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id; +statement ok +create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id; + statement ok create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id; @@ -94,9 +97,28 @@ select parallelism from sink_parallelism where name = 's'; ---- FIXED(4) +statement ok +create subscription subscription1 from t with (retention = '1D'); + +query T +select parallelism from subscription_parallelism where name = 'subscription1'; +---- +ADAPTIVE + +statement ok +alter subscription subscription1 set parallelism = 4; + +query T +select parallelism from subscription_parallelism where name = 'subscription1'; +---- +FIXED(4) + statement ok drop sink s; +statement ok +drop subscription subscription1; + statement ok drop materialized view m_join; diff --git a/e2e_test/ddl/alter_rename.slt b/e2e_test/ddl/alter_rename.slt index 5171f7f3cdad1..03655ef99d20b 100644 --- a/e2e_test/ddl/alter_rename.slt +++ b/e2e_test/ddl/alter_rename.slt @@ -15,6 +15,11 @@ CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH ( connector = 'blackhole' ); +statement ok +CREATE SUBSCRIPTION subscription FROM mv WITH ( + retention = '1D' +); + statement ok CREATE SOURCE src (v INT) WITH ( connector = 'datagen', @@ -113,6 +118,14 @@ SHOW CREATE SINK sink1; ---- public.sink1 CREATE SINK sink1 AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv2 AS mv3 WITH (connector = 'blackhole') +statement ok +ALTER SUBSCRIPTION subscription RENAME TO subscription1; + +query TT +SHOW CREATE SUBSCRIPTION subscription1; +---- +public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv WITH (retention = '1D') + # alter mview rename with alias conflict, used by sink1 statement ok ALTER MATERIALIZED VIEW mv2 RENAME TO mv3; @@ -229,6 +242,9 @@ DROP SCHEMA schema1; statement ok DROP SINK sink1; +statement ok +DROP SUBSCRIPTION subscription1; + statement error Permission denied DROP VIEW v5; diff --git a/e2e_test/ddl/alter_set_schema.slt b/e2e_test/ddl/alter_set_schema.slt index 8206449a8b727..a4348076ab902 100644 --- a/e2e_test/ddl/alter_set_schema.slt +++ b/e2e_test/ddl/alter_set_schema.slt @@ -77,6 +77,22 @@ WHERE nspname = 'test_schema'; ---- test_sink test_schema +statement ok +CREATE SUBSCRIPTION test_subscription FROM test_schema.test_table WITH ( + retention = '1D' +); + +statement ok +ALTER SUBSCRIPTION test_subscription SET SCHEMA test_schema; + +query TT +SELECT name AS subscriptionname, nspname AS schemaname +FROM rw_subscriptions +JOIN pg_namespace ON pg_namespace.oid = rw_subscriptions.schema_id +WHERE nspname = 'test_schema'; +---- +test_subscription test_schema + statement ok CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock'); @@ -97,6 +113,9 @@ DROP CONNECTION test_schema.test_conn; statement ok DROP SINK test_schema.test_sink; +statement ok +DROP SUBSCRIPTION test_subscription; + statement ok DROP SOURCE test_schema.test_source; diff --git a/e2e_test/ddl/subscription.slt b/e2e_test/ddl/subscription.slt new file mode 100644 index 0000000000000..8afe2d69cffe0 --- /dev/null +++ b/e2e_test/ddl/subscription.slt @@ -0,0 +1,41 @@ +statement ok +create table ddl_t (v1 int); + +statement ok +create materialized view ddl_mv as select v1 from ddl_t; + +statement ok +create subscription ddl_subscription_table from ddl_t with(retention = '1D'); + +statement ok +create subscription ddl_subscription_mv from ddl_mv with(retention = '1D'); + +statement error +create subscription ddl_subscription_table from ddl_t with(retention = '1D'); + +statement error +create subscription ddl_subscription_mv from ddl_mv with(retention = '1D'); + +statement ok +create subscription if not exists ddl_subscription_table from ddl_t with(retention = '1D'); + +statement ok +create subscription if not exists ddl_subscription_mv from ddl_mv with(retention = '1D'); + +statement ok +drop subscription ddl_subscription_table; + +statement ok +drop subscription ddl_subscription_mv; + +statement error +drop subscription ddl_subscription_table; + +statement error +drop subscription ddl_subscription_mv; + +statement ok +drop subscription if exists ddl_subscription_table; + +statement ok +drop subscription if exists ddl_subscription_mv; \ No newline at end of file diff --git a/proto/catalog.proto b/proto/catalog.proto index 600d72f4c24ef..a28bd8e245a2f 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -176,6 +176,9 @@ message Subscription { optional uint64 created_at_epoch = 15; uint32 owner = 16; StreamJobStatus stream_job_status = 17; + + optional string initialized_at_cluster_version = 22; + optional string created_at_cluster_version = 23; } message Connection { diff --git a/src/common/src/acl/mod.rs b/src/common/src/acl/mod.rs index e6e10f8762de2..91f69fea51310 100644 --- a/src/common/src/acl/mod.rs +++ b/src/common/src/acl/mod.rs @@ -106,6 +106,8 @@ pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock = LazyLock::new(AclM pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); pub static ALL_AVAILABLE_VIEW_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); pub static ALL_AVAILABLE_SINK_MODES: LazyLock = LazyLock::new(AclModeSet::empty); +pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock = + LazyLock::new(AclModeSet::empty); pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock = LazyLock::new(|| BitFlags::from(AclMode::Execute).into()); pub static ALL_AVAILABLE_CONNECTION_MODES: LazyLock = diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs index ba2048cbca811..0262af0f228fa 100644 --- a/src/frontend/src/catalog/subscription_catalog.rs +++ b/src/frontend/src/catalog/subscription_catalog.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, TableId, UserId, OBJECT_ID_PLACEHOLDER}; +use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::{PbStreamJobStatus, PbSubscription}; @@ -63,6 +64,12 @@ pub struct SubscriptionCatalog { /// The user id pub owner: UserId, + + pub initialized_at_epoch: Option, + pub created_at_epoch: Option, + + pub created_at_cluster_version: Option, + pub initialized_at_cluster_version: Option, } #[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)] @@ -94,6 +101,10 @@ impl SubscriptionCatalog { self } + pub fn create_sql(&self) -> String { + self.definition.clone() + } + pub fn to_proto(&self) -> PbSubscription { assert!(!self.dependent_relations.is_empty()); PbSubscription { @@ -117,10 +128,12 @@ impl SubscriptionCatalog { .iter() .map(|k| k.table_id) .collect_vec(), - initialized_at_epoch: None, - created_at_epoch: None, + initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0), + created_at_epoch: self.created_at_epoch.map(|e| e.0), owner: self.owner.into(), stream_job_status: PbStreamJobStatus::Creating.into(), + initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), + created_at_cluster_version: self.created_at_cluster_version.clone(), } } } @@ -153,6 +166,10 @@ impl From<&PbSubscription> for SubscriptionCatalog { .map(|k| TableId::new(*k)) .collect_vec(), owner: prost.owner.into(), + created_at_epoch: prost.created_at_epoch.map(Epoch::from), + initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), + created_at_cluster_version: prost.created_at_cluster_version.clone(), + initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(), } } } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 9422fd3ec1c57..12a0bc26027e4 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -42,6 +42,7 @@ mod rw_schemas; mod rw_sinks; mod rw_sources; mod rw_streaming_parallelism; +mod rw_subscriptions; mod rw_system_tables; mod rw_table_fragments; mod rw_table_stats; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs index 930c50382c8ef..706b0a06fe353 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs @@ -64,6 +64,10 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result Result, + created_at: Option, + initialized_at_cluster_version: Option, + created_at_cluster_version: Option, +} + +#[system_catalog(table, "rw_catalog.rw_subscriptions")] +fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { + let catalog_reader = reader.catalog_reader.read_guard(); + let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?; + let user_reader = reader.user_info_reader.read_guard(); + let users = user_reader.get_all_users(); + let username_map = user_reader.get_user_name_map(); + + Ok(schemas + .flat_map(|schema| { + schema + .iter_subscription() + .map(|subscription| RwSubscription { + id: subscription.id.subscription_id as i32, + name: subscription.name.clone(), + schema_id: schema.id() as i32, + owner: subscription.owner.user_id as i32, + definition: subscription.definition.clone(), + acl: get_acl_items( + &Object::SubscriptionId(subscription.id.subscription_id), + false, + &users, + username_map, + ), + initialized_at: subscription + .initialized_at_epoch + .map(|e| e.as_timestamptz()), + created_at: subscription.created_at_epoch.map(|e| e.as_timestamptz()), + initialized_at_cluster_version: subscription + .initialized_at_cluster_version + .clone(), + created_at_cluster_version: subscription.created_at_cluster_version.clone(), + }) + }) + .collect()) +} diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index ffa4b6403324e..1e07abd9b09de 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -512,6 +512,12 @@ pub fn handle_show_create_object( ShowCreateType::Function => { bail_not_implemented!("show create on: {}", show_create_type); } + ShowCreateType::Subscription => { + let subscription = schema + .get_subscription_by_name(&object_name) + .ok_or_else(|| CatalogError::NotFound("subscription", name.to_string()))?; + subscription.create_sql() + } }; let name = format!("{}.{}", schema_name, object_name); diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 9aeb316ce15c8..b9baa6d8868ad 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -116,7 +116,7 @@ impl ObserverState for FrontendObserverNode { tables, indexes, views, - subscriptions: _, + subscriptions, functions, connections, users, @@ -142,6 +142,9 @@ impl ObserverState for FrontendObserverNode { for sink in sinks { catalog_guard.create_sink(&sink) } + for subscription in subscriptions { + catalog_guard.create_subscription(&subscription) + } for table in tables { catalog_guard.create_table(&table) } diff --git a/src/frontend/src/optimizer/plan_node/stream_subscription.rs b/src/frontend/src/optimizer/plan_node/stream_subscription.rs index 22cdd34529250..ebd829ff2658c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_subscription.rs +++ b/src/frontend/src/optimizer/plan_node/stream_subscription.rs @@ -77,7 +77,7 @@ impl StreamSubscription { user_id: UserId, ) -> Result { let columns = derive_columns(input.schema(), out_names, &user_cols)?; - let (input, sink) = Self::derive_subscription_catalog( + let (input, subscription) = Self::derive_subscription_catalog( database_id, schema_id, dependent_relations, @@ -92,7 +92,7 @@ impl StreamSubscription { properties, user_id, )?; - Ok(Self::new(input, sink)) + Ok(Self::new(input, subscription)) } #[allow(clippy::too_many_arguments)] @@ -121,7 +121,7 @@ impl StreamSubscription { }; let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?; let distribution_key = input.distribution().dist_column_indices().to_vec(); - let sink_desc = SubscriptionCatalog { + let subscription_desc = SubscriptionCatalog { database_id, schema_id, dependent_relations: dependent_relations.into_iter().collect(), @@ -135,8 +135,12 @@ impl StreamSubscription { distribution_key, properties: properties.into_inner(), owner: user_id, + initialized_at_epoch: None, + created_at_epoch: None, + created_at_cluster_version: None, + initialized_at_cluster_version: None, }; - Ok((input, sink_desc)) + Ok((input, subscription_desc)) } /// The table schema is: | epoch | seq id | row op | sink columns | diff --git a/src/frontend/src/user/user_privilege.rs b/src/frontend/src/user/user_privilege.rs index 37e443cb44f0e..2b474bda0554e 100644 --- a/src/frontend/src/user/user_privilege.rs +++ b/src/frontend/src/user/user_privilege.rs @@ -96,6 +96,7 @@ pub fn available_prost_privilege(object: PbObject, for_dml_table: bool) -> PbGra } PbObject::ViewId(_) => &acl::ALL_AVAILABLE_VIEW_MODES, PbObject::SinkId(_) => &acl::ALL_AVAILABLE_SINK_MODES, + PbObject::SubscriptionId(_) => &acl::ALL_AVAILABLE_SUBSCRIPTION_MODES, PbObject::FunctionId(_) => &acl::ALL_AVAILABLE_FUNCTION_MODES, _ => unreachable!("Invalid object type"), }; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 097c73cf4e1bb..63ec4eca5249c 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -233,6 +233,8 @@ impl From> for PbSubscription { stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. column_catalogs: value.0.columns.0, subscription_from_name: value.0.subscription_from_name, + initialized_at_cluster_version: value.1.initialized_at_cluster_version, + created_at_cluster_version: value.1.created_at_cluster_version, } } } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 29d99faec4c70..2f7a68ed081f3 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -94,6 +94,7 @@ impl StreamingJob { } StreamingJob::Subscription(subscription) => { subscription.created_at_epoch = created_at_epoch; + subscription.created_at_cluster_version = created_at_cluster_version; } } } @@ -129,6 +130,7 @@ impl StreamingJob { } StreamingJob::Subscription(subscription) => { subscription.initialized_at_epoch = initialized_at_epoch; + subscription.initialized_at_cluster_version = initialized_at_cluster_version; } } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index aff15eb61f71d..b1d37c9d43cc0 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -949,6 +949,7 @@ pub enum ShowCreateType { Source, Sink, Function, + Subscription, } impl fmt::Display for ShowCreateType { @@ -961,6 +962,7 @@ impl fmt::Display for ShowCreateType { ShowCreateType::Source => f.write_str("SOURCE"), ShowCreateType::Sink => f.write_str("SINK"), ShowCreateType::Function => f.write_str("FUNCTION"), + ShowCreateType::Subscription => f.write_str("SUBSCRIPTION"), } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 8b3dd657c8821..ce76f869222af 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -4530,13 +4530,12 @@ impl Parser { Keyword::INDEX => ShowCreateType::Index, Keyword::SOURCE => ShowCreateType::Source, Keyword::SINK => ShowCreateType::Sink, + Keyword::SUBSCRIPTION => ShowCreateType::Subscription, Keyword::FUNCTION => ShowCreateType::Function, - _ => { - return self.expected( - "TABLE, MATERIALIZED VIEW, VIEW, INDEX, FUNCTION, SOURCE or SINK", - self.peek_token(), - ) - } + _ => return self.expected( + "TABLE, MATERIALIZED VIEW, VIEW, INDEX, FUNCTION, SOURCE, SUBSCRIPTION or SINK", + self.peek_token(), + ), }; return Ok(Statement::ShowCreateObject { create_type: show_type, @@ -4544,7 +4543,7 @@ impl Parser { }); } self.expected( - "TABLE, MATERIALIZED VIEW, VIEW, INDEX, FUNCTION, SOURCE or SINK", + "TABLE, MATERIALIZED VIEW, VIEW, INDEX, FUNCTION, SOURCE, SUBSCRIPTION or SINK", self.peek_token(), ) }