diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index c9db87fd3bb94..5a1d73963bd23 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -49,8 +49,8 @@ pub struct PulsarProperties { pub common: PulsarCommon, #[serde(rename = "iceberg.enabled")] - #[serde_as(as = "Option")] - pub iceberg_loader_enabled: bool, + #[serde_as(as = "DisplayFromStr")] + pub iceberg_loader_enabled: Option, #[serde(rename = "iceberg.bucket", default)] pub iceberg_bucket: Option, diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 0568a8935932e..04fee26f42aa7 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -65,7 +65,7 @@ impl SplitReader for PulsarSplitReader { tracing::debug!("creating consumer for pulsar split topic {}", topic,); - if props.iceberg_loader_enabled + if props.iceberg_loader_enabled.unwrap_or(false) && matches!(split.start_offset, PulsarEnumeratorOffset::Earliest) && !topic.starts_with("non-persistent://") {