From f9e394b540685c7a82895d97c0675c3e0291d41e Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 27 Sep 2023 17:58:33 +0800 Subject: [PATCH 1/3] feat: Improve Kafka client functionality - Added a new function to check if the client can fetch metadata from Kafka. - Modified the logic for fetching start and stop offsets based on certain values. - Added functions to fetch offsets and start/stop offsets for partitions based on watermark and configuration. - Updated the `SinkWriterParam` type to `SinkWriterMetrics` in the `into_log_sinker` method signature. Signed-off-by: tabVersion --- src/connector/src/sink/kafka.rs | 9 +++++++-- src/connector/src/source/kafka/enumerator/client.rs | 7 +++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index a5a524048bfd..c2c76dd2b2e8 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -335,12 +335,17 @@ impl Sink for KafkaSink { // Try Kafka connection. // There is no such interface for kafka producer to validate a connection // use enumerator to validate broker reachability and existence of topic - let mut ticker = KafkaSplitEnumerator::new( + let check = KafkaSplitEnumerator::new( KafkaProperties::from(self.config.clone()), Arc::new(SourceEnumeratorContext::default()), ) .await?; - _ = ticker.list_splits().await?; + if !check.check_reachability().await { + return Err(SinkError::Config(anyhow!( + "cannot connect to kafka broker ({})", + self.config.common.brokers + ))); + } Ok(()) } } diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 06577aa4e343..2c0d03306366 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -342,6 +342,13 @@ impl KafkaSplitEnumerator { .set(offset); } + pub async fn check_reachability(&self) -> bool { + self.client + .fetch_metadata(None, self.sync_call_timeout) + .await + .is_ok() + } + async fn fetch_topic_partition(&self) -> anyhow::Result> { // for now, we only support one topic let metadata = self From 1fc227b409643f742458422a2fe5acf87b795574 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 27 Sep 2023 21:17:21 +0800 Subject: [PATCH 2/3] fix test --- e2e_test/sink/kafka/create_sink.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index b1b48ce93bea..809064360c0b 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -10,7 +10,7 @@ create table t_kafka ( v_timestamp timestamp ); -statement error failed to fetch metadata from kafka +statement error cannot connect to kafka broker create sink sink_non_exist_broker from t_kafka with ( connector = 'kafka', properties.bootstrap.server = 'make no sense', From 8541f2209d29deb036973b1a69e24d39385f5aaf Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 27 Sep 2023 21:56:09 +0800 Subject: [PATCH 3/3] fix test --- e2e_test/sink/kafka/create_sink.slt | 9 --------- 1 file changed, 9 deletions(-) diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 809064360c0b..635c4b3d03a8 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -19,15 +19,6 @@ create sink sink_non_exist_broker from t_kafka with ( type = 'append-only', ); -statement error topic invalid_topic not found -create sink sink_non_exist_topic from t_kafka with ( - connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', - topic = 'invalid_topic', - force_append_only = 'true', - type = 'append-only', -); - # Test create sink with connection # Create a mock connection statement ok