Skip to content

Commit

Permalink
remove db_name
Browse files Browse the repository at this point in the history
fix ddl controller
  • Loading branch information
xxhZs committed Mar 5, 2024
1 parent b6781a3 commit 6dd8d88
Show file tree
Hide file tree
Showing 13 changed files with 23 additions and 30 deletions.
3 changes: 1 addition & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ message Subscription {
optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;

string db_name = 17;
string subscription_from_name = 18;
string subscription_from_name = 17;
}

message Connection {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct SchemaCatalog {
sink_by_name: HashMap<String, Arc<SinkCatalog>>,
sink_by_id: HashMap<SinkId, Arc<SinkCatalog>>,
subscription_by_name: HashMap<String, Arc<SubscriptionCatalog>>,
subscription_by_id: HashMap<SinkId, Arc<SubscriptionCatalog>>,
subscription_by_id: HashMap<SubscriptionId, Arc<SubscriptionCatalog>>,
index_by_name: HashMap<String, Arc<IndexCatalog>>,
index_by_id: HashMap<IndexId, Arc<IndexCatalog>>,
indexes_by_table_id: HashMap<TableId, Vec<Arc<IndexCatalog>>>,
Expand Down
7 changes: 1 addition & 6 deletions src/frontend/src/catalog/subscription_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,13 @@ pub struct SubscriptionCatalog {
/// Primiary keys of the subscription. Derived by the frontend.
pub plan_pk: Vec<ColumnOrder>,

/// Distribution key indices of the sink. For example, if `distribution_key = [1, 2]`, then the
/// Distribution key indices of the subscription. For example, if `distribution_key = [1, 2]`, then the
/// distribution keys will be `columns[1]` and `columns[2]`.
pub distribution_key: Vec<usize>,

/// The properties of the subscription, only `retention`.
pub properties: BTreeMap<String, String>,

/// Name of the database
pub db_name: String,

/// The upstream table name on which the subscription depends
pub subscription_from_name: String,

Expand Down Expand Up @@ -120,7 +117,6 @@ impl SubscriptionCatalog {
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
subscription_from_name: self.subscription_from_name.clone(),
properties: self.properties.clone().into_iter().collect(),
db_name: self.db_name.clone(),
database_id: self.database_id,
schema_id: self.schema_id,
dependent_relations: self
Expand Down Expand Up @@ -157,7 +153,6 @@ impl From<&PbSubscription> for SubscriptionCatalog {
distribution_key: prost.distribution_key.iter().map(|k| *k as _).collect_vec(),
subscription_from_name: prost.subscription_from_name.clone(),
properties: prost.properties.clone().into_iter().collect(),
db_name: prost.db_name.clone(),
database_id: prost.database_id,
schema_id: prost.schema_id,
dependent_relations: prost
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/handler/create_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub fn gen_subscription_plan(
definition,
with_options,
false,
db_name.to_string(),
subscription_from_table_name,
UserId::new(session.user_id()),
)?;
Expand Down
2 changes: 0 additions & 2 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,6 @@ impl PlanRoot {
definition: String,
properties: WithOptions,
emit_on_window_close: bool,
db_name: String,
subscription_from_table_name: String,
user_id: UserId,
) -> Result<StreamSubscription> {
Expand All @@ -853,7 +852,6 @@ impl PlanRoot {
dependent_relations,
stream_plan,
subscription_name,
db_name,
subscription_from_table_name,
self.required_dist.clone(),
self.required_order.clone(),
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{PlanRef, TableCatalog, WithOptions};

/// [`StreamSink`] represents a subscription at the very end of the graph.
/// [`StreamSubscription`] represents a subscription at the very end of the graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSubscription {
pub base: PlanBase<Stream>,
Expand Down Expand Up @@ -66,7 +66,6 @@ impl StreamSubscription {
dependent_relations: HashSet<TableId>,
input: PlanRef,
name: String,
db_name: String,
subscription_from_name: String,
user_distributed_by: RequiredDist,
user_order_by: Order,
Expand All @@ -84,7 +83,6 @@ impl StreamSubscription {
input,
user_distributed_by,
name,
db_name,
subscription_from_name,
user_order_by,
columns,
Expand All @@ -103,7 +101,6 @@ impl StreamSubscription {
input: PlanRef,
user_distributed_by: RequiredDist,
name: String,
db_name: String,
subscription_from_name: String,
user_order_by: Order,
columns: Vec<ColumnCatalog>,
Expand All @@ -127,7 +124,6 @@ impl StreamSubscription {
dependent_relations: dependent_relations.into_iter().collect(),
id: SubscriptionId::placeholder(),
name,
db_name,
subscription_from_name,
definition,
columns,
Expand All @@ -143,7 +139,7 @@ impl StreamSubscription {
Ok((input, subscription_desc))
}

/// The table schema is: | epoch | seq id | row op | sink columns |
/// The table schema is: | epoch | seq id | row op | subscription columns |
/// Pk is: | epoch | seq id |
fn infer_kv_log_store_table_catalog(&self) -> TableCatalog {
StreamSink::infer_kv_log_store_table_catalog_inner(
Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,11 @@ impl CatalogWriter for MockCatalogWriter {
unreachable!()
}

async fn alter_subscription_name(&self, _sink_id: u32, _sink_name: &str) -> Result<()> {
async fn alter_subscription_name(
&self,
_subscription_id: u32,
_subscription_name: &str,
) -> Result<()> {
unreachable!()
}

Expand Down Expand Up @@ -762,7 +766,7 @@ impl MockCatalogWriter {
fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<()> {
sink.id = self.gen_id();
self.catalog.write().create_sink(&sink);
self.add_table_or_subscription_id(sink.id, sink.schema_id, sink.database_id);
self.add_table_or_sink_id(sink.id, sink.schema_id, sink.database_id);
Ok(())
}

Expand All @@ -773,7 +777,7 @@ impl MockCatalogWriter {
) -> Result<()> {
subscription.id = self.gen_id();
self.catalog.write().create_subscription(&subscription);
self.add_table_or_sink_id(
self.add_table_or_subscription_id(
subscription.id,
subscription.schema_id,
subscription.database_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl MigrationTrait for Migration {
.not_null(),
)
.col(ColumnDef::new(Subscription::Definition).string().not_null())
.col(
ColumnDef::new(Subscription::SubscriptionFromName)
.string()
.not_null(),
)
.to_owned(),
)
.await?;
Expand All @@ -63,4 +68,5 @@ enum Subscription {
DistributionKey,
Properties,
Definition,
SubscriptionFromName,
}
2 changes: 0 additions & 2 deletions src/meta/model_v2/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub struct Model {
pub distribution_key: I32Array,
pub properties: Property,
pub definition: String,
pub db_name: String,
pub subscription_from_name: String,
}

Expand Down Expand Up @@ -63,7 +62,6 @@ impl From<PbSubscription> for ActiveModel {
distribution_key: Set(pb_subscription.distribution_key.into()),
properties: Set(pb_subscription.properties.into()),
definition: Set(pb_subscription.definition),
db_name: Set(pb_subscription.db_name),
subscription_from_name: Set(pb_subscription.subscription_from_name),
}
}
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ impl From<ObjectModel<subscription::Model>> for PbSubscription {
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
),
db_name: value.0.db_name,
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
column_catalogs: value.0.columns.0,
subscription_from_name: value.0.subscription_from_name,
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/manager/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
subscription_from: table_name,
..
},
}
=> replace_table_name(table_name, to),
} => replace_table_name(table_name, to),
Statement::CreateSink {
stmt: CreateSinkStatement {
sink_from,
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ impl DdlController {
StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
StreamingJobId::Subscription(id) => (id as _, ObjectType::Sink),
StreamingJobId::Subscription(id) => (id as _, ObjectType::Subscription),
};

let version = self
Expand Down Expand Up @@ -1950,9 +1950,9 @@ impl DdlController {
.alter_database_name(database_id, new_name)
.await
}
alter_name_request::Object::SubscriptionId(sink_id) => {
alter_name_request::Object::SubscriptionId(subscription_id) => {
mgr.catalog_manager
.alter_subscription_name(sink_id, new_name)
.alter_subscription_name(subscription_id, new_name)
.await
}
},
Expand Down
2 changes: 1 addition & 1 deletion src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2397,7 +2397,7 @@ impl ParseTo for ObjectType {
ObjectType::Subscription
} else {
return parser.expected(
"TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SUBSCRIPTION SCHEMA, DATABASE, USER or CONNECTION after DROP",
"TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SUBSCRIPTION, SCHEMA, DATABASE, USER or CONNECTION after DROP",
parser.peek_token(),
);
};
Expand Down

0 comments on commit 6dd8d88

Please sign in to comment.