Skip to content

Commit

Permalink
feat(meta): support create subscription (#15371)
Browse files Browse the repository at this point in the history
Co-authored-by: zwang28 <[email protected]>
  • Loading branch information
xxhZs and zwang28 authored Mar 4, 2024
1 parent 940d6c7 commit 44ccb7e
Show file tree
Hide file tree
Showing 31 changed files with 1,032 additions and 40 deletions.
20 changes: 20 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,26 @@ message Sink {
optional string created_at_cluster_version = 23;
}

message Subscription {
uint32 id = 1;
string name = 2;
string definition = 3;
repeated common.ColumnOrder plan_pk = 4;
repeated int32 distribution_key = 5;
map<string, string> properties = 6;
repeated plan_common.ColumnCatalog column_catalogs = 7;
uint32 database_id = 8;
uint32 schema_id = 9;
repeated uint32 dependent_relations = 10;
optional uint64 initialized_at_epoch = 11;
optional uint64 created_at_epoch = 12;
uint32 owner = 13;
StreamJobStatus stream_job_status = 14;

optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;
}

message Connection {
message PrivateLinkService {
enum PrivateLinkProvider {
Expand Down
25 changes: 25 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ message DropSinkResponse {
uint64 version = 2;
}

message CreateSubscriptionRequest {
catalog.Subscription subscription = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
}

message CreateSubscriptionResponse {
common.Status status = 1;
uint64 version = 2;
}

message DropSubscriptionRequest {
uint32 subscription_id = 1;
bool cascade = 2;
}

message DropSubscriptionResponse {
common.Status status = 1;
uint64 version = 2;
}

message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
Expand Down Expand Up @@ -179,6 +199,7 @@ message AlterNameRequest {
uint32 source_id = 5;
uint32 schema_id = 6;
uint32 database_id = 7;
uint32 subscription_id = 8;
}
string new_name = 20;
}
Expand All @@ -196,6 +217,7 @@ message AlterOwnerRequest {
uint32 sink_id = 4;
uint32 schema_id = 5;
uint32 database_id = 6;
uint32 subscription_id = 7;
}
uint32 owner_id = 20;
}
Expand All @@ -208,6 +230,7 @@ message AlterSetSchemaRequest {
uint32 sink_id = 4;
uint32 function_id = 5;
uint32 connection_id = 6;
uint32 subscription_id = 7;
}
uint32 new_schema_id = 20;
}
Expand Down Expand Up @@ -399,7 +422,9 @@ service DdlService {
rpc CreateSource(CreateSourceRequest) returns (CreateSourceResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc CreateSink(CreateSinkRequest) returns (CreateSinkResponse);
rpc CreateSubscription(CreateSubscriptionRequest) returns (CreateSubscriptionResponse);
rpc DropSink(DropSinkRequest) returns (DropSinkResponse);
rpc DropSubscription(DropSubscriptionRequest) returns (DropSubscriptionResponse);
rpc CreateMaterializedView(CreateMaterializedViewRequest) returns (CreateMaterializedViewResponse);
rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
Expand Down
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ message MetaSnapshot {
repeated catalog.View views = 7;
repeated catalog.Function functions = 15;
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
Expand All @@ -394,6 +395,7 @@ message Relation {
catalog.Sink sink = 3;
catalog.Index index = 4;
catalog.View view = 5;
catalog.Subscription subscription = 6;
}
}

Expand Down
14 changes: 7 additions & 7 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,6 @@ message SinkNode {
SinkLogStoreType log_store_type = 3;
}

message SubscriptionNode {
// log store should have a table.
catalog.Table log_store_table = 1;
//retention time with seconds
uint64 retention_seconds = 2;
}

message ProjectNode {
repeated expr.ExprNode select_list = 1;
// this two field is expressing a list of usize pair, which means when project receives a
Expand Down Expand Up @@ -727,6 +720,12 @@ message OverWindowNode {
OverWindowCachePolicy cache_policy = 5;
}

message SubscriptionNode {
catalog.Subscription subscription_catalog = 1;
// log store should have a table.
catalog.Table log_store_table = 2;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -865,6 +864,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
}

// The streaming context associated with a stream plan
Expand Down
1 change: 1 addition & 0 deletions proto/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message GrantPrivilege {
uint32 all_tables_schema_id = 11;
uint32 all_sources_schema_id = 12;
uint32 all_dml_tables_schema_id = 13;
uint32 subscription_id = 14;
}
repeated ActionWithGrantOption action_with_opts = 7;
}
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ impl ObserverState for FrontendObserverNode {
meta_backup_manifest_id: _,
hummock_write_limits: _,
version,
// todo!: add subscriptions
subscriptions: _,
} = snapshot;

for db in databases {
Expand Down Expand Up @@ -288,6 +290,7 @@ impl FrontendObserverNode {
Operation::Update => catalog_guard.update_view(view),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Subscription(_) => todo!(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use sea_orm_migration::prelude::*;

mod m20230908_072257_init;
mod m20231008_020431_hummock;
mod m20240304_074901_subscription;

pub struct Migrator;

Expand All @@ -13,6 +14,7 @@ impl MigratorTrait for Migrator {
vec![
Box::new(m20230908_072257_init::Migration),
Box::new(m20231008_020431_hummock::Migration),
Box::new(m20240304_074901_subscription::Migration),
]
}
}
Expand Down
66 changes: 66 additions & 0 deletions src/meta/model_v2/migration/src/m20240304_074901_subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use sea_orm_migration::prelude::{Table as MigrationTable, *};

use crate::{assert_not_has_tables, drop_tables};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
assert_not_has_tables!(manager, Subscription);
manager
.create_table(
MigrationTable::create()
.table(Subscription::Table)
.col(
ColumnDef::new(Subscription::SubscriptionId)
.integer()
.primary_key(),
)
.col(ColumnDef::new(Subscription::Name).string().not_null())
.col(
ColumnDef::new(Subscription::Columns)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(Subscription::PlanPk)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(Subscription::DistributionKey)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(Subscription::Properties)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Subscription::Definition).string().not_null())
.to_owned(),
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// drop tables cascade.
drop_tables!(manager, Subscription);
Ok(())
}
}

#[derive(DeriveIden)]
enum Subscription {
Table,
SubscriptionId,
Name,
Columns,
PlanPk,
DistributionKey,
Properties,
Definition,
}
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub mod schema;
pub mod sink;
pub mod source;
pub mod streaming_job;
pub mod subscription;
pub mod system_parameter;
pub mod table;
pub mod user;
Expand All @@ -62,6 +63,7 @@ pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
Expand Down
11 changes: 11 additions & 0 deletions src/meta/model_v2/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum ObjectType {
Function,
#[sea_orm(string_value = "CONNECTION")]
Connection,
#[sea_orm(string_value = "SUBSCRIPTION")]
Subscription,
}

impl ObjectType {
Expand All @@ -51,6 +53,7 @@ impl ObjectType {
ObjectType::Index => "index",
ObjectType::Function => "function",
ObjectType::Connection => "connection",
ObjectType::Subscription => "subscription",
}
}
}
Expand Down Expand Up @@ -102,6 +105,8 @@ pub enum Relation {
Schema,
#[sea_orm(has_many = "super::sink::Entity")]
Sink,
#[sea_orm(has_many = "super::subscription::Entity")]
Subscription,
#[sea_orm(has_many = "super::source::Entity")]
Source,
#[sea_orm(has_many = "super::table::Entity")]
Expand Down Expand Up @@ -164,6 +169,12 @@ impl Related<super::sink::Entity> for Entity {
}
}

impl Related<super::subscription::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscription.def()
}
}

impl Related<super::source::Entity> for Entity {
fn to() -> RelationDef {
Relation::Source.def()
Expand Down
1 change: 1 addition & 0 deletions src/meta/model_v2/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use super::schema::Entity as Schema;
pub use super::sink::Entity as Sink;
pub use super::source::Entity as Source;
pub use super::streaming_job::Entity as StreamingJob;
pub use super::subscription::Entity as Subscription;
pub use super::system_parameter::Entity as SystemParameter;
pub use super::table::Entity as Table;
pub use super::user::Entity as User;
Expand Down
66 changes: 66 additions & 0 deletions src/meta/model_v2/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::PbSubscription;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;

use crate::{ColumnCatalogArray, ColumnOrderArray, I32Array, Property, SubscriptionId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "subscription")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub subscription_id: SubscriptionId,
pub name: String,
pub columns: ColumnCatalogArray,
pub plan_pk: ColumnOrderArray,
pub distribution_key: I32Array,
pub properties: Property,
pub definition: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::object::Entity",
from = "Column::SubscriptionId",
to = "super::object::Column::Oid",
on_update = "NoAction",
on_delete = "Cascade"
)]
Object,
}

impl Related<super::object::Entity> for Entity {
fn to() -> RelationDef {
Relation::Object.def()
}
}

impl ActiveModelBehavior for ActiveModel {}

impl From<PbSubscription> for ActiveModel {
fn from(pb_subscription: PbSubscription) -> Self {
Self {
subscription_id: Set(pb_subscription.id as _),
name: Set(pb_subscription.name),
columns: Set(pb_subscription.column_catalogs.into()),
plan_pk: Set(pb_subscription.plan_pk.into()),
distribution_key: Set(pb_subscription.distribution_key.into()),
properties: Set(pb_subscription.properties.into()),
definition: Set(pb_subscription.definition),
}
}
}
Loading

0 comments on commit 44ccb7e

Please sign in to comment.