From 9e8db598511c342ad2c7e5952dbb23d27107aded Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Mon, 5 Feb 2024 22:46:17 +0800 Subject: [PATCH] chore: remove table properties in proto (#14794) --- proto/catalog.proto | 6 +++- proto/hummock.proto | 3 +- proto/plan_common.proto | 3 +- src/common/src/catalog/mod.rs | 36 ++++--------------- src/common/src/catalog/physical_table.rs | 10 ++---- src/common/src/constants.rs | 3 -- src/frontend/src/catalog/table_catalog.rs | 30 +++++----------- src/frontend/src/handler/create_index.rs | 9 +---- src/frontend/src/handler/create_mv.rs | 11 +++--- .../src/optimizer/plan_node/generic/agg.rs | 5 ++- .../plan_node/generic/dynamic_filter.rs | 6 ++-- .../src/optimizer/plan_node/generic/join.rs | 6 ++-- .../src/optimizer/plan_node/generic/source.rs | 6 ++-- .../src/optimizer/plan_node/generic/top_n.rs | 6 ++-- .../plan_node/stream_cdc_table_scan.rs | 4 +-- .../src/optimizer/plan_node/stream_dedup.rs | 3 +- .../plan_node/stream_eowc_over_window.rs | 3 +- .../optimizer/plan_node/stream_materialize.rs | 5 +-- .../src/optimizer/plan_node/stream_now.rs | 3 +- .../optimizer/plan_node/stream_over_window.rs | 3 +- .../src/optimizer/plan_node/stream_sink.rs | 3 +- .../src/optimizer/plan_node/stream_sort.rs | 3 +- .../optimizer/plan_node/stream_table_scan.rs | 4 +-- .../plan_node/stream_watermark_filter.rs | 6 ++-- src/frontend/src/optimizer/plan_node/utils.rs | 13 ++----- .../src/scheduler/distributed/query.rs | 13 ++----- src/frontend/src/utils/with_options.rs | 3 +- .../migration/src/m20230908_072257_init.rs | 4 +-- src/meta/model_v2/src/table.rs | 8 ++--- src/meta/src/controller/catalog.rs | 12 +++---- src/meta/src/controller/mod.rs | 2 +- .../picker/ttl_reclaim_compaction_picker.rs | 3 +- .../manager/compaction_group_manager.rs | 36 +++++++++---------- src/meta/src/manager/catalog/database.rs | 2 +- src/meta/src/manager/streaming_job.rs | 12 ------- src/meta/src/rpc/ddl_controller.rs | 10 +++--- src/meta/src/stream/stream_manager.rs | 20 +++++------ .../hummock_test/src/compactor_tests.rs | 4 +-- src/storage/src/filter_key_extractor.rs | 8 ++--- .../hummock/compactor/compaction_filter.rs | 3 +- .../src/hummock/compactor/compaction_utils.rs | 11 +++--- .../src/hummock/compactor/compactor_runner.rs | 2 +- .../src/table/batch_table/storage_table.rs | 6 +--- src/stream/src/common/table/state_table.rs | 2 +- .../executor/source/state_table_handler.rs | 11 ------ .../src/delete_range_runner.rs | 7 +--- 46 files changed, 123 insertions(+), 246 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 741a85d2d3aa9..99fd1b0a69514 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -266,7 +266,8 @@ message Table { repeated int32 stream_key = 13; bool append_only = 14; uint32 owner = 15; - map properties = 16; + reserved 16; + reserved "properties"; // deprecated uint32 fragment_id = 17; // an optional column index which is the vnode of each row computed by the // table's consistent hash distribution @@ -315,6 +316,9 @@ message Table { optional string initialized_at_cluster_version = 35; optional string created_at_cluster_version = 36; + // TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables. + optional uint32 retention_seconds = 37; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/proto/hummock.proto b/proto/hummock.proto index 9b39022f7f734..d40b31c52f60c 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -284,7 +284,8 @@ message KeyRange { } message TableOption { - uint32 retention_seconds = 1; + reserved 1; + optional uint32 retention_seconds = 2; } message CompactTask { diff --git a/proto/plan_common.proto b/proto/plan_common.proto index a914ab9d2da25..82f9fbc63a0f8 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -88,7 +88,7 @@ message StorageTableDesc { // TODO: may refactor primary key representations repeated common.ColumnOrder pk = 3; repeated uint32 dist_key_in_pk_indices = 4; - uint32 retention_seconds = 5; + reserved 5; repeated uint32 value_indices = 6; uint32 read_prefix_len_hint = 7; // Whether the table is versioned. If `true`, column-aware row encoding will @@ -96,6 +96,7 @@ message StorageTableDesc { bool versioned = 8; repeated uint32 stream_key = 9; optional uint32 vnode_col_idx_in_pk = 10; + optional uint32 retention_seconds = 11; } // Represents a table in external database for CDC scenario diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 9bf025061f9b9..c2dcb183d6308 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -19,7 +19,6 @@ mod physical_table; mod schema; pub mod test_utils; -use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; @@ -31,7 +30,6 @@ pub use physical_table::*; use risingwave_pb::catalog::HandleConflictBehavior as PbHandleConflictBehavior; use risingwave_pb::plan_common::ColumnDescVersion; pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema}; -use thiserror_ext::AsReport; pub use crate::constants::hummock; use crate::error::BoxedError; @@ -273,46 +271,24 @@ pub struct TableOption { impl From<&risingwave_pb::hummock::TableOption> for TableOption { fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self { - let retention_seconds = - if table_option.retention_seconds == hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND { - None - } else { - Some(table_option.retention_seconds) - }; - - Self { retention_seconds } + Self { + retention_seconds: table_option.retention_seconds, + } } } impl From<&TableOption> for risingwave_pb::hummock::TableOption { fn from(table_option: &TableOption) -> Self { Self { - retention_seconds: table_option - .retention_seconds - .unwrap_or(hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND), + retention_seconds: table_option.retention_seconds, } } } impl TableOption { - pub fn build_table_option(table_properties: &HashMap) -> Self { + pub fn new(retention_seconds: Option) -> Self { // now we only support ttl for TableOption - let mut result = TableOption::default(); - if let Some(ttl_string) = table_properties.get(hummock::PROPERTIES_RETENTION_SECOND_KEY) { - match ttl_string.trim().parse::() { - Ok(retention_seconds_u32) => result.retention_seconds = Some(retention_seconds_u32), - Err(e) => { - tracing::info!( - error = %e.as_report(), - "build_table_option parse option ttl_string {}", - ttl_string, - ); - result.retention_seconds = None; - } - }; - } - - result + TableOption { retention_seconds } } } diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 92c71d37e2aa1..044e1c1644662 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -22,8 +22,6 @@ use risingwave_pb::common::PbColumnOrder; use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; -use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; -use crate::catalog::TableOption; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -49,7 +47,8 @@ pub struct TableDesc { /// Whether the table source is append-only pub append_only: bool, - pub retention_seconds: u32, + // TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables. + pub retention_seconds: Option, pub value_indices: Vec, @@ -143,7 +142,6 @@ impl TableDesc { } pub fn from_pb_table(table: &Table) -> Self { - let table_options = TableOption::build_table_option(&table.properties); Self { table_id: TableId::new(table.id), pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(), @@ -156,9 +154,7 @@ impl TableDesc { stream_key: table.stream_key.iter().map(|i| *i as _).collect(), vnode_col_index: table.vnode_col_index.map(|i| i as _), append_only: table.append_only, - retention_seconds: table_options - .retention_seconds - .unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND), + retention_seconds: table.retention_seconds, value_indices: table.value_indices.iter().map(|i| *i as _).collect(), read_prefix_len_hint: table.read_prefix_len_hint as _, watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(), diff --git a/src/common/src/constants.rs b/src/common/src/constants.rs index c0ef4efcbf1b1..9bdce7b746af7 100644 --- a/src/common/src/constants.rs +++ b/src/common/src/constants.rs @@ -30,9 +30,6 @@ pub mod hummock { flag.bits() } } - - pub const TABLE_OPTION_DUMMY_RETENTION_SECOND: u32 = 0; - pub const PROPERTIES_RETENTION_SECOND_KEY: &str = "retention_seconds"; } pub mod log_store { diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 387c78f300282..fbb77a0ca0bb5 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -19,7 +19,6 @@ use itertools::Itertools; use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, TableDesc, TableId, TableVersionId, }; -use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; @@ -32,7 +31,6 @@ use crate::error::{ErrorCode, RwError}; use crate::expr::ExprImpl; use crate::optimizer::property::Cardinality; use crate::user::UserId; -use crate::WithOptions; /// Includes full information about a table. /// @@ -102,8 +100,8 @@ pub struct TableCatalog { /// Owner of the table. pub owner: UserId, - /// Properties of the table. For example, `appendonly` or `retention_seconds`. - pub properties: WithOptions, + // TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables. + pub retention_seconds: Option, /// The fragment id of the `Materialize` operator for this table. pub fragment_id: FragmentId, @@ -356,8 +354,7 @@ impl TableCatalog { pub fn table_desc(&self) -> TableDesc { use risingwave_common::catalog::TableOption; - let table_options = - TableOption::build_table_option(&self.properties.inner().clone().into_iter().collect()); + let table_options = TableOption::new(self.retention_seconds); TableDesc { table_id: self.id, @@ -366,9 +363,7 @@ impl TableCatalog { columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(), distribution_key: self.distribution_key.clone(), append_only: self.append_only, - retention_seconds: table_options - .retention_seconds - .unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND), + retention_seconds: table_options.retention_seconds, value_indices: self.value_indices.clone(), read_prefix_len_hint: self.read_prefix_len_hint, watermark_columns: self.watermark_columns.clone(), @@ -430,7 +425,6 @@ impl TableCatalog { .collect_vec(), append_only: self.append_only, owner: self.owner, - properties: self.properties.inner().clone().into_iter().collect(), fragment_id: self.fragment_id, dml_fragment_id: self.dml_fragment_id, vnode_col_index: self.vnode_col_index.map(|i| i as _), @@ -452,6 +446,7 @@ impl TableCatalog { incoming_sinks: self.incoming_sinks.clone(), created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), + retention_seconds: self.retention_seconds, } } @@ -545,7 +540,6 @@ impl From for TableCatalog { stream_key: tb.stream_key.iter().map(|x| *x as _).collect(), append_only: tb.append_only, owner: tb.owner, - properties: WithOptions::new(tb.properties), fragment_id: tb.fragment_id, dml_fragment_id: tb.dml_fragment_id, vnode_col_index: tb.vnode_col_index.map(|x| x as usize), @@ -569,6 +563,7 @@ impl From for TableCatalog { incoming_sinks: tb.incoming_sinks.clone(), created_at_cluster_version: tb.created_at_cluster_version.clone(), initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(), + retention_seconds: tb.retention_seconds, } } } @@ -587,12 +582,10 @@ impl OwnedByUserCatalog for TableCatalog { #[cfg(test)] mod tests { - use std::collections::HashMap; use risingwave_common::catalog::{ row_id_column_desc, ColumnCatalog, ColumnDesc, ColumnId, TableId, }; - use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::test_prelude::*; use risingwave_common::types::*; use risingwave_common::util::sort_util::OrderType; @@ -603,7 +596,6 @@ mod tests { use super::*; use crate::catalog::table_catalog::{TableCatalog, TableType}; - use crate::WithOptions; #[test] fn test_into_table_catalog() { @@ -639,10 +631,7 @@ mod tests { .into(), append_only: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, - properties: HashMap::from([( - String::from(PROPERTIES_RETENTION_SECOND_KEY), - String::from("300"), - )]), + retention_seconds: Some(300), fragment_id: 0, dml_fragment_id: None, initialized_at_epoch: None, @@ -705,10 +694,7 @@ mod tests { distribution_key: vec![], append_only: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, - properties: WithOptions::new(HashMap::from([( - String::from(PROPERTIES_RETENTION_SECOND_KEY), - String::from("300") - )])), + retention_seconds: Some(300), fragment_id: 0, dml_fragment_id: None, vnode_col_index: None, diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index df7b49f556f35..af9d9f52b1752 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -210,15 +210,8 @@ pub(crate) fn gen_create_index_plan( let index_table = materialize.table(); let mut index_table_prost = index_table.to_prost(index_schema_id, index_database_id); { - use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; - let retention_second_string_key = PROPERTIES_RETENTION_SECOND_KEY.to_string(); - // Inherit table properties - table.properties.get(&retention_second_string_key).map(|v| { - index_table_prost - .properties - .insert(retention_second_string_key, v.clone()) - }); + index_table_prost.retention_seconds = table.retention_seconds; } index_table_prost.owner = session.user_id(); diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 98362b00f8a7f..a6a48a2c2dba9 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -86,6 +86,10 @@ pub fn gen_create_mv_plan( columns: Vec, emit_mode: Option, ) -> Result<(PlanRef, PbTable)> { + if session.config().create_compaction_group_for_mv() { + context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect."); + } + let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?; @@ -119,12 +123,7 @@ pub fn gen_create_mv_plan( let materialize = plan_root.gen_materialize_plan(table_name, definition, emit_on_window_close)?; let mut table = materialize.table().to_prost(schema_id, database_id); - if session.config().create_compaction_group_for_mv() { - table.properties.insert( - String::from("independent_compaction_group"), - String::from("1"), - ); - } + let plan: PlanRef = materialize.into(); let dependent_relations = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone()); diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 33f6889e953b1..faff0c415f925 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -314,13 +314,12 @@ impl Agg { /// - column mapping from upstream to table fn create_table_builder( &self, - ctx: OptimizerContextRef, + _ctx: OptimizerContextRef, window_col_idx: Option, ) -> (TableCatalogBuilder, Vec, BTreeMap) { // NOTE: this function should be called to get a table builder, so that all state tables // created for Agg node have the same group key columns and pk ordering. - let mut table_builder = - TableCatalogBuilder::new(ctx.with_options().internal_table_subset()); + let mut table_builder = TableCatalogBuilder::default(); assert!(table_builder.columns().is_empty()); assert_eq!(table_builder.get_current_pk_len(), 0); diff --git a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs index fd8d128034342..1f6ab5be98b9e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs @@ -157,8 +157,7 @@ pub fn infer_left_internal_table_catalog( } } - let mut internal_table_catalog_builder = - TableCatalogBuilder::new(me.ctx().with_options().internal_table_subset()); + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); schema.fields().iter().for_each(|field| { internal_table_catalog_builder.add_column(field); @@ -180,8 +179,7 @@ pub fn infer_right_internal_table_catalog(input: impl stream::StreamPlanRef) -> Vec::::new() ); - let mut internal_table_catalog_builder = - TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); schema.fields().iter().for_each(|field| { internal_table_catalog_builder.add_column(field); diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index b49baf2a75f82..105f8bebb32bd 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -114,8 +114,7 @@ impl Join { pk_indices.extend(deduped_input_pk_indices.clone()); // Build internal table - let mut internal_table_catalog_builder = - TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); let internal_columns_fields = schema.fields().to_vec(); internal_columns_fields.iter().for_each(|field| { @@ -126,8 +125,7 @@ impl Join { }); // Build degree table. - let mut degree_table_catalog_builder = - TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); + let mut degree_table_catalog_builder = TableCatalogBuilder::default(); let degree_column_field = Field::with_name(DataType::Int64, "_degree"); diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 803b74cd6fa11..04e63a246a7ae 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; + use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::rc::Rc; @@ -27,7 +27,7 @@ use super::GenericPlanNode; use crate::catalog::source_catalog::SourceCatalog; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::FunctionalDependencySet; -use crate::{TableCatalog, WithOptions}; +use crate::TableCatalog; /// In which scnario the source node is created #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -145,7 +145,7 @@ impl Source { // state in source. // Source state doesn't maintain retention_seconds, internal_table_subset function only // returns retention_seconds so default is used here - let mut builder = TableCatalogBuilder::new(WithOptions::new(HashMap::default())); + let mut builder = TableCatalogBuilder::default(); let key = Field { data_type: DataType::Varchar, diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs index 9a4993cef271a..3fd19948e6c99 100644 --- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs +++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs @@ -41,15 +41,13 @@ impl TopN { pub fn infer_internal_table_catalog( &self, schema: &Schema, - ctx: OptimizerContextRef, + _ctx: OptimizerContextRef, input_stream_key: &[usize], vnode_col_idx: Option, ) -> TableCatalog { let columns_fields = schema.fields().to_vec(); let column_orders = &self.order.column_orders; - let mut internal_table_catalog_builder = - TableCatalogBuilder::new(ctx.with_options().internal_table_subset()); - + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); columns_fields.iter().for_each(|field| { internal_table_catalog_builder.add_column(field); }); diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 064882423de44..8bfecb64b03d5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -69,8 +69,8 @@ impl StreamCdcTableScan { &self, state: &mut BuildFragmentGraphState, ) -> TableCatalog { - let properties = self.ctx().with_options().internal_table_subset(); - let mut catalog_builder = TableCatalogBuilder::new(properties); + let _properties = self.ctx().with_options().internal_table_subset(); + let mut catalog_builder = TableCatalogBuilder::default(); let upstream_schema = &self.core.get_table_columns(); // Use `split_id` as primary key in state table. diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index 3e705acefdd38..b31415c125507 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -50,8 +50,7 @@ impl StreamDedup { pub fn infer_internal_table_catalog(&self) -> TableCatalog { let schema = self.core.schema(); - let mut builder = - TableCatalogBuilder::new(self.base.ctx().with_options().internal_table_subset()); + let mut builder = TableCatalogBuilder::default(); schema.fields().iter().for_each(|field| { builder.add_column(field); diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index aad20307bda6a..7cd9bb533a807 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -82,8 +82,7 @@ impl StreamEowcOverWindow { // The EOWC over window state table has the same schema as the input. let in_fields = self.core.input.schema().fields(); - let mut tbl_builder = - TableCatalogBuilder::new(self.ctx().with_options().internal_table_subset()); + let mut tbl_builder = TableCatalogBuilder::default(); for field in in_fields { tbl_builder.add_column(field); } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 80c675009d134..3abc7ace0e494 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -205,7 +205,7 @@ impl StreamMaterialize { let value_indices = (0..columns.len()).collect_vec(); let distribution_key = input.distribution().dist_column_indices().to_vec(); - let properties = input.ctx().with_options().internal_table_subset(); // TODO: remove this + let _properties = input.ctx().with_options().internal_table_subset(); // TODO: remove this let append_only = input.append_only(); let watermark_columns = input.watermark_columns().clone(); @@ -233,7 +233,6 @@ impl StreamMaterialize { table_type, append_only, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, - properties, fragment_id: OBJECT_ID_PLACEHOLDER, dml_fragment_id: None, vnode_col_index: None, @@ -254,6 +253,8 @@ impl StreamMaterialize { incoming_sinks: vec![], initialized_at_cluster_version: None, created_at_cluster_version: None, + // TODO: https://github.com/risingwavelabs/risingwave/issues/14791 + retention_seconds: None, }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index b8208635e21e8..d27321c08d06b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -75,8 +75,7 @@ impl StreamNode for StreamNow { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody { let schema = self.base.schema(); let dist_keys = self.base.distribution().dist_column_indices().to_vec(); - let mut internal_table_catalog_builder = - TableCatalogBuilder::new(self.base.ctx().with_options().internal_table_subset()); + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); schema.fields().iter().for_each(|field| { internal_table_catalog_builder.add_column(field); }); diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index 15b55a2553f50..7173e3a46e0cd 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -51,8 +51,7 @@ impl StreamOverWindow { } fn infer_state_table(&self) -> TableCatalog { - let mut tbl_builder = - TableCatalogBuilder::new(self.ctx().with_options().internal_table_subset()); + let mut tbl_builder = TableCatalogBuilder::default(); let out_schema = self.core.schema(); for field in out_schema.fields() { diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 64154c23ea206..ffa25e3127a81 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -510,8 +510,7 @@ impl StreamSink { /// The table schema is: | epoch | seq id | row op | sink columns | /// Pk is: | epoch | seq id | fn infer_kv_log_store_table_catalog(&self) -> TableCatalog { - let mut table_catalog_builder = - TableCatalogBuilder::new(self.input.ctx().with_options().internal_table_subset()); + let mut table_catalog_builder = TableCatalogBuilder::default(); let mut value_indices = Vec::with_capacity( KV_LOG_STORE_PREDEFINED_COLUMNS.len() + self.sink_desc.columns.len(), diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index fa02489957348..6b45f8fd35a6a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -76,8 +76,7 @@ impl StreamEowcSort { // The sort state table has the same schema as the input. let in_fields = self.input.schema().fields(); - let mut tbl_builder = - TableCatalogBuilder::new(self.ctx().with_options().internal_table_subset()); + let mut tbl_builder = TableCatalogBuilder::default(); for field in in_fields { tbl_builder.add_column(field); } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 8e7ae089284cb..e9a624d7f3ceb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -155,8 +155,8 @@ impl StreamTableScan { &self, state: &mut BuildFragmentGraphState, ) -> TableCatalog { - let properties = self.ctx().with_options().internal_table_subset(); - let mut catalog_builder = TableCatalogBuilder::new(properties); + let _properties = self.ctx().with_options().internal_table_subset(); + let mut catalog_builder = TableCatalogBuilder::default(); let upstream_schema = &self.core.get_table_columns(); // We use vnode as primary key in state table. diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index d090182ba47a9..ed7e367f93715 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{Field, FieldDisplay}; use risingwave_common::types::DataType; @@ -28,7 +26,7 @@ use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{ExprDisplay, ExprImpl}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::{TableCatalog, WithOptions}; +use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamWatermarkFilter { @@ -114,7 +112,7 @@ impl PlanTreeNodeUnary for StreamWatermarkFilter { impl_plan_tree_node_for_unary! {StreamWatermarkFilter} pub fn infer_internal_table_catalog(watermark_type: DataType) -> TableCatalog { - let mut builder = TableCatalogBuilder::new(WithOptions::new(HashMap::default())); + let mut builder = TableCatalogBuilder::default(); let key = Field { data_type: DataType::Int16, diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 4de12b9a82ce6..39d9ff5e7018d 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -27,14 +27,13 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use crate::catalog::table_catalog::{CreateType, TableType}; use crate::catalog::{ColumnId, TableCatalog, TableId}; use crate::optimizer::property::{Cardinality, Order, RequiredDist}; -use crate::utils::{Condition, IndexSet, WithOptions}; +use crate::utils::{Condition, IndexSet}; #[derive(Default)] pub struct TableCatalogBuilder { /// All columns in this table columns: Vec, pk: Vec, - properties: WithOptions, value_indices: Option>, vnode_col_idx: Option, column_names: HashMap, @@ -46,14 +45,6 @@ pub struct TableCatalogBuilder { /// For DRY, mainly used for construct internal table catalog in stateful streaming executors. /// Be careful of the order of add column. impl TableCatalogBuilder { - // TODO: Add more fields if internal table is more configurable. - pub fn new(properties: WithOptions) -> Self { - Self { - properties, - ..Default::default() - } - } - /// Add a column from Field info, return the column index of the table pub fn add_column(&mut self, field: &Field) -> usize { let column_idx = self.columns.len(); @@ -159,7 +150,6 @@ impl TableCatalogBuilder { table_type: TableType::Internal, append_only: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, - properties: self.properties, fragment_id: OBJECT_ID_PLACEHOLDER, dml_fragment_id: None, vnode_col_index: self.vnode_col_idx, @@ -184,6 +174,7 @@ impl TableCatalogBuilder { incoming_sinks: vec![], initialized_at_cluster_version: None, created_at_cluster_version: None, + retention_seconds: None, } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 4dac2a11ebffd..6295d8036b566 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -463,11 +463,9 @@ pub(crate) mod tests { use std::sync::{Arc, RwLock}; use fixedbitset::FixedBitSet; - use risingwave_common::catalog::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; @@ -494,7 +492,7 @@ pub(crate) mod tests { use crate::session::SessionImpl; use crate::test_utils::MockFrontendMetaClient; use crate::utils::Condition; - use crate::{TableCatalog, WithOptions}; + use crate::TableCatalog; #[tokio::test] async fn test_query_should_not_hang_with_empty_worker() { @@ -565,14 +563,7 @@ pub(crate) mod tests { distribution_key: vec![], append_only: false, owner: DEFAULT_SUPER_USER_ID, - properties: WithOptions::new( - [( - PROPERTIES_RETENTION_SECOND_KEY.into(), - TABLE_OPTION_DUMMY_RETENTION_SECOND.to_string(), - )] - .into_iter() - .collect(), - ), + retention_seconds: None, fragment_id: 0, // FIXME dml_fragment_id: None, // FIXME vnode_col_index: None, diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 2ed0876a2dc9f..184596a37a3f8 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -32,9 +32,8 @@ use crate::handler::create_source::UPSTREAM_SOURCE_KEY; use crate::session::SessionImpl; mod options { - use risingwave_common::catalog::hummock::PROPERTIES_RETENTION_SECOND_KEY; - pub const RETENTION_SECONDS: &str = PROPERTIES_RETENTION_SECOND_KEY; + pub const RETENTION_SECONDS: &str = "retention_seconds"; } /// Options or properties extracted from the `WITH` clause of DDLs. diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 63250bb46c7e7..cf4bd9d375598 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -536,7 +536,6 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::DistributionKey).json().not_null()) .col(ColumnDef::new(Table::StreamKey).json().not_null()) .col(ColumnDef::new(Table::AppendOnly).boolean().not_null()) - .col(ColumnDef::new(Table::Properties).json().not_null()) .col(ColumnDef::new(Table::FragmentId).integer()) .col(ColumnDef::new(Table::VnodeColIndex).integer()) .col(ColumnDef::new(Table::RowIdIndex).integer()) @@ -563,6 +562,7 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Table::Description).string()) .col(ColumnDef::new(Table::Version).json()) + .col(ColumnDef::new(Table::RetentionSeconds).integer()) .foreign_key( &mut ForeignKey::create() .name("FK_table_object_id") @@ -1011,7 +1011,6 @@ enum Table { DistributionKey, StreamKey, AppendOnly, - Properties, FragmentId, VnodeColIndex, RowIdIndex, @@ -1026,6 +1025,7 @@ enum Table { CleanedByWatermark, Description, Version, + RetentionSeconds, } #[derive(DeriveIden)] diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index f0d83cb2d847d..963b683f4ef57 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -20,8 +20,8 @@ use sea_orm::ActiveValue::Set; use sea_orm::NotSet; use crate::{ - Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, Property, - SourceId, TableId, TableVersion, + Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId, + TableId, TableVersion, }; #[derive(Clone, Debug, PartialEq, Copy, Eq, EnumIter, DeriveActiveEnum)] @@ -108,7 +108,6 @@ pub struct Model { pub distribution_key: I32Array, pub stream_key: I32Array, pub append_only: bool, - pub properties: Property, pub fragment_id: Option, pub vnode_col_index: Option, pub row_id_index: Option, @@ -123,6 +122,7 @@ pub struct Model { pub cleaned_by_watermark: bool, pub description: Option, pub version: Option, + pub retention_seconds: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -217,7 +217,6 @@ impl From for ActiveModel { distribution_key: Set(pb_table.distribution_key.into()), stream_key: Set(pb_table.stream_key.into()), append_only: Set(pb_table.append_only), - properties: Set(pb_table.properties.into()), fragment_id, vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)), row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)), @@ -232,6 +231,7 @@ impl From for ActiveModel { cleaned_by_watermark: Set(pb_table.cleaned_by_watermark), description: Set(pb_table.description), version: Set(pb_table.version.map(|v| v.into())), + retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), } } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index f76f38e84dde7..2ca8e9b98be87 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -27,8 +27,8 @@ use risingwave_meta_model_v2::{ connection, database, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId, - PrivateLinkService, Property, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, - TableId, UserId, + PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, + UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -2121,16 +2121,16 @@ impl CatalogController { pub async fn get_all_table_options(&self) -> MetaResult> { let inner = self.inner.read().await; - let table_options: Vec<(TableId, Property)> = Table::find() + let table_options: Vec<(TableId, Option)> = Table::find() .select_only() - .columns([table::Column::TableId, table::Column::Properties]) - .into_tuple::<(TableId, Property)>() + .columns([table::Column::TableId, table::Column::RetentionSeconds]) + .into_tuple::<(TableId, Option)>() .all(&inner.db) .await?; Ok(table_options .into_iter() - .map(|(id, property)| (id, TableOption::build_table_option(&property.into_inner()))) + .map(|(id, retention_seconds)| (id, TableOption { retention_seconds })) .collect()) } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 562dd1845376b..4873a42809b0b 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -106,7 +106,6 @@ impl From> for PbTable { stream_key: value.0.stream_key.0, append_only: value.0.append_only, owner: value.1.owner_id as _, - properties: value.0.properties.0, fragment_id: value.0.fragment_id.unwrap_or_default() as u32, vnode_col_index: value.0.vnode_col_index.map(|index| index as _), row_id_index: value.0.row_id_index.map(|index| index as _), @@ -139,6 +138,7 @@ impl From> for PbTable { incoming_sinks: vec![], initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, + retention_seconds: value.0.retention_seconds.map(|id| id as u32), } } } diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index 2e20b129b1c39..307d8f2b0174e 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -15,7 +15,6 @@ use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableOption; -use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::key_range::KeyRangeCommon; @@ -86,7 +85,7 @@ impl TtlReclaimCompactionPicker { for table_id in table_id_in_sst { match self.table_id_to_ttl.get(&table_id) { Some(ttl_second_u32) => { - assert!(*ttl_second_u32 != TABLE_OPTION_DUMMY_RETENTION_SECOND); + assert!(*ttl_second_u32 > 0); // default to zero. let ttl_mill = *ttl_second_u32 as u64 * 1000; let min_epoch = expire_epoch.subtract_ms(ttl_mill); diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 8db45a2286478..a2cfb72526da6 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -56,6 +56,7 @@ use crate::model::{ BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, TableFragments, ValTransaction, }; use crate::storage::MetaStore; +use crate::stream::CreateStreamingJobOption; impl HummockManager { pub(super) async fn build_compaction_group_manager( @@ -106,12 +107,8 @@ impl HummockManager { &self, mv_table: Option, mut internal_tables: Vec, - table_properties: &HashMap, + create_stream_job_option: CreateStreamingJobOption, ) -> Result> { - let is_independent_compaction_group = table_properties - .get("independent_compaction_group") - .map(|s| s == "1") - == Some(true); let mut pairs = vec![]; if let Some(mv_table) = mv_table { if internal_tables.extract_if(|t| *t == mv_table).count() > 0 { @@ -120,7 +117,7 @@ impl HummockManager { // materialized_view pairs.push(( mv_table, - if is_independent_compaction_group { + if create_stream_job_option.new_independent_compaction_group { CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup) } else { CompactionGroupId::from(StaticCompactionGroupId::MaterializedView) @@ -131,7 +128,7 @@ impl HummockManager { for table_id in internal_tables { pairs.push(( table_id, - if is_independent_compaction_group { + if create_stream_job_option.new_independent_compaction_group { CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup) } else { CompactionGroupId::from(StaticCompactionGroupId::StateDefault) @@ -994,17 +991,17 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap}; + use std::collections::BTreeMap; use itertools::Itertools; use risingwave_common::catalog::TableId; - use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::meta::table_fragments::Fragment; use crate::hummock::test_utils::setup_compute_env; use crate::hummock::HummockManager; use crate::model::TableFragments; + use crate::stream::CreateStreamingJobOption; #[tokio::test] async fn test_inner() { @@ -1101,16 +1098,14 @@ mod tests { let group_number = || async { compaction_group_manager.list_compaction_group().await.len() }; assert_eq!(registered_number().await, 0); - let mut table_properties = HashMap::from([( - String::from(PROPERTIES_RETENTION_SECOND_KEY), - String::from("300"), - )]); compaction_group_manager .register_table_fragments( Some(table_fragment_1.table_id().table_id), table_fragment_1.internal_table_ids(), - &table_properties, + CreateStreamingJobOption { + new_independent_compaction_group: false, + }, ) .await .unwrap(); @@ -1119,7 +1114,9 @@ mod tests { .register_table_fragments( Some(table_fragment_2.table_id().table_id), table_fragment_2.internal_table_ids(), - &table_properties, + CreateStreamingJobOption { + new_independent_compaction_group: false, + }, ) .await .unwrap(); @@ -1141,15 +1138,14 @@ mod tests { // Test `StaticCompactionGroupId::NewCompactionGroup` in `register_table_fragments` assert_eq!(group_number().await, 2); - table_properties.insert( - String::from("independent_compaction_group"), - String::from("1"), - ); + compaction_group_manager .register_table_fragments( Some(table_fragment_1.table_id().table_id), table_fragment_1.internal_table_ids(), - &table_properties, + CreateStreamingJobOption { + new_independent_compaction_group: true, + }, ) .await .unwrap(); diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index d060a610b6fd5..cd2f0bbb13b94 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -301,7 +301,7 @@ impl DatabaseManager { pub fn get_all_table_options(&self) -> HashMap { self.tables .iter() - .map(|(id, table)| (*id, TableOption::build_table_option(&table.properties))) + .map(|(id, table)| (*id, TableOption::new(table.retention_seconds))) .collect() } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 54dfdb9a22abb..0fcdb2008a103 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use risingwave_common::catalog::TableVersionId; use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; @@ -245,16 +243,6 @@ impl StreamingJob { } } - pub fn properties(&self) -> HashMap { - match self { - Self::MaterializedView(table) => table.properties.clone(), - Self::Sink(sink, _) => sink.properties.clone(), - Self::Table(_, table, ..) => table.properties.clone(), - Self::Index(_, index_table) => index_table.properties.clone(), - Self::Source(source) => source.with_properties.clone(), - } - } - /// Returns the [`TableVersionId`] if this job is `Table`. pub fn table_version_id(&self) -> Option { if let Self::Table(_, table, ..) = self { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 52459bce9ddc2..84ff4ea1de682 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -74,8 +74,8 @@ use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, - CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, - StreamFragmentGraph, + CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef, + ReplaceTableContext, SourceManagerRef, StreamFragmentGraph, }; use crate::{MetaError, MetaResult}; @@ -1414,12 +1414,15 @@ impl DdlController { internal_tables, building_locations, existing_locations, - table_properties: stream_job.properties(), definition: stream_job.definition(), mv_table_id: stream_job.mv_table(), create_type: stream_job.create_type(), ddl_type: stream_job.into(), replace_table_job_info, + // TODO: https://github.com/risingwavelabs/risingwave/issues/14793 + option: CreateStreamingJobOption { + new_independent_compaction_group: false, + }, }; // 4. Mark tables as creating, including internal tables and the table of the stream job. @@ -1823,7 +1826,6 @@ impl DdlController { dispatchers, building_locations, existing_locations, - table_properties: stream_job.properties(), }; Ok((ctx, table_fragments)) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index b098800168fd3..5889292756ca7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -37,6 +37,11 @@ use crate::{MetaError, MetaResult}; pub type GlobalStreamManagerRef = Arc; +#[derive(Default)] +pub struct CreateStreamingJobOption { + pub new_independent_compaction_group: bool, +} + /// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job. /// /// Note: for better readability, keep this struct complete and immutable once created. @@ -57,10 +62,6 @@ pub struct CreateStreamingJobContext { /// The locations of the existing actors, essentially the upstream mview actors to update. pub existing_locations: Locations, - /// The properties of the streaming job. - // TODO: directly store `StreamingJob` here. - pub table_properties: HashMap, - /// DDL definition. pub definition: String, @@ -72,6 +73,8 @@ pub struct CreateStreamingJobContext { /// Context provided for potential replace table, typically used when sinking into a table. pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>, + + pub option: CreateStreamingJobOption, } impl CreateStreamingJobContext { @@ -170,10 +173,6 @@ pub struct ReplaceTableContext { /// The locations of the existing actors, essentially the downstream chain actors to update. pub existing_locations: Locations, - - /// The properties of the streaming job. - // TODO: directly store `StreamingJob here. - pub table_properties: HashMap, } /// `GlobalStreamManager` manages all the streams in the system. @@ -406,7 +405,6 @@ impl GlobalStreamManager { CreateStreamingJobContext { dispatchers, upstream_mview_actors, - table_properties, building_locations, existing_locations, definition, @@ -415,6 +413,7 @@ impl GlobalStreamManager { create_type, ddl_type, replace_table_job_info, + option, }: CreateStreamingJobContext, ) -> MetaResult<()> { let mut replace_table_command = None; @@ -426,7 +425,7 @@ impl GlobalStreamManager { .register_table_fragments( mv_table_id, internal_tables.keys().copied().collect(), - &table_properties, + option, ) .await?; debug_assert_eq!( @@ -518,7 +517,6 @@ impl GlobalStreamManager { dispatchers, building_locations, existing_locations, - table_properties: _, }: ReplaceTableContext, ) -> MetaResult<()> { self.build_actors(&table_fragments, &building_locations, &existing_locations) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 9e542234a30d3..a634bcafc6ff1 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -290,7 +290,7 @@ pub(crate) mod tests { compact_task.table_options = BTreeMap::from([( 0, TableOption { - retention_seconds: 64, + retention_seconds: Some(64), }, )]); compact_task.current_epoch_time = 0; @@ -961,7 +961,7 @@ pub(crate) mod tests { compact_task.table_options = BTreeMap::from_iter([( existing_table_id, TableOption { - retention_seconds: retention_seconds_expire_second, + retention_seconds: Some(retention_seconds_expire_second), }, )]); compact_task.current_epoch_time = epoch; diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index 019139eedeedc..b9c03b36d6126 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -433,14 +433,13 @@ pub type FilterKeyExtractorManagerRef = Arc; #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; + use std::collections::HashSet; use std::mem; use std::sync::Arc; use bytes::{BufMut, BytesMut}; use itertools::Itertools; use risingwave_common::catalog::ColumnDesc; - use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::types::DataType; @@ -531,10 +530,7 @@ mod tests { optional_associated_source_id: None, append_only: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, - properties: HashMap::from([( - String::from(PROPERTIES_RETENTION_SECOND_KEY), - String::from("300"), - )]), + retention_seconds: Some(300), fragment_id: 0, dml_fragment_id: None, initialized_at_epoch: None, diff --git a/src/storage/src/hummock/compactor/compaction_filter.rs b/src/storage/src/hummock/compactor/compaction_filter.rs index bea939374fac2..c804f2428e157 100644 --- a/src/storage/src/hummock/compactor/compaction_filter.rs +++ b/src/storage/src/hummock/compactor/compaction_filter.rs @@ -15,7 +15,6 @@ use std::collections::{HashMap, HashSet}; use dyn_clone::DynClone; -use risingwave_common::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; use risingwave_hummock_sdk::key::FullKey; pub trait CompactionFilter: Send + Sync + DynClone { @@ -80,7 +79,7 @@ impl CompactionFilter for TtlCompactionFilter { } match self.table_id_to_ttl.get(&table_id) { Some(ttl_second_u32) => { - assert!(*ttl_second_u32 != TABLE_OPTION_DUMMY_RETENTION_SECOND); + assert!(*ttl_second_u32 > 0); // default to zero. let ttl_mill = *ttl_second_u32 as u64 * 1000; let min_epoch = Epoch(self.expire_epoch).subtract_ms(ttl_mill); diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 15a3c5c97c4f9..758a6d1253451 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -134,7 +134,6 @@ pub struct TaskConfig { } pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter { - use risingwave_common::catalog::TableOption; let mut multi_filter = MultiCompactionFilter::default(); let compaction_filter_flag = CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default(); @@ -150,11 +149,11 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact let id_to_ttl = compact_task .table_options .iter() - .filter(|id_to_option| { - let table_option: TableOption = id_to_option.1.into(); - table_option.retention_seconds.is_some() + .filter_map(|(id, option)| { + option + .retention_seconds + .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None }) }) - .map(|id_to_option| (*id_to_option.0, id_to_option.1.retention_seconds)) .collect(); let ttl_filter = Box::new(TtlCompactionFilter::new( @@ -321,7 +320,7 @@ pub async fn check_compaction_result( let has_ttl = compact_task .table_options .iter() - .any(|(_, table_option)| table_option.retention_seconds > 0); + .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); let mut compact_table_ids = compact_task .input_ssts diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 158881f5f06b3..ade78acc7ff3b 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -357,7 +357,7 @@ pub async fn compact( let has_ttl = compact_task .table_options .iter() - .any(|(_, table_option)| table_option.retention_seconds > 0); + .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); let mut task_status = TaskStatus::Success; // skip sst related to non-existent able_id to reduce io let sstable_infos = compact_task diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 0c55bf79ffa4b..4caa43725172f 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -146,11 +146,7 @@ impl StorageTableInner { .collect_vec(); let table_option = TableOption { - retention_seconds: if table_desc.retention_seconds > 0 { - Some(table_desc.retention_seconds) - } else { - None - }, + retention_seconds: table_desc.retention_seconds, }; let value_indices = table_desc .get_value_indices() diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 08312c0c882dc..81ab622d3e61e 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -357,7 +357,7 @@ where OpConsistencyLevel::Inconsistent }; - let table_option = TableOption::build_table_option(table_catalog.get_properties()); + let table_option = TableOption::new(table_catalog.retention_seconds); let new_local_options = if IS_REPLICATED { NewLocalOptions::new_replicated(table_id, op_consistency_level, table_option) } else { diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index e3f7b80e6880e..7bfb2bd3ec487 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -29,7 +29,6 @@ use std::sync::Arc; use futures::{pin_mut, StreamExt}; use risingwave_common::buffer::Bitmap; -use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl}; @@ -53,11 +52,6 @@ pub struct SourceStateTableHandler { impl SourceStateTableHandler { pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self { - // The state of source should not be cleaned up by retention_seconds - assert!(!table_catalog - .properties - .contains_key(&String::from(PROPERTIES_RETENTION_SECOND_KEY))); - Self { state_store: StateTable::from_table_catalog(table_catalog, store, None).await, } @@ -68,11 +62,6 @@ impl SourceStateTableHandler { store: S, vnodes: Option>, ) -> Self { - // The state of source should not be cleaned up by retention_seconds - assert!(!table_catalog - .properties - .contains_key(&String::from(PROPERTIES_RETENTION_SECOND_KEY))); - Self { state_store: StateTable::from_table_catalog(table_catalog, store, vnodes).await, } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 21cc49c80cd6e..2f501ff98591e 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::future::Future; use std::ops::{Bound, RangeBounds}; use std::pin::{pin, Pin}; @@ -25,7 +24,6 @@ use futures::StreamExt; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::catalog::TableId; use risingwave_common::config::{ extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, RwConfig, @@ -135,10 +133,7 @@ async fn compaction_test( distribution_key: vec![], stream_key: vec![], owner: 0, - properties: HashMap::::from([( - PROPERTIES_RETENTION_SECOND_KEY.to_string(), - 0.to_string(), - )]), + retention_seconds: None, fragment_id: 0, dml_fragment_id: None, initialized_at_epoch: None,