From 6dd8d889fb55dd6d8327d5aa20dcd0a60aa494b5 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 4 Mar 2024 18:50:13 +0800 Subject: [PATCH] remove db_name fix ddl controller --- proto/catalog.proto | 3 +-- src/frontend/src/catalog/schema_catalog.rs | 2 +- src/frontend/src/catalog/subscription_catalog.rs | 7 +------ src/frontend/src/handler/create_subscription.rs | 1 - src/frontend/src/optimizer/mod.rs | 2 -- .../src/optimizer/plan_node/stream_subscription.rs | 8 ++------ src/frontend/src/test_utils.rs | 10 +++++++--- .../migration/src/m20240304_074901_subscription.rs | 6 ++++++ src/meta/model_v2/src/subscription.rs | 2 -- src/meta/src/controller/mod.rs | 1 - src/meta/src/manager/catalog/utils.rs | 3 +-- src/meta/src/rpc/ddl_controller.rs | 6 +++--- src/sqlparser/src/ast/mod.rs | 2 +- 13 files changed, 23 insertions(+), 30 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 1465e510077ae..7858cd5398db2 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -178,8 +178,7 @@ message Subscription { optional string initialized_at_cluster_version = 15; optional string created_at_cluster_version = 16; - string db_name = 17; - string subscription_from_name = 18; + string subscription_from_name = 17; } message Connection { diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 08701b64cd218..56de1d743da41 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -50,7 +50,7 @@ pub struct SchemaCatalog { sink_by_name: HashMap>, sink_by_id: HashMap>, subscription_by_name: HashMap>, - subscription_by_id: HashMap>, + subscription_by_id: HashMap>, index_by_name: HashMap>, index_by_id: HashMap>, indexes_by_table_id: HashMap>>, diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs index 0262af0f228fa..3ded154cf055b 100644 --- a/src/frontend/src/catalog/subscription_catalog.rs +++ b/src/frontend/src/catalog/subscription_catalog.rs @@ -40,16 +40,13 @@ pub struct SubscriptionCatalog { /// Primiary keys of the subscription. Derived by the frontend. pub plan_pk: Vec, - /// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the + /// Distribution key indices of the subscription. For example, if `distribution_key = [1, 2]`, then the /// distribution keys will be `columns[1]` and `columns[2]`. pub distribution_key: Vec, /// The properties of the subscription, only `retention`. pub properties: BTreeMap, - /// Name of the database - pub db_name: String, - /// The upstream table name on which the subscription depends pub subscription_from_name: String, @@ -120,7 +117,6 @@ impl SubscriptionCatalog { distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(), subscription_from_name: self.subscription_from_name.clone(), properties: self.properties.clone().into_iter().collect(), - db_name: self.db_name.clone(), database_id: self.database_id, schema_id: self.schema_id, dependent_relations: self @@ -157,7 +153,6 @@ impl From<&PbSubscription> for SubscriptionCatalog { distribution_key: prost.distribution_key.iter().map(|k| *k as _).collect_vec(), subscription_from_name: prost.subscription_from_name.clone(), properties: prost.properties.clone().into_iter().collect(), - db_name: prost.db_name.clone(), database_id: prost.database_id, schema_id: prost.schema_id, dependent_relations: prost diff --git a/src/frontend/src/handler/create_subscription.rs b/src/frontend/src/handler/create_subscription.rs index 6bf7a724e4e80..f44abbaa42f02 100644 --- a/src/frontend/src/handler/create_subscription.rs +++ b/src/frontend/src/handler/create_subscription.rs @@ -83,7 +83,6 @@ pub fn gen_subscription_plan( definition, with_options, false, - db_name.to_string(), subscription_from_table_name, UserId::new(session.user_id()), )?; diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index bb97f0b729892..7f3211e4551a6 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -839,7 +839,6 @@ impl PlanRoot { definition: String, properties: WithOptions, emit_on_window_close: bool, - db_name: String, subscription_from_table_name: String, user_id: UserId, ) -> Result { @@ -853,7 +852,6 @@ impl PlanRoot { dependent_relations, stream_plan, subscription_name, - db_name, subscription_from_table_name, self.required_dist.clone(), self.required_order.clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_subscription.rs b/src/frontend/src/optimizer/plan_node/stream_subscription.rs index ebd829ff2658c..2c384e108a780 100644 --- a/src/frontend/src/optimizer/plan_node/stream_subscription.rs +++ b/src/frontend/src/optimizer/plan_node/stream_subscription.rs @@ -32,7 +32,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::{PlanRef, TableCatalog, WithOptions}; -/// [`StreamSink`] represents a subscription at the very end of the graph. +/// [`StreamSubscription`] represents a subscription at the very end of the graph. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSubscription { pub base: PlanBase, @@ -66,7 +66,6 @@ impl StreamSubscription { dependent_relations: HashSet, input: PlanRef, name: String, - db_name: String, subscription_from_name: String, user_distributed_by: RequiredDist, user_order_by: Order, @@ -84,7 +83,6 @@ impl StreamSubscription { input, user_distributed_by, name, - db_name, subscription_from_name, user_order_by, columns, @@ -103,7 +101,6 @@ impl StreamSubscription { input: PlanRef, user_distributed_by: RequiredDist, name: String, - db_name: String, subscription_from_name: String, user_order_by: Order, columns: Vec, @@ -127,7 +124,6 @@ impl StreamSubscription { dependent_relations: dependent_relations.into_iter().collect(), id: SubscriptionId::placeholder(), name, - db_name, subscription_from_name, definition, columns, @@ -143,7 +139,7 @@ impl StreamSubscription { Ok((input, subscription_desc)) } - /// The table schema is: | epoch | seq id | row op | sink columns | + /// The table schema is: | epoch | seq id | row op | subscription columns | /// Pk is: | epoch | seq id | fn infer_kv_log_store_table_catalog(&self) -> TableCatalog { StreamSink::infer_kv_log_store_table_catalog_inner( diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 20a33d471d5aa..b0f2944fb115e 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -606,7 +606,11 @@ impl CatalogWriter for MockCatalogWriter { unreachable!() } - async fn alter_subscription_name(&self, _sink_id: u32, _sink_name: &str) -> Result<()> { + async fn alter_subscription_name( + &self, + _subscription_id: u32, + _subscription_name: &str, + ) -> Result<()> { unreachable!() } @@ -762,7 +766,7 @@ impl MockCatalogWriter { fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> { sink.id = self.gen_id(); self.catalog.write().create_sink(&sink); - self.add_table_or_subscription_id(sink.id, sink.schema_id, sink.database_id); + self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id); Ok(()) } @@ -773,7 +777,7 @@ impl MockCatalogWriter { ) -> Result<()> { subscription.id = self.gen_id(); self.catalog.write().create_subscription(&subscription); - self.add_table_or_sink_id( + self.add_table_or_subscription_id( subscription.id, subscription.schema_id, subscription.database_id, diff --git a/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs b/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs index ece549c4f922d..52bdb117727f5 100644 --- a/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs +++ b/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs @@ -40,6 +40,11 @@ impl MigrationTrait for Migration { .not_null(), ) .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::SubscriptionFromName) + .string() + .not_null(), + ) .to_owned(), ) .await?; @@ -63,4 +68,5 @@ enum Subscription { DistributionKey, Properties, Definition, + SubscriptionFromName, } diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs index 633c3939d5254..096c63078a2a4 100644 --- a/src/meta/model_v2/src/subscription.rs +++ b/src/meta/model_v2/src/subscription.rs @@ -29,7 +29,6 @@ pub struct Model { pub distribution_key: I32Array, pub properties: Property, pub definition: String, - pub db_name: String, pub subscription_from_name: String, } @@ -63,7 +62,6 @@ impl From for ActiveModel { distribution_key: Set(pb_subscription.distribution_key.into()), properties: Set(pb_subscription.properties.into()), definition: Set(pb_subscription.definition), - db_name: Set(pb_subscription.db_name), subscription_from_name: Set(pb_subscription.subscription_from_name), } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index b8abdc1b66326..42d6b27771edb 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -229,7 +229,6 @@ impl From> for PbSubscription { created_at_epoch: Some( Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, ), - db_name: value.0.db_name, stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. column_catalogs: value.0.columns.0, subscription_from_name: value.0.subscription_from_name, diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index 342ac8b5e1022..b4f08f46e0091 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -134,8 +134,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str subscription_from: table_name, .. }, - } - => replace_table_name(table_name, to), + } => replace_table_name(table_name, to), Statement::CreateSink { stmt: CreateSinkStatement { sink_from, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 7d16d353643fe..216413e840377 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1138,7 +1138,7 @@ impl DdlController { StreamingJobId::Sink(id) => (id as _, ObjectType::Sink), StreamingJobId::Table(_, id) => (id as _, ObjectType::Table), StreamingJobId::Index(idx) => (idx as _, ObjectType::Index), - StreamingJobId::Subscription(id) => (id as _, ObjectType::Sink), + StreamingJobId::Subscription(id) => (id as _, ObjectType::Subscription), }; let version = self @@ -1950,9 +1950,9 @@ impl DdlController { .alter_database_name(database_id, new_name) .await } - alter_name_request::Object::SubscriptionId(sink_id) => { + alter_name_request::Object::SubscriptionId(subscription_id) => { mgr.catalog_manager - .alter_subscription_name(sink_id, new_name) + .alter_subscription_name(subscription_id, new_name) .await } }, diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index b1d37c9d43cc0..f49c3f2bfc9f2 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2397,7 +2397,7 @@ impl ParseTo for ObjectType { ObjectType::Subscription } else { return parser.expected( - "TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SUBSCRIPTION SCHEMA, DATABASE, USER or CONNECTION after DROP", + "TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SUBSCRIPTION, SCHEMA, DATABASE, USER or CONNECTION after DROP", parser.peek_token(), ); };