Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow retention_seconds on append only table and its downstream index #15268

Merged
merged 12 commits into from
Feb 28, 2024
26 changes: 26 additions & 0 deletions e2e_test/ddl/table/table.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,29 @@ drop table t1;

statement ok
drop table t2;

statement error
create table t(v int) with (retention_seconds = 5);

statement ok
create table t(v int) APPEND ONLY with (retention_seconds = 5);

statement ok
insert into t values(1);

statement ok
flush;

query I
select * from t;
----
1

sleep 10s

query I
select * from t;
----

statement ok
drop table t;
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::rc::Rc;
use std::sync::Arc;

Expand Down Expand Up @@ -323,6 +324,7 @@ fn assemble_materialize(
// LogicalScan(table_desc)

let definition = context.normalized_sql().to_owned();
let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);

let logical_scan = LogicalScan::create(
table_name,
Expand Down Expand Up @@ -395,7 +397,7 @@ fn assemble_materialize(
project_required_cols,
out_names,
)
.gen_index_plan(index_name, definition)
.gen_index_plan(index_name, definition, retention_seconds)
}

pub async fn handle_create_index(
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ fn gen_table_plan_inner(
let mut with_properties = WithOptions::new(with_properties);
let connection_id =
resolve_privatelink_in_with_option(&mut with_properties, &schema_name, &session)?;
let retention_seconds = with_properties.retention_seconds();

let is_external_source = source_info.is_some();

Expand Down Expand Up @@ -724,6 +725,14 @@ fn gen_table_plan_inner(
.into());
}

if !append_only && retention_seconds.is_some() {
return Err(ErrorCode::NotSupported(
"Defining retention seconds on table requires the table to be append only.".to_owned(),
"Use the key words `APPEND ONLY`".to_owned(),
)
.into());
}

let materialize = plan_root.gen_table_plan(
context,
name,
Expand All @@ -735,6 +744,7 @@ fn gen_table_plan_inner(
watermark_descs,
version,
is_external_source,
retention_seconds,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down Expand Up @@ -856,6 +866,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
vec![],
Some(col_id_gen.into_version()),
true,
None,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::assert_matches::assert_matches;
st1page marked this conversation as resolved.
Show resolved Hide resolved
use std::collections::HashMap;
use std::num::NonZeroU32;
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -524,6 +525,7 @@ impl PlanRoot {
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>,
with_external_source: bool,
retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
let stream_plan = self.gen_optimized_stream_plan(false)?;

Expand Down Expand Up @@ -725,6 +727,7 @@ impl PlanRoot {
pk_column_indices,
row_id_index,
version,
retention_seconds,
)
}

Expand All @@ -748,6 +751,7 @@ impl PlanRoot {
definition,
TableType::MaterializedView,
cardinality,
None,
)
}

Expand All @@ -756,6 +760,7 @@ impl PlanRoot {
&mut self,
index_name: String,
definition: String,
retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
let cardinality = self.compute_cardinality();
let stream_plan = self.gen_optimized_stream_plan(false)?;
Expand All @@ -770,6 +775,7 @@ impl PlanRoot {
definition,
TableType::Index,
cardinality,
retention_seconds,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ impl StreamCdcTableScan {
&self,
state: &mut BuildFragmentGraphState,
) -> TableCatalog {
let _properties = self.ctx().with_options().internal_table_subset();
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();

Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl StreamMaterialize {
definition: String,
table_type: TableType,
cardinality: Cardinality,
retention_seconds: Option<NonZeroU32>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
// the hidden column name might refer some expr id
Expand All @@ -94,6 +96,7 @@ impl StreamMaterialize {
table_type,
None,
cardinality,
retention_seconds,
)?;

Ok(Self::new(input, table))
Expand All @@ -116,6 +119,7 @@ impl StreamMaterialize {
pk_column_indices: Vec<usize>,
row_id_index: Option<usize>,
version: Option<TableVersion>,
retention_seconds: Option<NonZeroU32>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

Expand All @@ -131,6 +135,7 @@ impl StreamMaterialize {
TableType::Table,
version,
Cardinality::unknown(), // unknown cardinality for tables
retention_seconds,
)?;

Ok(Self::new(input, table))
Expand Down Expand Up @@ -200,12 +205,12 @@ impl StreamMaterialize {
table_type: TableType,
version: Option<TableVersion>,
cardinality: Cardinality,
retention_seconds: Option<NonZeroU32>,
) -> Result<TableCatalog> {
let input = rewritten_input;

let value_indices = (0..columns.len()).collect_vec();
let distribution_key = input.distribution().dist_column_indices().to_vec();
let _properties = input.ctx().with_options().internal_table_subset(); // TODO: remove this
let append_only = input.append_only();
let watermark_columns = input.watermark_columns().clone();

Expand Down Expand Up @@ -254,8 +259,7 @@ impl StreamMaterialize {
incoming_sinks: vec![],
initialized_at_cluster_version: None,
created_at_cluster_version: None,
// TODO: https://github.com/risingwavelabs/risingwave/issues/14791
retention_seconds: None,
retention_seconds: retention_seconds.map(|i| i.into()),
})
}

Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ impl StreamTableScan {
&self,
state: &mut BuildFragmentGraphState,
) -> TableCatalog {
let _properties = self.ctx().with_options().internal_table_subset();
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();

Expand Down
10 changes: 0 additions & 10 deletions src/frontend/src/utils/overwrite_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ use crate::handler::HandlerArgs;
#[derive(Debug, Clone, Default)]
pub struct OverwriteOptions {
pub streaming_rate_limit: Option<u32>,
// ttl has been deprecated
pub ttl: Option<u32>,
}

impl OverwriteOptions {
const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit";
const TTL_KEY: &'static str = "ttl";

pub fn new(args: &mut HandlerArgs) -> Self {
let streaming_rate_limit = {
Expand All @@ -41,15 +38,8 @@ impl OverwriteOptions {
.map(|limit| limit.get() as u32)
}
};
let ttl = args
.with_options
.inner_mut()
.remove(Self::TTL_KEY)
// FIXME(tabVersion): validate the value
.map(|x| x.parse::<u32>().unwrap());
Self {
streaming_rate_limit,
ttl,
}
}
}
7 changes: 0 additions & 7 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ impl WithOptions {
Self { inner }
}

/// Get the subset of the options for internal table catalogs.
///
/// Currently only `retention_seconds` is included.
pub fn internal_table_subset(&self) -> Self {
self.subset([options::RETENTION_SECONDS])
}

pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
if let Some(inner_val) = self.inner.get(key) {
if inner_val.eq_ignore_ascii_case(val) {
Expand Down
Loading