From ec484aa907093e5d3f967ae53e9e443558dc861a Mon Sep 17 00:00:00 2001 From: yufansong Date: Sun, 17 Sep 2023 23:41:42 -0700 Subject: [PATCH] remove prefix --- src/connector/src/common.rs | 20 +++++++++---------- .../src/source/nats/source/reader.rs | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 550d3e5b627fb..c2a803b8623f1 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -343,33 +343,33 @@ pub struct UpsertMessage<'a> { #[serde_as] #[derive(Deserialize, Serialize, Debug, Clone)] pub struct NatsCommon { - #[serde(rename = "nats.server_url")] + #[serde(rename = "server_url")] pub server_url: String, - #[serde(rename = "nats.subject")] + #[serde(rename = "subject")] pub subject: String, - #[serde(rename = "nats.connect_mode")] + #[serde(rename = "connect_mode")] pub connect_mode: Option, #[serde(rename = "username")] pub user: Option, #[serde(rename = "password")] pub password: Option, - #[serde(rename = "nats.jwt")] + #[serde(rename = "jwt")] pub jwt: Option, - #[serde(rename = "nats.nkey")] + #[serde(rename = "nkey")] pub nkey: Option, - #[serde(rename = "nats.max_bytes")] + #[serde(rename = "max_bytes")] #[serde_as(as = "Option")] pub max_bytes: Option, - #[serde(rename = "nats.max_messages")] + #[serde(rename = "max_messages")] #[serde_as(as = "Option")] pub max_messages: Option, - #[serde(rename = "nats.max_messages_per_subject")] + #[serde(rename = "max_messages_per_subject")] #[serde_as(as = "Option")] pub max_messages_per_subject: Option, - #[serde(rename = "nats.max_consumers")] + #[serde(rename = "max_consumers")] #[serde_as(as = "Option")] pub max_consumers: Option, - #[serde(rename = "nats.max_message_size")] + #[serde(rename = "max_message_size")] #[serde_as(as = "Option")] pub max_message_size: Option, } diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 2a22492f617c0..87b2b95c99ff2 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -19,7 +19,7 @@ use futures::StreamExt; use futures_async_stream::try_stream; use super::message::NatsMessage; -use super::NatsOffset; +use super::{NatsOffset, NatsSplit}; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::nats::NatsProperties; @@ -51,7 +51,7 @@ impl SplitReader for NatsSplitReader { ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); - let split = splits.into_iter().next().unwrap().into_nats().unwrap(); + let split = splits.into_iter().next().unwrap(); let split_id = split.split_id; let start_position = match &split.start_sequence { NatsOffset::None => match &properties.scan_startup_mode {