diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index b1b48ce93bead..635c4b3d03a80 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -10,7 +10,7 @@ create table t_kafka ( v_timestamp timestamp ); -statement error failed to fetch metadata from kafka +statement error cannot connect to kafka broker create sink sink_non_exist_broker from t_kafka with ( connector = 'kafka', properties.bootstrap.server = 'make no sense', @@ -19,15 +19,6 @@ create sink sink_non_exist_broker from t_kafka with ( type = 'append-only', ); -statement error topic invalid_topic not found -create sink sink_non_exist_topic from t_kafka with ( - connector = 'kafka', - properties.bootstrap.server = '127.0.0.1:29092', - topic = 'invalid_topic', - force_append_only = 'true', - type = 'append-only', -); - # Test create sink with connection # Create a mock connection statement ok diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index a5a524048bfd1..c2c76dd2b2e88 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -335,12 +335,17 @@ impl Sink for KafkaSink { // Try Kafka connection. // There is no such interface for kafka producer to validate a connection // use enumerator to validate broker reachability and existence of topic - let mut ticker = KafkaSplitEnumerator::new( + let check = KafkaSplitEnumerator::new( KafkaProperties::from(self.config.clone()), Arc::new(SourceEnumeratorContext::default()), ) .await?; - _ = ticker.list_splits().await?; + if !check.check_reachability().await { + return Err(SinkError::Config(anyhow!( + "cannot connect to kafka broker ({})", + self.config.common.brokers + ))); + } Ok(()) } } diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 06577aa4e3433..2c0d03306366b 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -342,6 +342,13 @@ impl KafkaSplitEnumerator { .set(offset); } + pub async fn check_reachability(&self) -> bool { + self.client + .fetch_metadata(None, self.sync_call_timeout) + .await + .is_ok() + } + async fn fetch_topic_partition(&self) -> anyhow::Result> { // for now, we only support one topic let metadata = self