Skip to content

Commit

Permalink
simplify deserialize_with with serde_as
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Sep 17, 2023
1 parent c90d67e commit 7cbcb35
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions src/connector/src/sink/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

use super::encoder::{JsonEncoder, TimestampHandlingMode};
use super::{
Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::common::PulsarCommon;
use crate::deserialize_duration_from_string;
use crate::sink::utils::{
gen_append_only_message_stream, gen_upsert_message_stream, AppendOnlyAdapterOpts,
UpsertAdapterOpts,
};
use crate::sink::{DummySinkCommitCoordinator, Result};
use crate::{deserialize_duration_from_string, deserialize_u32_from_string};

pub const PULSAR_SINK: &str = "pulsar";

Expand All @@ -56,13 +57,11 @@ const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}

#[serde_as]
#[derive(Debug, Clone, Deserialize)]
pub struct PulsarConfig {
#[serde(
rename = "properties.retry.max",
default = "_default_max_retries",
deserialize_with = "deserialize_u32_from_string"
)]
#[serde(rename = "properties.retry.max", default = "_default_max_retries")]
#[serde_as(as = "DisplayFromStr")]
pub max_retry_num: u32,

#[serde(
Expand Down

0 comments on commit 7cbcb35

Please sign in to comment.