Skip to content

Commit

Permalink
Allow output topic to be converted to java.util.List
Browse files Browse the repository at this point in the history
  • Loading branch information
JakobEdding committed Apr 11, 2024
1 parent 66a8657 commit b1ed9cb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2019 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,6 +24,8 @@

package com.bakdata.fluent_kafka_streams_tests;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.TestOutputTopic;
Expand Down Expand Up @@ -114,6 +116,16 @@ public TestOutput<K, V> asStream() {
return new StreamOutput<>(this.testDriver, this.topic, this.keySerde, this.valueSerde);
}

@Override
public List<ProducerRecord<K, V>> toList() {
return this.testOutputTopic
.readRecordsToList()
.stream()
.map(testRecord -> new ProducerRecord<>(this.topic, 0, testRecord.timestamp(), testRecord.key(),
testRecord.value(), testRecord.getHeaders()))
.collect(Collectors.toList());
}

// ==================
// Non-public methods
// ==================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.fluent_kafka_streams_tests;

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;

Expand Down Expand Up @@ -147,5 +148,7 @@ default <VR> TestOutput<K, VR> withValueType(final Class<VR> valueType) {
* @return Current output with {@link org.apache.kafka.streams.kstream.KStream} semantics
*/
TestOutput<K, V> asStream();

List<ProducerRecord<K, V>> toList();
}

0 comments on commit b1ed9cb

Please sign in to comment.