Skip to content

Commit

Permalink
feat: allow retention_seconds on append only table and its downstream…
Browse files Browse the repository at this point in the history
… index (#15268)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
st1page and github-actions[bot] authored Feb 28, 2024
1 parent 4b5e4b3 commit 15c403c
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 25 deletions.
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ cluster_start
sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/basic.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt'
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

Expand Down
2 changes: 1 addition & 1 deletion e2e_test/ddl/table/table.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ statement ok
drop table t1;

statement ok
drop table t2;
drop table t2;
38 changes: 38 additions & 0 deletions e2e_test/ttl/ttl.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
statement error
create table t(v int) with (retention_seconds = 5);

statement ok
create table t(v int) APPEND ONLY with (retention_seconds = 5);

statement ok
create index i on t(v);

statement ok
insert into t values(1);

statement ok
flush;

query I
select * from t;
----
1

query I
select * from i;
----
1

statement ok
select pg_sleep(10);

query I
select * from t;
----

query I
select * from i;
----

statement ok
drop table t;
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::rc::Rc;
use std::sync::Arc;

Expand Down Expand Up @@ -323,6 +324,7 @@ fn assemble_materialize(
// LogicalScan(table_desc)

let definition = context.normalized_sql().to_owned();
let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);

let logical_scan = LogicalScan::create(
table_name,
Expand Down Expand Up @@ -395,7 +397,7 @@ fn assemble_materialize(
project_required_cols,
out_names,
)
.gen_index_plan(index_name, definition)
.gen_index_plan(index_name, definition, retention_seconds)
}

pub async fn handle_create_index(
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ pub mod tests {
let frontend = LocalFrontend::new(Default::default()).await;
frontend.run_sql(sql).await.unwrap();

let sql = "create materialized view mv1 with (ttl = 300) as select t1.country from t1";
let sql = "create materialized view mv1 as select t1.country from t1";
frontend.run_sql(sql).await.unwrap();

let session = frontend.session_ref();
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ fn gen_table_plan_inner(
let mut with_properties = WithOptions::new(with_properties);
let connection_id =
resolve_privatelink_in_with_option(&mut with_properties, &schema_name, &session)?;
let retention_seconds = with_properties.retention_seconds();

let is_external_source = source_info.is_some();

Expand Down Expand Up @@ -724,6 +725,14 @@ fn gen_table_plan_inner(
.into());
}

if !append_only && retention_seconds.is_some() {
return Err(ErrorCode::NotSupported(
"Defining retention seconds on table requires the table to be append only.".to_owned(),
"Use the key words `APPEND ONLY`".to_owned(),
)
.into());
}

let materialize = plan_root.gen_table_plan(
context,
name,
Expand All @@ -735,6 +744,7 @@ fn gen_table_plan_inner(
watermark_descs,
version,
is_external_source,
retention_seconds,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down Expand Up @@ -856,6 +866,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
vec![],
Some(col_id_gen.into_version()),
true,
None,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down
20 changes: 20 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::num::NonZeroU32;
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -524,6 +539,7 @@ impl PlanRoot {
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>,
with_external_source: bool,
retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
let stream_plan = self.gen_optimized_stream_plan(false)?;

Expand Down Expand Up @@ -725,6 +741,7 @@ impl PlanRoot {
pk_column_indices,
row_id_index,
version,
retention_seconds,
)
}

Expand All @@ -748,6 +765,7 @@ impl PlanRoot {
definition,
TableType::MaterializedView,
cardinality,
None,
)
}

Expand All @@ -756,6 +774,7 @@ impl PlanRoot {
&mut self,
index_name: String,
definition: String,
retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
let cardinality = self.compute_cardinality();
let stream_plan = self.gen_optimized_stream_plan(false)?;
Expand All @@ -770,6 +789,7 @@ impl PlanRoot {
definition,
TableType::Index,
cardinality,
retention_seconds,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ impl StreamCdcTableScan {
&self,
state: &mut BuildFragmentGraphState,
) -> TableCatalog {
let _properties = self.ctx().with_options().internal_table_subset();
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();

Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl StreamMaterialize {
definition: String,
table_type: TableType,
cardinality: Cardinality,
retention_seconds: Option<NonZeroU32>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
// the hidden column name might refer some expr id
Expand All @@ -94,6 +96,7 @@ impl StreamMaterialize {
table_type,
None,
cardinality,
retention_seconds,
)?;

Ok(Self::new(input, table))
Expand All @@ -116,6 +119,7 @@ impl StreamMaterialize {
pk_column_indices: Vec<usize>,
row_id_index: Option<usize>,
version: Option<TableVersion>,
retention_seconds: Option<NonZeroU32>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

Expand All @@ -131,6 +135,7 @@ impl StreamMaterialize {
TableType::Table,
version,
Cardinality::unknown(), // unknown cardinality for tables
retention_seconds,
)?;

Ok(Self::new(input, table))
Expand Down Expand Up @@ -200,12 +205,12 @@ impl StreamMaterialize {
table_type: TableType,
version: Option<TableVersion>,
cardinality: Cardinality,
retention_seconds: Option<NonZeroU32>,
) -> Result<TableCatalog> {
let input = rewritten_input;

let value_indices = (0..columns.len()).collect_vec();
let distribution_key = input.distribution().dist_column_indices().to_vec();
let _properties = input.ctx().with_options().internal_table_subset(); // TODO: remove this
let append_only = input.append_only();
let watermark_columns = input.watermark_columns().clone();

Expand Down Expand Up @@ -254,8 +259,7 @@ impl StreamMaterialize {
incoming_sinks: vec![],
initialized_at_cluster_version: None,
created_at_cluster_version: None,
// TODO: https://github.com/risingwavelabs/risingwave/issues/14791
retention_seconds: None,
retention_seconds: retention_seconds.map(|i| i.into()),
})
}

Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ impl StreamTableScan {
&self,
state: &mut BuildFragmentGraphState,
) -> TableCatalog {
let _properties = self.ctx().with_options().internal_table_subset();
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();

Expand Down
10 changes: 0 additions & 10 deletions src/frontend/src/utils/overwrite_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ use crate::handler::HandlerArgs;
#[derive(Debug, Clone, Default)]
pub struct OverwriteOptions {
pub streaming_rate_limit: Option<u32>,
// ttl has been deprecated
pub ttl: Option<u32>,
}

impl OverwriteOptions {
const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit";
const TTL_KEY: &'static str = "ttl";

pub fn new(args: &mut HandlerArgs) -> Self {
let streaming_rate_limit = {
Expand All @@ -41,15 +38,8 @@ impl OverwriteOptions {
.map(|limit| limit.get() as u32)
}
};
let ttl = args
.with_options
.inner_mut()
.remove(Self::TTL_KEY)
// FIXME(tabVersion): validate the value
.map(|x| x.parse::<u32>().unwrap());
Self {
streaming_rate_limit,
ttl,
}
}
}
7 changes: 0 additions & 7 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ impl WithOptions {
Self { inner }
}

/// Get the subset of the options for internal table catalogs.
///
/// Currently only `retention_seconds` is included.
pub fn internal_table_subset(&self) -> Self {
self.subset([options::RETENTION_SECONDS])
}

pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
if let Some(inner_val) = self.inner.get(key) {
if inner_val.eq_ignore_ascii_case(val) {
Expand Down

0 comments on commit 15c403c

Please sign in to comment.