diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java index 3883d3007e0..044926fe3e5 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java @@ -307,10 +307,13 @@ private Set getTopicInfo() throws ExecutionException, Interrup } log.info("Discovered topics: {}", topics); Collection partitions = - adminClient.describeTopics(topics).all().get().values().stream() + adminClient.describeTopics(topics).allTopicNames().get().values().stream() .flatMap( t -> t.partitions().stream() + .filter( + partitionInfo -> + partitionInfo.leader() != null) .map( p -> new TopicPartition(