From 9292b6f38bc070d54bfcf2f2c6f4e63b77f75c1e Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 11 Dec 2024 17:31:30 +0800 Subject: [PATCH] feat(iceberg): make commit checkpoint interval default 60 for iceberg engine table (#19738) --- e2e_test/iceberg/test_case/iceberg_engine.slt | 4 +-- src/frontend/src/handler/create_table.rs | 28 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/e2e_test/iceberg/test_case/iceberg_engine.slt b/e2e_test/iceberg/test_case/iceberg_engine.slt index 55b23c6399ad..8129dd6265d8 100644 --- a/e2e_test/iceberg/test_case/iceberg_engine.slt +++ b/e2e_test/iceberg/test_case/iceberg_engine.slt @@ -2,7 +2,7 @@ statement ok set sink_decouple = false; statement ok -create table t(id int primary key, name varchar) engine = iceberg; +create table t(id int primary key, name varchar) with(commit_checkpoint_interval = 1) engine = iceberg; statement ok insert into t values(1, 'xxx'); @@ -42,7 +42,7 @@ v_array int[], v_struct struct, v_json jsonb, v_one_layer_struct struct -) engine = iceberg; +) with(commit_checkpoint_interval = 1) engine = iceberg; statement ok INSERT INTO full_type_t VALUES diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 45d818014ebd..fc2b4bc833cd 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -28,6 +28,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::config::MetaBackend; use risingwave_common::license::Feature; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; @@ -1618,8 +1619,31 @@ pub async fn create_iceberg_engine_table( with.insert("catalog.name".to_owned(), iceberg_catalog_name.clone()); with.insert("database.name".to_owned(), iceberg_database_name.clone()); with.insert("table.name".to_owned(), iceberg_table_name.clone()); - // TODO: change the `commit_checkpoint_interval` to a configurable value - with.insert("commit_checkpoint_interval".to_owned(), "1".to_owned()); + let commit_checkpoint_interval = handler_args + .with_options + .get("commit_checkpoint_interval") + .map(|v| v.to_owned()) + .unwrap_or_else(|| "60".to_owned()); + let commit_checkpoint_interval = commit_checkpoint_interval.parse::().map_err(|_| { + ErrorCode::InvalidInputSyntax(format!( + "commit_checkpoint_interval must be a positive integer: {}", + commit_checkpoint_interval + )) + })?; + + if commit_checkpoint_interval == 0 { + bail!("commit_checkpoint_interval must be a positive integer: 0"); + } + + let sink_decouple = session.config().sink_decouple(); + if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 { + bail!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled") + } + + with.insert( + "commit_checkpoint_interval".to_owned(), + commit_checkpoint_interval.to_string(), + ); with.insert("create_table_if_not_exists".to_owned(), "true".to_owned()); with.insert("enable_config_load".to_owned(), "true".to_owned()); sink_handler_args.with_options = WithOptions::new_with_options(with);