Skip to content

Commit

Permalink
chore: remove table properties in proto (#14794)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Feb 5, 2024
1 parent c1a1286 commit 9e8db59
Show file tree
Hide file tree
Showing 46 changed files with 123 additions and 246 deletions.
6 changes: 5 additions & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ message Table {
repeated int32 stream_key = 13;
bool append_only = 14;
uint32 owner = 15;
map<string, string> properties = 16;
reserved 16;
reserved "properties"; // deprecated
uint32 fragment_id = 17;
// an optional column index which is the vnode of each row computed by the
// table's consistent hash distribution
Expand Down Expand Up @@ -315,6 +316,9 @@ message Table {
optional string initialized_at_cluster_version = 35;
optional string created_at_cluster_version = 36;

// TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
optional uint32 retention_seconds = 37;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
3 changes: 2 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ message KeyRange {
}

message TableOption {
uint32 retention_seconds = 1;
reserved 1;
optional uint32 retention_seconds = 2;
}

message CompactTask {
Expand Down
3 changes: 2 additions & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ message StorageTableDesc {
// TODO: may refactor primary key representations
repeated common.ColumnOrder pk = 3;
repeated uint32 dist_key_in_pk_indices = 4;
uint32 retention_seconds = 5;
reserved 5;
repeated uint32 value_indices = 6;
uint32 read_prefix_len_hint = 7;
// Whether the table is versioned. If `true`, column-aware row encoding will
// be used to be compatible with schema changes.
bool versioned = 8;
repeated uint32 stream_key = 9;
optional uint32 vnode_col_idx_in_pk = 10;
optional uint32 retention_seconds = 11;
}

// Represents a table in external database for CDC scenario
Expand Down
36 changes: 6 additions & 30 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod physical_table;
mod schema;
pub mod test_utils;

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -31,7 +30,6 @@ pub use physical_table::*;
use risingwave_pb::catalog::HandleConflictBehavior as PbHandleConflictBehavior;
use risingwave_pb::plan_common::ColumnDescVersion;
pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema};
use thiserror_ext::AsReport;

pub use crate::constants::hummock;
use crate::error::BoxedError;
Expand Down Expand Up @@ -273,46 +271,24 @@ pub struct TableOption {

impl From<&risingwave_pb::hummock::TableOption> for TableOption {
fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
let retention_seconds =
if table_option.retention_seconds == hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND {
None
} else {
Some(table_option.retention_seconds)
};

Self { retention_seconds }
Self {
retention_seconds: table_option.retention_seconds,
}
}
}

impl From<&TableOption> for risingwave_pb::hummock::TableOption {
fn from(table_option: &TableOption) -> Self {
Self {
retention_seconds: table_option
.retention_seconds
.unwrap_or(hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND),
retention_seconds: table_option.retention_seconds,
}
}
}

impl TableOption {
pub fn build_table_option(table_properties: &HashMap<String, String>) -> Self {
pub fn new(retention_seconds: Option<u32>) -> Self {
// now we only support ttl for TableOption
let mut result = TableOption::default();
if let Some(ttl_string) = table_properties.get(hummock::PROPERTIES_RETENTION_SECOND_KEY) {
match ttl_string.trim().parse::<u32>() {
Ok(retention_seconds_u32) => result.retention_seconds = Some(retention_seconds_u32),
Err(e) => {
tracing::info!(
error = %e.as_report(),
"build_table_option parse option ttl_string {}",
ttl_string,
);
result.retention_seconds = None;
}
};
}

result
TableOption { retention_seconds }
}
}

Expand Down
10 changes: 3 additions & 7 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
use crate::catalog::TableOption;
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand All @@ -49,7 +47,8 @@ pub struct TableDesc {
/// Whether the table source is append-only
pub append_only: bool,

pub retention_seconds: u32,
// TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
pub retention_seconds: Option<u32>,

pub value_indices: Vec<usize>,

Expand Down Expand Up @@ -143,7 +142,6 @@ impl TableDesc {
}

pub fn from_pb_table(table: &Table) -> Self {
let table_options = TableOption::build_table_option(&table.properties);
Self {
table_id: TableId::new(table.id),
pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
Expand All @@ -156,9 +154,7 @@ impl TableDesc {
stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
vnode_col_index: table.vnode_col_index.map(|i| i as _),
append_only: table.append_only,
retention_seconds: table_options
.retention_seconds
.unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
retention_seconds: table.retention_seconds,
value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
read_prefix_len_hint: table.read_prefix_len_hint as _,
watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
Expand Down
3 changes: 0 additions & 3 deletions src/common/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ pub mod hummock {
flag.bits()
}
}

pub const TABLE_OPTION_DUMMY_RETENTION_SECOND: u32 = 0;
pub const PROPERTIES_RETENTION_SECOND_KEY: &str = "retention_seconds";
}

pub mod log_store {
Expand Down
30 changes: 8 additions & 22 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, TableDesc, TableId, TableVersionId,
};
use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
Expand All @@ -32,7 +31,6 @@ use crate::error::{ErrorCode, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::user::UserId;
use crate::WithOptions;

/// Includes full information about a table.
///
Expand Down Expand Up @@ -102,8 +100,8 @@ pub struct TableCatalog {
/// Owner of the table.
pub owner: UserId,

/// Properties of the table. For example, `appendonly` or `retention_seconds`.
pub properties: WithOptions,
// TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
pub retention_seconds: Option<u32>,

/// The fragment id of the `Materialize` operator for this table.
pub fragment_id: FragmentId,
Expand Down Expand Up @@ -356,8 +354,7 @@ impl TableCatalog {
pub fn table_desc(&self) -> TableDesc {
use risingwave_common::catalog::TableOption;

let table_options =
TableOption::build_table_option(&self.properties.inner().clone().into_iter().collect());
let table_options = TableOption::new(self.retention_seconds);

TableDesc {
table_id: self.id,
Expand All @@ -366,9 +363,7 @@ impl TableCatalog {
columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
distribution_key: self.distribution_key.clone(),
append_only: self.append_only,
retention_seconds: table_options
.retention_seconds
.unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
retention_seconds: table_options.retention_seconds,
value_indices: self.value_indices.clone(),
read_prefix_len_hint: self.read_prefix_len_hint,
watermark_columns: self.watermark_columns.clone(),
Expand Down Expand Up @@ -430,7 +425,6 @@ impl TableCatalog {
.collect_vec(),
append_only: self.append_only,
owner: self.owner,
properties: self.properties.inner().clone().into_iter().collect(),
fragment_id: self.fragment_id,
dml_fragment_id: self.dml_fragment_id,
vnode_col_index: self.vnode_col_index.map(|i| i as _),
Expand All @@ -452,6 +446,7 @@ impl TableCatalog {
incoming_sinks: self.incoming_sinks.clone(),
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
retention_seconds: self.retention_seconds,
}
}

Expand Down Expand Up @@ -545,7 +540,6 @@ impl From<PbTable> for TableCatalog {
stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
append_only: tb.append_only,
owner: tb.owner,
properties: WithOptions::new(tb.properties),
fragment_id: tb.fragment_id,
dml_fragment_id: tb.dml_fragment_id,
vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
Expand All @@ -569,6 +563,7 @@ impl From<PbTable> for TableCatalog {
incoming_sinks: tb.incoming_sinks.clone(),
created_at_cluster_version: tb.created_at_cluster_version.clone(),
initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
retention_seconds: tb.retention_seconds,
}
}
}
Expand All @@ -587,12 +582,10 @@ impl OwnedByUserCatalog for TableCatalog {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use risingwave_common::catalog::{
row_id_column_desc, ColumnCatalog, ColumnDesc, ColumnId, TableId,
};
use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
use risingwave_common::test_prelude::*;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::OrderType;
Expand All @@ -603,7 +596,6 @@ mod tests {

use super::*;
use crate::catalog::table_catalog::{TableCatalog, TableType};
use crate::WithOptions;

#[test]
fn test_into_table_catalog() {
Expand Down Expand Up @@ -639,10 +631,7 @@ mod tests {
.into(),
append_only: false,
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
properties: HashMap::from([(
String::from(PROPERTIES_RETENTION_SECOND_KEY),
String::from("300"),
)]),
retention_seconds: Some(300),
fragment_id: 0,
dml_fragment_id: None,
initialized_at_epoch: None,
Expand Down Expand Up @@ -705,10 +694,7 @@ mod tests {
distribution_key: vec![],
append_only: false,
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
properties: WithOptions::new(HashMap::from([(
String::from(PROPERTIES_RETENTION_SECOND_KEY),
String::from("300")
)])),
retention_seconds: Some(300),
fragment_id: 0,
dml_fragment_id: None,
vnode_col_index: None,
Expand Down
9 changes: 1 addition & 8 deletions src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,8 @@ pub(crate) fn gen_create_index_plan(
let index_table = materialize.table();
let mut index_table_prost = index_table.to_prost(index_schema_id, index_database_id);
{
use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
let retention_second_string_key = PROPERTIES_RETENTION_SECOND_KEY.to_string();

// Inherit table properties
table.properties.get(&retention_second_string_key).map(|v| {
index_table_prost
.properties
.insert(retention_second_string_key, v.clone())
});
index_table_prost.retention_seconds = table.retention_seconds;
}

index_table_prost.owner = session.user_id();
Expand Down
11 changes: 5 additions & 6 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ pub fn gen_create_mv_plan(
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, PbTable)> {
if session.config().create_compaction_group_for_mv() {
context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
}

let db_name = session.database();
let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;

Expand Down Expand Up @@ -119,12 +123,7 @@ pub fn gen_create_mv_plan(
let materialize =
plan_root.gen_materialize_plan(table_name, definition, emit_on_window_close)?;
let mut table = materialize.table().to_prost(schema_id, database_id);
if session.config().create_compaction_group_for_mv() {
table.properties.insert(
String::from("independent_compaction_group"),
String::from("1"),
);
}

let plan: PlanRef = materialize.into();
let dependent_relations =
RelationCollectorVisitor::collect_with(dependent_relations, plan.clone());
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,12 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
/// - column mapping from upstream to table
fn create_table_builder(
&self,
ctx: OptimizerContextRef,
_ctx: OptimizerContextRef,
window_col_idx: Option<usize>,
) -> (TableCatalogBuilder, Vec<usize>, BTreeMap<usize, usize>) {
// NOTE: this function should be called to get a table builder, so that all state tables
// created for Agg node have the same group key columns and pk ordering.
let mut table_builder =
TableCatalogBuilder::new(ctx.with_options().internal_table_subset());
let mut table_builder = TableCatalogBuilder::default();

assert!(table_builder.columns().is_empty());
assert_eq!(table_builder.get_current_pk_len(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ pub fn infer_left_internal_table_catalog(
}
}

let mut internal_table_catalog_builder =
TableCatalogBuilder::new(me.ctx().with_options().internal_table_subset());
let mut internal_table_catalog_builder = TableCatalogBuilder::default();

schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
Expand All @@ -180,8 +179,7 @@ pub fn infer_right_internal_table_catalog(input: impl stream::StreamPlanRef) ->
Vec::<usize>::new()
);

let mut internal_table_catalog_builder =
TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset());
let mut internal_table_catalog_builder = TableCatalogBuilder::default();

schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ impl<I: stream::StreamPlanRef> Join<I> {
pk_indices.extend(deduped_input_pk_indices.clone());

// Build internal table
let mut internal_table_catalog_builder =
TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset());
let mut internal_table_catalog_builder = TableCatalogBuilder::default();
let internal_columns_fields = schema.fields().to_vec();

internal_columns_fields.iter().for_each(|field| {
Expand All @@ -126,8 +125,7 @@ impl<I: stream::StreamPlanRef> Join<I> {
});

// Build degree table.
let mut degree_table_catalog_builder =
TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset());
let mut degree_table_catalog_builder = TableCatalogBuilder::default();

let degree_column_field = Field::with_name(DataType::Int64, "_degree");

Expand Down
Loading

0 comments on commit 9e8db59

Please sign in to comment.