Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): Add Create subscription in frontend #14831

Merged
merged 23 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions e2e_test/ddl/alter_owner.slt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ WHERE
----
sink user1

statement ok
CREATE SUBSCRIPTION subscription FROM mv WITH (
retention = '1D'
);

statement ok
ALTER SUBSCRIPTION subscription OWNER TO user1;

query TT
SELECT
pg_class.relname AS rel_name,
pg_roles.rolname AS owner
FROM
pg_class
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
JOIN pg_roles ON pg_roles.oid = pg_class.relowner
WHERE
pg_namespace.nspname NOT LIKE 'pg_%'
AND pg_namespace.nspname != 'information_schema'
AND pg_class.relname = 'subscription';
----
subscription user1

statement ok
CREATE DATABASE d;

Expand Down Expand Up @@ -181,6 +204,9 @@ DROP DATABASE d;
statement ok
DROP SINK sink;

statement ok
DROP SUBSCRIPTION subscription;

statement ok
DROP SOURCE src;

Expand Down
25 changes: 25 additions & 0 deletions e2e_test/ddl/alter_parallelism.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali
statement ok
create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id;

Expand Down Expand Up @@ -94,9 +97,28 @@ select parallelism from sink_parallelism where name = 's';
----
FIXED(4)

statement ok
create subscription subscription1 from t with (retention = '1D');

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
ADAPTIVE

statement ok
alter subscription subscription1 set parallelism = 4;

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
FIXED(4)

statement ok
drop sink s;

statement ok
drop subscription subscription1;

statement ok
drop materialized view m_join;

Expand Down Expand Up @@ -157,5 +179,8 @@ drop view mview_parallelism;
statement ok
drop view sink_parallelism;

statement ok
drop view subscription_parallelism;

statement ok
drop view fragment_parallelism;
16 changes: 16 additions & 0 deletions e2e_test/ddl/alter_rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH (
connector = 'blackhole'
);

statement ok
CREATE SUBSCRIPTION subscription FROM mv WITH (
retention = '1D'
);

statement ok
CREATE SOURCE src (v INT) WITH (
connector = 'datagen',
Expand Down Expand Up @@ -113,6 +118,14 @@ SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv2 AS mv3 WITH (connector = 'blackhole')

statement ok
ALTER SUBSCRIPTION subscription RENAME TO subscription1;

query TT
SHOW CREATE SUBSCRIPTION subscription1;
----
public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv WITH (retention = '1D')

# alter mview rename with alias conflict, used by sink1
statement ok
ALTER MATERIALIZED VIEW mv2 RENAME TO mv3;
Expand Down Expand Up @@ -229,6 +242,9 @@ DROP SCHEMA schema1;
statement ok
DROP SINK sink1;

statement ok
DROP SUBSCRIPTION subscription1;

statement error Permission denied
DROP VIEW v5;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/ddl/alter_set_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ WHERE nspname = 'test_schema';
----
test_sink test_schema

statement ok
CREATE SUBSCRIPTION test_subscription FROM test_schema.test_table WITH (
retention = '1D'
);

statement ok
ALTER SUBSCRIPTION test_subscription SET SCHEMA test_schema;

query TT
SELECT name AS subscriptionname, nspname AS schemaname
FROM rw_subscriptions
JOIN pg_namespace ON pg_namespace.oid = rw_subscriptions.schema_id
WHERE nspname = 'test_schema';
----
test_subscription test_schema

statement ok
CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock');

Expand All @@ -97,6 +113,9 @@ DROP CONNECTION test_schema.test_conn;
statement ok
DROP SINK test_schema.test_sink;

statement ok
DROP SUBSCRIPTION test_schema.test_subscription;

statement ok
DROP SOURCE test_schema.test_source;

Expand Down
47 changes: 47 additions & 0 deletions e2e_test/ddl/subscription.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
create table ddl_t (v1 int);

statement ok
create materialized view ddl_mv as select v1 from ddl_t;

statement ok
create subscription ddl_subscription_table from ddl_t with(retention = '1D');

statement ok
create subscription ddl_subscription_mv from ddl_mv with(retention = '1D');

statement error
create subscription ddl_subscription_table from ddl_t with(retention = '1D');

statement error
create subscription ddl_subscription_mv from ddl_mv with(retention = '1D');

statement ok
create subscription if not exists ddl_subscription_table from ddl_t with(retention = '1D');

statement ok
create subscription if not exists ddl_subscription_mv from ddl_mv with(retention = '1D');

statement ok
drop subscription ddl_subscription_table;

statement ok
drop subscription ddl_subscription_mv;

statement error
drop subscription ddl_subscription_table;

statement error
drop subscription ddl_subscription_mv;

statement ok
drop subscription if exists ddl_subscription_table;

statement ok
drop subscription if exists ddl_subscription_mv;

statement ok
drop materialized view ddl_mv;

statement ok
drop table ddl_t;
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ message Subscription {

optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;

string subscription_from_name = 17;
}

message Connection {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/acl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock<AclModeSet> = LazyLock::new(AclM
pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_VIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_SINK_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(|| BitFlags::from(AclMode::Execute).into());
pub static ALL_AVAILABLE_CONNECTION_MODES: LazyLock<AclModeSet> =
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.table, "Sink")
}

// Subscription
NodeBody::Subscription(node) => {
// A Subscription should have a state table.
optional!(node.log_store_table, "Subscription")
}

// Now
NodeBody::Now(node) => {
always!(node.state_table, "Now");
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/dbeaver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id] }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchProject { exprs: [rw_subscriptions.id, rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id] }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id] }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id] }
Expand All @@ -121,6 +123,7 @@
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
Expand All @@ -138,6 +141,7 @@
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name], distribution: Single }
│ │ │ │ │ │ │ │ └─BatchScan { table: rw_views, columns: [rw_views.id, rw_views.name], distribution: Single }
│ │ │ │ │ │ │ └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
Expand All @@ -151,6 +155,7 @@
│ │ │ │ │ ├─BatchScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id], distribution: Single }
│ │ │ │ │ └─BatchScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id], distribution: Single }
│ │ │ │ └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
Expand Down
Loading
Loading