Skip to content

Commit

Permalink
fix: check kafka reachability without involving spec topic (risingwav…
Browse files Browse the repository at this point in the history
…elabs#12569)

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Sep 27, 2023
1 parent 454e72d commit 9386536
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
11 changes: 1 addition & 10 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ create table t_kafka (
v_timestamp timestamp
);

statement error failed to fetch metadata from kafka
statement error cannot connect to kafka broker
create sink sink_non_exist_broker from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'make no sense',
Expand All @@ -19,15 +19,6 @@ create sink sink_non_exist_broker from t_kafka with (
type = 'append-only',
);

statement error topic invalid_topic not found
create sink sink_non_exist_topic from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'invalid_topic',
force_append_only = 'true',
type = 'append-only',
);

# Test create sink with connection
# Create a mock connection
statement ok
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,17 @@ impl Sink for KafkaSink {
// Try Kafka connection.
// There is no such interface for kafka producer to validate a connection
// use enumerator to validate broker reachability and existence of topic
let mut ticker = KafkaSplitEnumerator::new(
let check = KafkaSplitEnumerator::new(
KafkaProperties::from(self.config.clone()),
Arc::new(SourceEnumeratorContext::default()),
)
.await?;
_ = ticker.list_splits().await?;
if !check.check_reachability().await {
return Err(SinkError::Config(anyhow!(
"cannot connect to kafka broker ({})",
self.config.common.brokers
)));
}
Ok(())
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ impl KafkaSplitEnumerator {
.set(offset);
}

pub async fn check_reachability(&self) -> bool {
self.client
.fetch_metadata(None, self.sync_call_timeout)
.await
.is_ok()
}

async fn fetch_topic_partition(&self) -> anyhow::Result<Vec<i32>> {
// for now, we only support one topic
let metadata = self
Expand Down

0 comments on commit 9386536

Please sign in to comment.