From 6016339fc4ca505fad484c1a92cee4d9f2daef1c Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 27 Nov 2023 13:21:34 +0800 Subject: [PATCH 1/2] fix --- src/connector/src/source/kafka/mod.rs | 6 +++++- src/connector/src/source/nats/mod.rs | 5 ++++- src/connector/src/source/pulsar/mod.rs | 6 +++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index aa6c3dfb8b147..e8a4c4aa55df0 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -106,7 +106,11 @@ pub struct KafkaProperties { #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")] pub scan_startup_mode: Option, - #[serde(rename = "scan.startup.timestamp_millis", alias = "kafka.time.offset")] + #[serde( + rename = "scan.startup.timestamp.millis", + alias = "kafka.time.offset", + alias = "scan.startup.timestamp_millis" // keep for compatibility + )] pub time_offset: Option, #[serde(rename = "properties.group.id", alias = "kafka.consumer.group")] diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 835c22add90ba..cbc935c17c4ee 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -34,7 +34,10 @@ pub struct NatsProperties { #[serde(rename = "scan.startup.mode")] pub scan_startup_mode: Option, - #[serde(rename = "scan.startup.timestamp_millis")] + #[serde( + rename = "scan.startup.timestamp.millis", + alias = "scan.startup.timestamp_millis" + )] pub start_time: Option, #[serde(rename = "stream")] diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index cda1bcb528341..4464e91b9e541 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -43,7 +43,11 @@ pub struct PulsarProperties { #[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")] pub scan_startup_mode: Option, - #[serde(rename = "scan.startup.timestamp_millis", alias = "pulsar.time.offset")] + #[serde( + rename = "scan.startup.timestamp.millis", + alias = "pulsar.time.offset", + alias = "scan.startup.timestamp_millis" + )] pub time_offset: Option, #[serde(flatten)] From e926c66b2a340d7de1abdc0c2ab3305f0f5d7626 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Thu, 7 Dec 2023 16:04:38 +0800 Subject: [PATCH 2/2] update --- src/connector/with_options_source.yaml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 9426c0288b803..0cb181cfb93ab 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -16,10 +16,10 @@ KafkaProperties: field_type: String required: false alias: kafka.scan.startup.mode - - name: scan.startup.timestamp_millis + - name: scan.startup.timestamp.millis field_type: String required: false - alias: kafka.time.offset + alias: scan.startup.timestamp_millis - name: properties.group.id field_type: String required: false @@ -219,9 +219,10 @@ NatsProperties: - name: scan.startup.mode field_type: String required: false - - name: scan.startup.timestamp_millis + - name: scan.startup.timestamp.millis field_type: String required: false + alias: scan.startup.timestamp_millis - name: stream field_type: String required: true @@ -428,10 +429,10 @@ PulsarProperties: field_type: String required: false alias: pulsar.scan.startup.mode - - name: scan.startup.timestamp_millis + - name: scan.startup.timestamp.millis field_type: String required: false - alias: pulsar.time.offset + alias: scan.startup.timestamp_millis - name: topic field_type: String required: true