Skip to content

Commit

Permalink
feat(iceberg): make commit checkpoint interval default 60 for iceberg…
Browse files Browse the repository at this point in the history
… engine table (#19738)
  • Loading branch information
chenzl25 authored Dec 11, 2024
1 parent ffe63f1 commit 9292b6f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ statement ok
set sink_decouple = false;

statement ok
create table t(id int primary key, name varchar) engine = iceberg;
create table t(id int primary key, name varchar) with(commit_checkpoint_interval = 1) engine = iceberg;

statement ok
insert into t values(1, 'xxx');
Expand Down Expand Up @@ -42,7 +42,7 @@ v_array int[],
v_struct struct<a int,b int>,
v_json jsonb,
v_one_layer_struct struct<id bigint, v_small_int smallint, v_int int, v_long bigint, v_float real, v_double double, v_varchar varchar, v_bool boolean, v_date date, v_timestamp timestamptz, v_ts_ntz timestamp, v_decimal decimal, v_json jsonb>
) engine = iceberg;
) with(commit_checkpoint_interval = 1) engine = iceberg;

statement ok
INSERT INTO full_type_t VALUES
Expand Down
28 changes: 26 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::catalog::{
};
use risingwave_common::config::MetaBackend;
use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
Expand Down Expand Up @@ -1618,8 +1619,31 @@ pub async fn create_iceberg_engine_table(
with.insert("catalog.name".to_owned(), iceberg_catalog_name.clone());
with.insert("database.name".to_owned(), iceberg_database_name.clone());
with.insert("table.name".to_owned(), iceberg_table_name.clone());
// TODO: change the `commit_checkpoint_interval` to a configurable value
with.insert("commit_checkpoint_interval".to_owned(), "1".to_owned());
let commit_checkpoint_interval = handler_args
.with_options
.get("commit_checkpoint_interval")
.map(|v| v.to_owned())
.unwrap_or_else(|| "60".to_owned());
let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
ErrorCode::InvalidInputSyntax(format!(
"commit_checkpoint_interval must be a positive integer: {}",
commit_checkpoint_interval
))
})?;

if commit_checkpoint_interval == 0 {
bail!("commit_checkpoint_interval must be a positive integer: 0");
}

let sink_decouple = session.config().sink_decouple();
if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
bail!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled")
}

with.insert(
"commit_checkpoint_interval".to_owned(),
commit_checkpoint_interval.to_string(),
);
with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
with.insert("enable_config_load".to_owned(), "true".to_owned());
sink_handler_args.with_options = WithOptions::new_with_options(with);
Expand Down

0 comments on commit 9292b6f

Please sign in to comment.