diff --git a/proto/catalog.proto b/proto/catalog.proto index 07aff3baee22f..a8624cebbb7b4 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -39,6 +39,14 @@ enum StreamJobStatus { CREATED = 2; } +// How the stream job was created will determine +// whether they are persisted. +enum CreateType { + CREATE_STATUS_UNSPECIFIED = 0; + BACKGROUND = 1; + FOREGROUND = 2; +} + message StreamSourceInfo { // deprecated plan_common.RowFormatType row_format = 1; @@ -262,6 +270,8 @@ message Table { // Used to filter created / creating tables in meta. StreamJobStatus stream_job_status = 31; + CreateType create_type = 32; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 2b8ef546c9be9..778b43c0598f5 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; -use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; @@ -402,6 +402,7 @@ impl TableCatalog { created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), cleaned_by_watermark: self.cleaned_by_watermark, stream_job_status: PbStreamJobStatus::Creating.into(), + create_type: PbCreateType::Foreground.into(), } } @@ -607,6 +608,7 @@ mod tests { created_at_epoch: None, cleaned_by_watermark: false, stream_job_status: PbStreamJobStatus::Creating.into(), + create_type: PbCreateType::Foreground.into(), } .into(); diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index c3b65bc26992d..b5a79a6f6b42f 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -448,7 +448,7 @@ mod tests { use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; use risingwave_pb::catalog::table::TableType; - use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; + use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; use risingwave_pb::plan_common::PbColumnCatalog; @@ -550,6 +550,7 @@ mod tests { created_at_epoch: None, cleaned_by_watermark: false, stream_job_status: PbStreamJobStatus::Created.into(), + create_type: PbCreateType::Foreground.into(), } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 486b819d3ee7e..abeae5e418af2 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -37,7 +37,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; -use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; @@ -152,6 +152,7 @@ async fn compaction_test( created_at_epoch: None, cleaned_by_watermark: false, stream_job_status: PbStreamJobStatus::Created.into(), + create_type: PbCreateType::Foreground.into(), }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;