Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): Add Create subscription in frontend #14831

Merged
merged 23 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,25 @@ message Sink {
optional string created_at_cluster_version = 23;
}

message Subscription {
uint32 id = 1;
string name = 2;
string definition = 3;
repeated common.ColumnOrder plan_pk = 5;
repeated int32 distribution_key = 6;
map<string, string> properties = 7;
repeated plan_common.ColumnCatalog column_catalogs = 8;
string db_name = 9;
string subscription_from_name = 10;
uint32 database_id = 11;
uint32 schema_id = 12;
repeated uint32 dependent_relations = 13;
optional uint64 initialized_at_epoch = 14;
optional uint64 created_at_epoch = 15;
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
uint32 owner = 16;
StreamJobStatus stream_job_status = 17;
}

message Connection {
message PrivateLinkService {
enum PrivateLinkProvider {
Expand Down
25 changes: 25 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ message DropSinkResponse {
uint64 version = 2;
}

message CreateSubscriptionRequest {
catalog.Subscription subscription = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
}

message CreateSubscriptionResponse {
common.Status status = 1;
uint64 version = 2;
}

message DropSubscriptionRequest {
uint32 subscription_id = 1;
bool cascade = 2;
}

message DropSubscriptionResponse {
common.Status status = 1;
uint64 version = 2;
}

message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
Expand Down Expand Up @@ -179,6 +199,7 @@ message AlterNameRequest {
uint32 source_id = 5;
uint32 schema_id = 6;
uint32 database_id = 7;
uint32 subscription_id = 8;
}
string new_name = 20;
}
Expand All @@ -196,6 +217,7 @@ message AlterOwnerRequest {
uint32 sink_id = 4;
uint32 schema_id = 5;
uint32 database_id = 6;
uint32 subscription_id = 7;
}
uint32 owner_id = 20;
}
Expand All @@ -208,6 +230,7 @@ message AlterSetSchemaRequest {
uint32 sink_id = 4;
uint32 function_id = 5;
uint32 connection_id = 6;
uint32 subscription_id = 7;
}
uint32 new_schema_id = 20;
}
Expand Down Expand Up @@ -399,7 +422,9 @@ service DdlService {
rpc CreateSource(CreateSourceRequest) returns (CreateSourceResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc CreateSink(CreateSinkRequest) returns (CreateSinkResponse);
rpc CreateSubscription(CreateSubscriptionRequest) returns (CreateSubscriptionResponse);
rpc DropSink(DropSinkRequest) returns (DropSinkResponse);
rpc DropSubscription(DropSubscriptionRequest) returns (DropSubscriptionResponse);
rpc CreateMaterializedView(CreateMaterializedViewRequest) returns (CreateMaterializedViewResponse);
rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
Expand Down
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ message MetaSnapshot {
repeated catalog.Table tables = 5;
repeated catalog.Index indexes = 6;
repeated catalog.View views = 7;
repeated catalog.Subscription subscriptions = 19;
repeated catalog.Function functions = 15;
repeated catalog.Connection connections = 17;
repeated user.UserInfo users = 8;
Expand All @@ -394,6 +395,7 @@ message Relation {
catalog.Sink sink = 3;
catalog.Index index = 4;
catalog.View view = 5;
catalog.Subscription subscription = 6;
}
}

Expand Down
14 changes: 7 additions & 7 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,6 @@
SinkLogStoreType log_store_type = 3;
}

message SubscriptionNode {
// log store should have a table.
catalog.Table log_store_table = 1;
//retention time with seconds
uint64 retention_seconds = 2;
}

message ProjectNode {
repeated expr.ExprNode select_list = 1;
// this two field is expressing a list of usize pair, which means when project receives a
Expand Down Expand Up @@ -727,6 +720,12 @@
OverWindowCachePolicy cache_policy = 5;
}

message SubscriptionNode {
catalog.Subscription subscription_catalog = 1;

Check failure on line 724 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "subscription_catalog" on message "SubscriptionNode" changed option "json_name" from "logStoreTable" to "subscriptionCatalog".

Check failure on line 724 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" on message "SubscriptionNode" changed type from "catalog.Table" to "catalog.Subscription".

Check failure on line 724 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" on message "SubscriptionNode" changed name from "log_store_table" to "subscription_catalog".
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm: the breaking changes here are acceptable because Subscription feature is not released and no one is using it, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yesyes

// log store should have a table.
catalog.Table log_store_table = 2;

Check failure on line 726 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "log_store_table" on message "SubscriptionNode" changed option "json_name" from "retentionSeconds" to "logStoreTable".

Check failure on line 726 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" on message "SubscriptionNode" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 726 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" on message "SubscriptionNode" changed name from "retention_seconds" to "log_store_table".
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -865,6 +864,7 @@
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
}

// The streaming context associated with a stream plan
Expand Down
1 change: 1 addition & 0 deletions proto/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message GrantPrivilege {
uint32 all_tables_schema_id = 11;
uint32 all_sources_schema_id = 12;
uint32 all_dml_tables_schema_id = 13;
uint32 subscription_id = 14;
}
repeated ActionWithGrantOption action_with_opts = 7;
}
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.table, "Sink")
}

// Subscription
NodeBody::Subscription(node) => {
// A Subscription should have a state table.
optional!(node.log_store_table, "Subscription")
}

// Now
NodeBody::Now(node) => {
always!(node.state_table, "Now");
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ impl Binder {
Self::resolve_single_name(name.0, "sink name")
}

/// return the `subscription_name`
pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "subscription name")
}

/// return the `table_name`
pub fn resolve_table_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "table name")
Expand Down
53 changes: 51 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::{
PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable,
PbView,
PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
Expand Down Expand Up @@ -117,6 +117,12 @@ pub trait CatalogWriter: Send + Sync {
affected_table_change: Option<PbReplaceTablePlan>,
) -> Result<()>;

async fn create_subscription(
&self,
subscription: PbSubscription,
graph: StreamFragmentGraph,
) -> Result<()>;

async fn create_function(&self, function: PbFunction) -> Result<()>;

async fn create_connection(
Expand Down Expand Up @@ -150,6 +156,8 @@ pub trait CatalogWriter: Send + Sync {
affected_table_change: Option<PbReplaceTablePlan>,
) -> Result<()>;

async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;

async fn drop_database(&self, database_id: u32) -> Result<()>;

async fn drop_schema(&self, schema_id: u32) -> Result<()>;
Expand All @@ -168,6 +176,12 @@ pub trait CatalogWriter: Send + Sync {

async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()>;

async fn alter_subscription_name(
&self,
subscription_id: u32,
subscription_name: &str,
) -> Result<()>;

async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()>;

async fn alter_schema_name(&self, schema_id: u32, schema_name: &str) -> Result<()>;
Expand Down Expand Up @@ -325,6 +339,18 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn create_subscription(
&self,
subscription: PbSubscription,
graph: StreamFragmentGraph,
) -> Result<()> {
let version = self
.meta_client
.create_subscription(subscription, graph)
.await?;
self.wait_version(version).await
}

async fn create_function(&self, function: PbFunction) -> Result<()> {
let version = self.meta_client.create_function(function).await?;
self.wait_version(version).await
Expand Down Expand Up @@ -400,6 +426,14 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
let version = self
.meta_client
.drop_subscription(subscription_id, cascade)
.await?;
self.wait_version(version).await
}

async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
let version = self.meta_client.drop_index(index_id, cascade).await?;
self.wait_version(version).await
Expand Down Expand Up @@ -457,6 +491,21 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_subscription_name(
&self,
subscription_id: u32,
subscription_name: &str,
) -> Result<()> {
let version = self
.meta_client
.alter_name(
alter_name_request::Object::SubscriptionId(subscription_id),
subscription_name,
)
.await?;
self.wait_version(version).await
}

async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()> {
let version = self
.meta_client
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) mod index_catalog;
pub(crate) mod root_catalog;
pub(crate) mod schema_catalog;
pub(crate) mod source_catalog;
pub(crate) mod subscription_catalog;
pub(crate) mod system_catalog;
pub(crate) mod table_catalog;
pub(crate) mod view_catalog;
Expand All @@ -47,6 +48,7 @@ use crate::user::UserId;
pub(crate) type ConnectionId = u32;
pub(crate) type SourceId = u32;
pub(crate) type SinkId = u32;
pub(crate) type SubscriptionId = u32;
pub(crate) type ViewId = u32;
pub(crate) type DatabaseId = u32;
pub(crate) type SchemaId = u32;
Expand Down
61 changes: 59 additions & 2 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD};
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_pb::catalog::{
PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription,
PbTable, PbView,
};
use risingwave_pb::hummock::HummockVersionStats;

use super::function_catalog::FunctionCatalog;
use super::source_catalog::SourceCatalog;
use super::subscription_catalog::SubscriptionCatalog;
use super::view_catalog::ViewCatalog;
use super::{CatalogError, CatalogResult, ConnectionId, SinkId, SourceId, ViewId};
use super::{CatalogError, CatalogResult, ConnectionId, SinkId, SourceId, SubscriptionId, ViewId};
use crate::catalog::connection_catalog::ConnectionCatalog;
use crate::catalog::database_catalog::DatabaseCatalog;
use crate::catalog::schema_catalog::SchemaCatalog;
Expand Down Expand Up @@ -191,6 +193,14 @@ impl Catalog {
.create_sink(proto);
}

pub fn create_subscription(&mut self, proto: &PbSubscription) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.create_subscription(proto);
}

pub fn create_view(&mut self, proto: &PbView) {
self.get_database_mut(proto.database_id)
.unwrap()
Expand Down Expand Up @@ -385,6 +395,38 @@ impl Catalog {
}
}

pub fn drop_subscription(
&mut self,
db_id: DatabaseId,
schema_id: SchemaId,
subscription_id: SubscriptionId,
) {
self.get_database_mut(db_id)
.unwrap()
.get_schema_mut(schema_id)
.unwrap()
.drop_subscription(subscription_id);
}

pub fn update_subscription(&mut self, proto: &PbSubscription) {
let database = self.get_database_mut(proto.database_id).unwrap();
let schema = database.get_schema_mut(proto.schema_id).unwrap();
if schema.get_subscription_by_id(&proto.id).is_some() {
schema.update_subscription(proto);
} else {
// Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
schema.create_subscription(proto);
database
.iter_schemas_mut()
.find(|schema| {
schema.id() != proto.schema_id
&& schema.get_subscription_by_id(&proto.id).is_some()
})
.unwrap()
.drop_subscription(proto.id);
}
}

pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
self.get_database_mut(db_id)
.unwrap()
Expand Down Expand Up @@ -682,6 +724,21 @@ impl Catalog {
.ok_or_else(|| CatalogError::NotFound("sink", sink_name.to_string()))
}

pub fn get_subscription_by_name<'a>(
&self,
db_name: &str,
schema_path: SchemaPath<'a>,
subscription_name: &str,
) -> CatalogResult<(&Arc<SubscriptionCatalog>, &'a str)> {
schema_path
.try_find(|schema_name| {
Ok(self
.get_schema_by_name(db_name, schema_name)?
.get_subscription_by_name(subscription_name))
})?
.ok_or_else(|| CatalogError::NotFound("subscription", subscription_name.to_string()))
}

pub fn get_index_by_name<'a>(
&self,
db_name: &str,
Expand Down
Loading
Loading