Skip to content

Commit

Permalink
Use serde-as
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Oct 26, 2023
1 parent f440380 commit ce18288
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/connector/src/source/pulsar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod topic;

pub use enumerator::*;
use serde::Deserialize;
use serde_with::serde_as;
pub use split::*;

use self::source::reader::PulsarSplitReader;
Expand All @@ -36,6 +37,7 @@ impl SourceProperties for PulsarProperties {
}

#[derive(Clone, Debug, Deserialize)]
#[serde_as]
pub struct PulsarProperties {
#[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
pub scan_startup_mode: Option<String>,
Expand All @@ -46,7 +48,7 @@ pub struct PulsarProperties {
#[serde(flatten)]
pub common: PulsarCommon,

#[serde(rename = "iceberg.enabled", default)]
#[serde(rename = "iceberg.enabled")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub iceberg_loader_enabled: bool,

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl SplitReader for PulsarSplitReader {

tracing::debug!("creating consumer for pulsar split topic {}", topic,);

if props.is_iceberg_loader_enabled()
if props.iceberg_loader_enabled
&& matches!(split.start_offset, PulsarEnumeratorOffset::Earliest)
&& !topic.starts_with("non-persistent://")
{
Expand Down

0 comments on commit ce18288

Please sign in to comment.