Skip to content

Commit

Permalink
add ci
Browse files Browse the repository at this point in the history
add ci
  • Loading branch information
xxhZs committed Feb 28, 2024
1 parent 1d3e9c6 commit bf791ad
Show file tree
Hide file tree
Showing 20 changed files with 276 additions and 14 deletions.
26 changes: 26 additions & 0 deletions e2e_test/ddl/alter_owner.slt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ WHERE
----
sink user1

statement ok
CREATE SUBSCRIPTION subscription FROM mv WITH (
retention = '1D'
);

statement ok
ALTER SUBSCRIPTION subscription OWNER TO user1;

query TT
SELECT
pg_class.relname AS rel_name,
pg_roles.rolname AS owner
FROM
pg_class
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
JOIN pg_roles ON pg_roles.oid = pg_class.relowner
WHERE
pg_namespace.nspname NOT LIKE 'pg_%'
AND pg_namespace.nspname != 'information_schema'
AND pg_class.relname = 'subscription';
----
sink user1

statement ok
CREATE DATABASE d;

Expand Down Expand Up @@ -181,6 +204,9 @@ DROP DATABASE d;
statement ok
DROP SINK sink;

statement ok
DROP SUBSCRIPTION subscription;

statement ok
DROP SOURCE src;

Expand Down
22 changes: 22 additions & 0 deletions e2e_test/ddl/alter_parallelism.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali
statement ok
create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id;

Expand Down Expand Up @@ -94,9 +97,28 @@ select parallelism from sink_parallelism where name = 's';
----
FIXED(4)

statement ok
create subscription subscription1 from t with (retention = '1D');

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
ADAPTIVE

statement ok
alter subscription subscription1 set parallelism = 4;

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
FIXED(4)

statement ok
drop sink s;

statement ok
drop subscription subscription1;

statement ok
drop materialized view m_join;

Expand Down
16 changes: 16 additions & 0 deletions e2e_test/ddl/alter_rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH (
connector = 'blackhole'
);

statement ok
CREATE SUBSCRIPTION subscription FROM mv WITH (
retention = '1D'
);

statement ok
CREATE SOURCE src (v INT) WITH (
connector = 'datagen',
Expand Down Expand Up @@ -113,6 +118,14 @@ SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv2 AS mv3 WITH (connector = 'blackhole')

statement ok
ALTER SUBSCRIPTION subscription RENAME TO subscription1;

query TT
SHOW CREATE SUBSCRIPTION subscription1;
----
public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv WITH (retention = '1D')

# alter mview rename with alias conflict, used by sink1
statement ok
ALTER MATERIALIZED VIEW mv2 RENAME TO mv3;
Expand Down Expand Up @@ -229,6 +242,9 @@ DROP SCHEMA schema1;
statement ok
DROP SINK sink1;

statement ok
DROP SUBSCRIPTION subscription1;

statement error Permission denied
DROP VIEW v5;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/ddl/alter_set_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ WHERE nspname = 'test_schema';
----
test_sink test_schema

statement ok
CREATE SUBSCRIPTION test_subscription FROM test_schema.test_table WITH (
retention = '1D'
);

statement ok
ALTER SUBSCRIPTION test_subscription SET SCHEMA test_schema;

query TT
SELECT name AS subscriptionname, nspname AS schemaname
FROM rw_subscriptions
JOIN pg_namespace ON pg_namespace.oid = rw_subscriptions.schema_id
WHERE nspname = 'test_schema';
----
test_subscription test_schema

statement ok
CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock');

Expand All @@ -97,6 +113,9 @@ DROP CONNECTION test_schema.test_conn;
statement ok
DROP SINK test_schema.test_sink;

statement ok
DROP SUBSCRIPTION test_subscription;

statement ok
DROP SOURCE test_schema.test_source;

Expand Down
41 changes: 41 additions & 0 deletions e2e_test/ddl/subscription.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
statement ok
create table ddl_t (v1 int);

statement ok
create materialized view ddl_mv as select v1 from ddl_t;

statement ok
create subscription ddl_subscription_table from ddl_t with(retention = '1D');

statement ok
create subscription ddl_subscription_mv from ddl_mv with(retention = '1D');

statement error
create subscription ddl_subscription_table from ddl_t with(retention = '1D');

statement error
create subscription ddl_subscription_mv from ddl_mv with(retention = '1D');

statement ok
create subscription if not exists ddl_subscription_table from ddl_t with(retention = '1D');

statement ok
create subscription if not exists ddl_subscription_mv from ddl_mv with(retention = '1D');

statement ok
drop subscription ddl_subscription_table;

statement ok
drop subscription ddl_subscription_mv;

statement error
drop subscription ddl_subscription_table;

statement error
drop subscription ddl_subscription_mv;

statement ok
drop subscription if exists ddl_subscription_table;

statement ok
drop subscription if exists ddl_subscription_mv;
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ message Subscription {
optional uint64 created_at_epoch = 15;
uint32 owner = 16;
StreamJobStatus stream_job_status = 17;

optional string initialized_at_cluster_version = 22;
optional string created_at_cluster_version = 23;
}

message Connection {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/acl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock<AclModeSet> = LazyLock::new(AclM
pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_VIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_SINK_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(|| BitFlags::from(AclMode::Execute).into());
pub static ALL_AVAILABLE_CONNECTION_MODES: LazyLock<AclModeSet> =
Expand Down
21 changes: 19 additions & 2 deletions src/frontend/src/catalog/subscription_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, TableId, UserId, OBJECT_ID_PLACEHOLDER};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbStreamJobStatus, PbSubscription};

Expand Down Expand Up @@ -63,6 +64,12 @@ pub struct SubscriptionCatalog {

/// The user id
pub owner: UserId,

pub initialized_at_epoch: Option<Epoch>,
pub created_at_epoch: Option<Epoch>,

pub created_at_cluster_version: Option<String>,
pub initialized_at_cluster_version: Option<String>,
}

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
Expand Down Expand Up @@ -94,6 +101,10 @@ impl SubscriptionCatalog {
self
}

pub fn create_sql(&self) -> String {
self.definition.clone()
}

pub fn to_proto(&self) -> PbSubscription {
assert!(!self.dependent_relations.is_empty());
PbSubscription {
Expand All @@ -117,10 +128,12 @@ impl SubscriptionCatalog {
.iter()
.map(|k| k.table_id)
.collect_vec(),
initialized_at_epoch: None,
created_at_epoch: None,
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
owner: self.owner.into(),
stream_job_status: PbStreamJobStatus::Creating.into(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
created_at_cluster_version: self.created_at_cluster_version.clone(),
}
}
}
Expand Down Expand Up @@ -153,6 +166,10 @@ impl From<&PbSubscription> for SubscriptionCatalog {
.map(|k| TableId::new(*k))
.collect_vec(),
owner: prost.owner.into(),
created_at_epoch: prost.created_at_epoch.map(Epoch::from),
initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
created_at_cluster_version: prost.created_at_cluster_version.clone(),
initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mod rw_schemas;
mod rw_sinks;
mod rw_sources;
mod rw_streaming_parallelism;
mod rw_subscriptions;
mod rw_system_tables;
mod rw_table_fragments;
mod rw_table_stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
schema_catalog.iter_index().for_each(|t| {
table_ids.push(t.index_table.id.table_id);
});

schema_catalog.iter_subscription().for_each(|t| {
table_ids.push(t.id.subscription_id);
});
}
}

Expand Down Expand Up @@ -167,6 +171,25 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
});

schema_catalog.iter_subscription().for_each(|t| {
if let Some(fragments) = table_fragments.get(&t.id.subscription_id) {
rows.push(RwRelationInfo {
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner.user_id as i32,
definition: t.definition.clone(),
relationtype: "SUBSCRIPTION".into(),
relationid: t.id.subscription_id as i32,
relationtimezone: fragments.get_ctx().unwrap().get_timezone().clone(),
fragments: Some(json!(fragments.get_fragments()).to_string()),
initialized_at: t.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: t.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: t.initialized_at_cluster_version.clone(),
created_at_cluster_version: t.created_at_cluster_version.clone(),
});
}
});
}

Ok(rows)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_frontend_macro::system_catalog;
UNION ALL SELECT id, name, 'source' AS relation_type, schema_id, owner, definition, acl FROM rw_catalog.rw_sources
UNION ALL SELECT id, name, 'index' AS relation_type, schema_id, owner, definition, acl FROM rw_catalog.rw_indexes
UNION ALL SELECT id, name, 'sink' AS relation_type, schema_id, owner, definition, acl FROM rw_catalog.rw_sinks
UNION ALL SELECT id, name, 'subscription' AS relation_type, schema_id, owner, definition, acl FROM rw_catalog.rw_subscriptions
UNION ALL SELECT id, name, 'materialized view' AS relation_type, schema_id, owner, definition, acl FROM rw_catalog.rw_materialized_views
UNION ALL SELECT id, name, 'view' AS relation_type, schema_id, owner, definition, acl FROM rw_catalog.rw_views
"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Fields, Timestamptz};
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::user::grant_privilege::Object;

use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl};
use crate::error::Result;

#[derive(Fields)]
struct RwSubscription {
#[primary_key]
id: i32,
name: String,
schema_id: i32,
owner: i32,
definition: String,
acl: String,
initialized_at: Option<Timestamptz>,
created_at: Option<Timestamptz>,
initialized_at_cluster_version: Option<String>,
created_at_cluster_version: Option<String>,
}

#[system_catalog(table, "rw_catalog.rw_subscriptions")]
fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSubscription>> {
let catalog_reader = reader.catalog_reader.read_guard();
let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
let user_reader = reader.user_info_reader.read_guard();
let users = user_reader.get_all_users();
let username_map = user_reader.get_user_name_map();

Ok(schemas
.flat_map(|schema| {
schema
.iter_subscription()
.map(|subscription| RwSubscription {
id: subscription.id.subscription_id as i32,
name: subscription.name.clone(),
schema_id: schema.id() as i32,
owner: subscription.owner.user_id as i32,
definition: subscription.definition.clone(),
acl: get_acl_items(
&Object::SubscriptionId(subscription.id.subscription_id),
false,
&users,
username_map,
),
initialized_at: subscription
.initialized_at_epoch
.map(|e| e.as_timestamptz()),
created_at: subscription.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: subscription
.initialized_at_cluster_version
.clone(),
created_at_cluster_version: subscription.created_at_cluster_version.clone(),
})
})
.collect())
}
Loading

0 comments on commit bf791ad

Please sign in to comment.