Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
WesselVS committed Jun 23, 2020
1 parent 7d1be68 commit dbe6178
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partit
Map<String, Object> serializerConfig = new HashMap<>(consumerConfig);
serializerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
serializerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
KafkaConsumer<Byte[], Byte[]> consumer = new KafkaConsumer<>(serializerConfig);
consumer.assign(partitions);
try (KafkaConsumer<Byte[], Byte[]> consumer = new KafkaConsumer<>(serializerConfig)) {
consumer.assign(partitions);

Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
List<TopicPartition> toRemove = new ArrayList<>();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
List<TopicPartition> toRemove = new ArrayList<>();

for (Map.Entry<TopicPartition, Long> partitionOffset: offsets.entrySet()) {
if (partitionOffset.getValue() == 0L) {
toRemove.add(partitionOffset.getKey()); // don't store empty offsets
for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
if (partitionOffset.getValue() == 0L) {
toRemove.add(partitionOffset.getKey()); // don't store empty offsets
}
}

for (TopicPartition partition : toRemove) {
offsets.remove(partition);
}
}

for (TopicPartition partition: toRemove) {
offsets.remove(partition);
return offsets;
}
consumer.close();
return offsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void open(Collection<TopicPartition> partitions) {

this.partitionWriters.put(topicPartition, partitionWriter);
this.currentOffsets.put(topicPartition, lastWrittenOffset);
log.debug("last written offset " + lastWrittenOffset);
}
if ( config.snapShotMode() ) {
this.endOffsets = endOffsetReader.getEndOffsets(partitions);
Expand Down

0 comments on commit dbe6178

Please sign in to comment.