Skip to content

Commit

Permalink
add ttl for append only table
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Feb 26, 2024
1 parent b145dd0 commit d52fb7c
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::rc::Rc;
use std::sync::Arc;

Expand Down Expand Up @@ -323,6 +324,7 @@ fn assemble_materialize(
// LogicalScan(table_desc)

let definition = context.normalized_sql().to_owned();
let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);

let logical_scan = LogicalScan::create(
table_name,
Expand Down Expand Up @@ -395,7 +397,7 @@ fn assemble_materialize(
project_required_cols,
out_names,
)
.gen_index_plan(index_name, definition)
.gen_index_plan(index_name, definition, retention_seconds)
}

pub async fn handle_create_index(
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ fn gen_table_plan_inner(
let mut with_properties = WithOptions::new(with_properties);
let connection_id =
resolve_privatelink_in_with_option(&mut with_properties, &schema_name, &session)?;
let retention_seconds = with_properties.retention_seconds();

let is_external_source = source_info.is_some();

Expand Down Expand Up @@ -724,6 +725,14 @@ fn gen_table_plan_inner(
.into());
}

if !append_only && retention_seconds.is_some() {
return Err(ErrorCode::NotSupported(
"Defining retention seconds on table requires the table to be append only.".to_owned(),
"Use the key words `APPEND ONLY`".to_owned(),
)
.into());
}

let materialize = plan_root.gen_table_plan(
context,
name,
Expand All @@ -735,6 +744,7 @@ fn gen_table_plan_inner(
watermark_descs,
version,
is_external_source,
retention_seconds,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down Expand Up @@ -856,6 +866,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
vec![],
Some(col_id_gen.into_version()),
true,
None,
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::num::NonZeroU32;
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -524,6 +525,7 @@ impl PlanRoot {
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>,
with_external_source: bool,
retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
let stream_plan = self.gen_optimized_stream_plan(false)?;

Expand Down Expand Up @@ -725,6 +727,7 @@ impl PlanRoot {
pk_column_indices,
row_id_index,
version,
retention_seconds,
)
}

Expand All @@ -748,6 +751,7 @@ impl PlanRoot {
definition,
TableType::MaterializedView,
cardinality,
None,
)
}

Expand All @@ -756,6 +760,7 @@ impl PlanRoot {
&mut self,
index_name: String,
definition: String,
retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
let cardinality = self.compute_cardinality();
let stream_plan = self.gen_optimized_stream_plan(false)?;
Expand All @@ -770,6 +775,7 @@ impl PlanRoot {
definition,
TableType::Index,
cardinality,
retention_seconds,
)
}

Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::assert_matches::assert_matches;
use std::num::NonZeroU32;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl StreamMaterialize {
definition: String,
table_type: TableType,
cardinality: Cardinality,
retention_seconds: Option<NonZeroU32>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, table_type)?;
// the hidden column name might refer some expr id
Expand All @@ -94,6 +96,7 @@ impl StreamMaterialize {
table_type,
None,
cardinality,
retention_seconds,
)?;

Ok(Self::new(input, table))
Expand All @@ -116,6 +119,7 @@ impl StreamMaterialize {
pk_column_indices: Vec<usize>,
row_id_index: Option<usize>,
version: Option<TableVersion>,
retention_seconds: Option<NonZeroU32>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

Expand All @@ -131,6 +135,7 @@ impl StreamMaterialize {
TableType::Table,
version,
Cardinality::unknown(), // unknown cardinality for tables
retention_seconds,
)?;

Ok(Self::new(input, table))
Expand Down Expand Up @@ -200,6 +205,7 @@ impl StreamMaterialize {
table_type: TableType,
version: Option<TableVersion>,
cardinality: Cardinality,
retention_seconds: Option<NonZeroU32>,
) -> Result<TableCatalog> {
let input = rewritten_input;

Expand Down Expand Up @@ -253,8 +259,7 @@ impl StreamMaterialize {
incoming_sinks: vec![],
initialized_at_cluster_version: None,
created_at_cluster_version: None,
// TODO: https://github.com/risingwavelabs/risingwave/issues/14791
retention_seconds: None,
retention_seconds: retention_seconds.map(|i| i.into()),
})
}

Expand Down

0 comments on commit d52fb7c

Please sign in to comment.