Skip to content

Commit

Permalink
remove prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong committed Sep 18, 2023
1 parent d3ed07c commit ec484aa
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
20 changes: 10 additions & 10 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,33 +343,33 @@ pub struct UpsertMessage<'a> {
#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct NatsCommon {
#[serde(rename = "nats.server_url")]
#[serde(rename = "server_url")]
pub server_url: String,
#[serde(rename = "nats.subject")]
#[serde(rename = "subject")]
pub subject: String,
#[serde(rename = "nats.connect_mode")]
#[serde(rename = "connect_mode")]
pub connect_mode: Option<String>,
#[serde(rename = "username")]
pub user: Option<String>,
#[serde(rename = "password")]
pub password: Option<String>,
#[serde(rename = "nats.jwt")]
#[serde(rename = "jwt")]
pub jwt: Option<String>,
#[serde(rename = "nats.nkey")]
#[serde(rename = "nkey")]
pub nkey: Option<String>,
#[serde(rename = "nats.max_bytes")]
#[serde(rename = "max_bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_bytes: Option<i64>,
#[serde(rename = "nats.max_messages")]
#[serde(rename = "max_messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_messages: Option<i64>,
#[serde(rename = "nats.max_messages_per_subject")]
#[serde(rename = "max_messages_per_subject")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_messages_per_subject: Option<i64>,
#[serde(rename = "nats.max_consumers")]
#[serde(rename = "max_consumers")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_consumers: Option<i32>,
#[serde(rename = "nats.max_message_size")]
#[serde(rename = "max_message_size")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_message_size: Option<i32>,
}
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/nats/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::StreamExt;
use futures_async_stream::try_stream;

use super::message::NatsMessage;
use super::NatsOffset;
use super::{NatsOffset, NatsSplit};
use crate::parser::ParserConfig;
use crate::source::common::{into_chunk_stream, CommonSplitReader};
use crate::source::nats::NatsProperties;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl SplitReader for NatsSplitReader {
) -> Result<Self> {
// TODO: to simplify the logic, return 1 split for first version
assert!(splits.len() == 1);
let split = splits.into_iter().next().unwrap().into_nats().unwrap();
let split = splits.into_iter().next().unwrap();
let split_id = split.split_id;
let start_position = match &split.start_sequence {
NatsOffset::None => match &properties.scan_startup_mode {
Expand Down

0 comments on commit ec484aa

Please sign in to comment.