Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Aug 27, 2024
1 parent c870b5e commit 55ee7c7
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,19 +763,20 @@ impl Sink for IcebergSink {
const SINK_NAME: &'static str = ICEBERG_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
let commit_checkpoint_interval = desc.properties
.get("commit_checkpoint_interval")
.map(|interval| {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};
});

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if commit_checkpoint_interval > 1 {
if let Some(commit_checkpoint_interval) = commit_checkpoint_interval
&& commit_checkpoint_interval > 1
{
return Err(SinkError::Config(anyhow!(
"config conflict: Iceberg config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
Expand Down

0 comments on commit 55ee7c7

Please sign in to comment.