diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 7776258919cd7..462688ec1f512 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -27,6 +27,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; use serde::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; use super::encoder::{JsonEncoder, TimestampHandlingMode}; use super::{ @@ -34,12 +35,12 @@ use super::{ SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::common::PulsarCommon; +use crate::deserialize_duration_from_string; use crate::sink::utils::{ gen_append_only_message_stream, gen_upsert_message_stream, AppendOnlyAdapterOpts, UpsertAdapterOpts, }; use crate::sink::{DummySinkCommitCoordinator, Result}; -use crate::{deserialize_duration_from_string, deserialize_u32_from_string}; pub const PULSAR_SINK: &str = "pulsar"; @@ -56,13 +57,11 @@ const fn _default_retry_backoff() -> Duration { Duration::from_millis(100) } +#[serde_as] #[derive(Debug, Clone, Deserialize)] pub struct PulsarConfig { - #[serde( - rename = "properties.retry.max", - default = "_default_max_retries", - deserialize_with = "deserialize_u32_from_string" - )] + #[serde(rename = "properties.retry.max", default = "_default_max_retries")] + #[serde_as(as = "DisplayFromStr")] pub max_retry_num: u32, #[serde(