From c86d03404b9f5136c5a3256ff162ceab596a8d5d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 15 Sep 2023 15:45:30 +0800 Subject: [PATCH] address review comments --- proto/catalog.proto | 18 ++++++++++-------- src/connector/src/sink/catalog/mod.rs | 3 ++- src/frontend/src/catalog/index_catalog.rs | 3 ++- src/frontend/src/catalog/table_catalog.rs | 13 +++++-------- src/frontend/src/handler/create_index.rs | 3 ++- src/storage/src/filter_key_extractor.rs | 6 +++--- .../compaction_test/src/delete_range_runner.rs | 5 ++--- 7 files changed, 26 insertions(+), 25 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 2f49b307f1b79..07aff3baee22f 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -32,6 +32,13 @@ enum SchemaRegistryNameStrategy { TOPIC_RECORD_NAME_STRATEGY = 2; } +enum StreamJobStatus { + // Prefixed by `STREAM_JOB_STATUS` due to protobuf namespacing rules. + STREAM_JOB_STATUS_UNSPECIFIED = 0; + CREATING = 1; + CREATED = 2; +} + message StreamSourceInfo { // deprecated plan_common.RowFormatType row_format = 1; @@ -116,6 +123,7 @@ message Sink { optional uint64 created_at_epoch = 16; string db_name = 17; string sink_from_name = 18; + StreamJobStatus stream_job_status = 19; } message Connection { @@ -157,6 +165,7 @@ message Index { optional uint64 initialized_at_epoch = 10; optional uint64 created_at_epoch = 11; + StreamJobStatus stream_job_status = 12; } message Function { @@ -191,13 +200,6 @@ message Table { INTERNAL = 4; } - enum TableStatus { - // Prefixed by `TABLE_STATUS` due to protobuf namespacing rules. - TABLE_STATUS_UNSPECIFIED = 0; - CREATING = 1; - CREATED = 2; - } - message TableVersion { // The version number, which will be 0 by default and be increased by 1 for // each schema change in the frontend. @@ -258,7 +260,7 @@ message Table { bool cleaned_by_watermark = 30; // Used to filter created / creating tables in meta. - TableStatus create_status = 31; + StreamJobStatus stream_job_status = 31; // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 16b53af17499a..fd307b05e115e 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -22,7 +22,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::{PbSink, PbSinkType}; +use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus}; #[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)] pub struct SinkId { @@ -191,6 +191,7 @@ impl SinkCatalog { created_at_epoch: self.created_at_epoch.map(|e| e.0), db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), + stream_job_status: PbStreamJobStatus::Creating.into(), } } diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs index caf0557b2fd09..ca4b4036332d3 100644 --- a/src/frontend/src/catalog/index_catalog.rs +++ b/src/frontend/src/catalog/index_catalog.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::IndexId; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::PbIndex; +use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus}; use super::ColumnId; use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog}; @@ -184,6 +184,7 @@ impl IndexCatalog { original_columns: self.original_columns.iter().map(Into::into).collect_vec(), initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0), created_at_epoch: self.created_at_epoch.map(|e| e.0), + stream_job_status: PbStreamJobStatus::Creating.into(), } } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 48a9a59a410c2..2b8ef546c9be9 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -23,10 +23,8 @@ use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::table::{ - OptionalAssociatedSourceId, PbTableStatus, PbTableType, PbTableVersion, -}; -use risingwave_pb::catalog::PbTable; +use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; +use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; @@ -403,7 +401,7 @@ impl TableCatalog { initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0), created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), cleaned_by_watermark: self.cleaned_by_watermark, - create_status: PbTableStatus::Creating.into(), + stream_job_status: PbStreamJobStatus::Creating.into(), } } @@ -545,8 +543,7 @@ mod tests { use risingwave_common::test_prelude::*; use risingwave_common::types::*; use risingwave_common::util::sort_util::OrderType; - use risingwave_pb::catalog::table::PbTableStatus; - use risingwave_pb::catalog::PbTable; + use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; use super::*; @@ -609,7 +606,7 @@ mod tests { cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, - create_status: PbTableStatus::Creating.into(), + stream_job_status: PbStreamJobStatus::Creating.into(), } .into(); diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index ad4512aca354a..a5a002d3b3d79 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -21,7 +21,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{IndexId, TableDesc, TableId}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_pb::catalog::{PbIndex, PbTable}; +use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus, PbTable}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::user::grant_privilege::{Action, Object}; use risingwave_sqlparser::ast; @@ -242,6 +242,7 @@ pub(crate) fn gen_create_index_plan( original_columns, initialized_at_epoch: None, created_at_epoch: None, + stream_job_status: PbStreamJobStatus::Creating.into(), }; let plan: PlanRef = materialize.into(); diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index 3e4ada60512c6..d861c8256d961 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -447,8 +447,8 @@ mod tests { use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; - use risingwave_pb::catalog::table::{PbTableStatus, TableType}; - use risingwave_pb::catalog::PbTable; + use risingwave_pb::catalog::table::TableType; + use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; use risingwave_pb::plan_common::PbColumnCatalog; @@ -549,7 +549,7 @@ mod tests { cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, - create_status: PbTableStatus::Creating.into(), + stream_job_status: PbStreamJobStatus::Created.into(), } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 7d778d7419dd6..258fb62d3b740 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -36,8 +36,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; -use risingwave_pb::catalog::table::PbTableStatus; -use risingwave_pb::catalog::PbTable; +use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; @@ -151,7 +150,7 @@ async fn compaction_test( cardinality: None, created_at_epoch: None, cleaned_by_watermark: false, - create_status: PbTableStatus::Creating.into(), + stream_job_status: PbStreamJobStatus::Created.into(), }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;