[KCON35] : Improvement : Read files with stream instead of loading it all (#351)

Currently the transformers load each file in full and build a list of records, which could cause performance issues for large files.

* With Stream/StreamSupport, a record is transformed only when next() is called on the iterator (see the sketch below).
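
A minimal, self-contained sketch of that pattern (illustrative only; the class and method names below are not part of this change): an iterator over records is wrapped in a Spliterator and exposed through StreamSupport, so each record is materialized only when a terminal operation pulls it.

import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyRecordStreamSketch {

    // Wrap any Iterator in a lazily evaluated, ordered Stream: elements are pulled
    // from the iterator only when a terminal operation asks for them.
    static <T> Stream<T> lazyStream(final Iterator<T> iterator) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL),
                false);
    }

    public static void main(final String[] args) {
        final Iterator<String> records = List.of("record-1", "record-2", "record-3").iterator();
        // Nothing is read from the iterator until forEach pulls each element.
        lazyStream(records).map(String::toUpperCase).forEach(System.out::println);
    }
}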
muralibasani authored and RyanSkraba committed Nov 25, 2024
1 parent 8801bba commit a31b0ca
Showing 12 changed files with 327 additions and 180 deletions.
1 change: 0 additions & 1 deletion s3-source-connector/build.gradle.kts
@@ -117,7 +117,6 @@ dependencies {
exclude(group = "org.apache.commons", module = "commons-math3")
exclude(group = "org.apache.httpcomponents", module = "httpclient")
exclude(group = "commons-codec", module = "commons-codec")
exclude(group = "commons-io", module = "commons-io")
exclude(group = "commons-net", module = "commons-net")
exclude(group = "org.eclipse.jetty")
exclude(group = "org.eclipse.jetty.websocket")
@@ -22,9 +22,7 @@
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_ENDPOINT_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_PREFIX_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_POLL_RECORDS;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPICS;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPIC_PARTITIONS;
@@ -41,7 +39,6 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
@@ -68,7 +65,6 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -185,33 +181,6 @@ void bytesTest(final TestInfo testInfo) throws IOException {
verifyOffsetPositions(offsetKeys, 4);
}

@Test
void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo) throws IOException, InterruptedException {
final String testData = "AABBCCDDEE";
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 3);
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
connectorConfig.put(EXPECTED_MAX_MESSAGE_BYTES, "2"); // For above test data of 10 bytes length, with 2 bytes
// each
// in source record, we expect 5 records.
connectorConfig.put(MAX_POLL_RECORDS, "2"); // In 3 polls all the 5 records should be processed

connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
final String offsetKey = writeToS3(topicName, testData.getBytes(StandardCharsets.UTF_8), "00000");

// Poll messages from the Kafka topic and verify the consumed data
final List<String> records = IntegrationBase.consumeMessages(topicName, 5, connectRunner.getBootstrapServers());

// Verify that the correct data is read from the S3 bucket and pushed to Kafka
assertThat(records).containsExactly("AA", "BB", "CC", "DD", "EE");

Awaitility.await().atMost(Duration.ofMinutes(2)).untilAsserted(() -> {
final Map<String, Object> offsetRecs = IntegrationBase.consumeOffsetStorageMessages(
"connect-offset-topic-" + CONNECTOR_NAME, 1, connectRunner.getBootstrapServers());
assertThat(offsetRecs).containsExactly(entry(offsetKey, 5));
});
}

@Test
void avroTest(final TestInfo testInfo) throws IOException, InterruptedException {
final var topicName = IntegrationBase.topicName(testInfo);
@@ -227,16 +196,19 @@ void avroTest(final TestInfo testInfo) throws IOException, InterruptedException
final Schema schema = parser.parse(schemaJson);

final byte[] outputStream1 = getAvroRecord(schema, 1, 100);
final byte[] outputStream2 = getAvroRecord(schema, 2, 100);
final byte[] outputStream2 = getAvroRecord(schema, 101, 100);
final byte[] outputStream3 = getAvroRecord(schema, 201, 100);
final byte[] outputStream4 = getAvroRecord(schema, 301, 100);
final byte[] outputStream5 = getAvroRecord(schema, 401, 100);

final Set<String> offsetKeys = new HashSet<>();

offsetKeys.add(writeToS3(topicName, outputStream1, "00001"));
offsetKeys.add(writeToS3(topicName, outputStream2, "00001"));

offsetKeys.add(writeToS3(topicName, outputStream1, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream2, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream2, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream3, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream4, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream5, "00002"));

assertThat(testBucketAccessor.listObjects()).hasSize(5);

@@ -249,7 +221,12 @@ void avroTest(final TestInfo testInfo) throws IOException, InterruptedException
assertThat(records).hasSize(500)
.map(record -> entry(record.get("id"), String.valueOf(record.get("message"))))
.contains(entry(1, "Hello, Kafka Connect S3 Source! object 1"),
entry(2, "Hello, Kafka Connect S3 Source! object 2"));
entry(2, "Hello, Kafka Connect S3 Source! object 2"),
entry(100, "Hello, Kafka Connect S3 Source! object 100"),
entry(200, "Hello, Kafka Connect S3 Source! object 200"),
entry(300, "Hello, Kafka Connect S3 Source! object 300"),
entry(400, "Hello, Kafka Connect S3 Source! object 400"),
entry(500, "Hello, Kafka Connect S3 Source! object 500"));

Thread.sleep(10_000);

@@ -327,17 +304,17 @@ void jsonTest(final TestInfo testInfo) throws IOException {
verifyOffsetPositions(offsetKeys, 1);
}

private static byte[] getAvroRecord(final Schema schema, final int messageId, final int noOfAvroRecs)
throws IOException {
private static byte[] getAvroRecord(final Schema schema, int messageId, final int noOfAvroRecs) throws IOException {
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
dataFileWriter.create(schema, outputStream);
for (int i = 0; i < noOfAvroRecs; i++) {
final GenericRecord avroRecord = new GenericData.Record(schema); // NOPMD
avroRecord.put("message", "Hello, Kafka Connect S3 Source! object " + i);
avroRecord.put("message", "Hello, Kafka Connect S3 Source! object " + messageId);
avroRecord.put("id", messageId);
dataFileWriter.append(avroRecord);
messageId++; // NOPMD
}

dataFileWriter.flush();
@@ -24,15 +24,21 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import com.amazonaws.util.IOUtils;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,10 +52,10 @@ public void configureValueConverter(final Map<String, String> config, final S3So
}

@Override
public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
final S3SourceConfig s3SourceConfig) {
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
return readAvroRecords(inputStream, datumReader);
return readAvroRecordsAsStream(inputStreamIOSupplier, datumReader);
}

@Override
Expand All @@ -58,6 +64,20 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
s3SourceConfig);
}

private Stream<Object> readAvroRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier,
final DatumReader<GenericRecord> datumReader) {
try (DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inputStreamIOSupplier.get(),
datumReader)) {
// Wrap DataFileStream in a Stream using a Spliterator for lazy processing
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(dataFileStream, Spliterator.ORDERED | Spliterator.NONNULL),
false);
} catch (IOException e) {
LOGGER.error("Error in DataFileStream: {}", e.getMessage(), e);
return Stream.empty(); // Return an empty stream if initialization fails
}
}

List<Object> readAvroRecords(final InputStream content, final DatumReader<GenericRecord> datumReader) {
final List<Object> records = new ArrayList<>();
try (SeekableByteArrayInput sin = new SeekableByteArrayInput(IOUtils.toByteArray(content))) {
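Because the Stream returned by readAvroRecordsAsStream is consumed lazily after the method returns, the try-with-resources block above closes the DataFileStream before any records are pulled from it. A hedged sketch of an alternative that keeps the reader open and closes it through the Stream's onClose hook, mirroring what the JSON transformer below does with its BufferedReader (it reuses the class's existing imports and LOGGER; this wiring is an assumption, not the committed code):

private Stream<Object> readAvroRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier,
        final DatumReader<GenericRecord> datumReader) {
    final DataFileStream<GenericRecord> dataFileStream;
    try {
        // DataFileStream implements Iterator<GenericRecord>, so it can back a lazy Spliterator.
        dataFileStream = new DataFileStream<>(inputStreamIOSupplier.get(), datumReader);
    } catch (IOException e) {
        LOGGER.error("Error initializing DataFileStream: {}", e.getMessage(), e);
        return Stream.empty();
    }
    return StreamSupport
            .stream(Spliterators.<Object>spliteratorUnknownSize(dataFileStream,
                    Spliterator.ORDERED | Spliterator.NONNULL), false)
            .onClose(() -> {
                try {
                    dataFileStream.close(); // closed only when the consumer closes the Stream
                } catch (IOException e) {
                    LOGGER.error("Error closing DataFileStream: {}", e.getMessage(), e);
                }
            });
}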
@@ -16,16 +16,17 @@

package io.aiven.kafka.connect.s3.source.input;

import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,29 +38,31 @@ public void configureValueConverter(final Map<String, String> config, final S3So
// For byte array transformations, ByteArrayConverter is the converter which is the default config.
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
@Override
public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
final S3SourceConfig s3SourceConfig) {
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {

final int maxMessageBytesSize = s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES);
final byte[] buffer = new byte[maxMessageBytesSize];
int bytesRead;
// Create a Stream that processes each chunk lazily
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
final byte[] buffer = new byte[4096];

final List<Object> chunks = new ArrayList<>();
try {
bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
final byte[] chunk = new byte[bytesRead];
System.arraycopy(buffer, 0, chunk, 0, bytesRead);
chunks.add(chunk);
bytesRead = inputStream.read(buffer);
@Override
public boolean tryAdvance(final java.util.function.Consumer<? super Object> action) {
try (InputStream inputStream = inputStreamIOSupplier.get()) {
final int bytesRead = inputStream.read(buffer);
if (bytesRead == -1) {
return false;
}
final byte[] chunk = new byte[bytesRead];
System.arraycopy(buffer, 0, chunk, 0, bytesRead);
action.accept(chunk);
return true;
} catch (IOException e) {
LOGGER.error("Error trying to advance byte stream: {}", e.getMessage(), e);
return false;
}
}
} catch (IOException e) {
LOGGER.error("Error reading from input stream: {}", e.getMessage(), e);
}

return chunks;
}, false);
}

@Override
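In the tryAdvance implementation above, inputStreamIOSupplier.get() is invoked inside a try-with-resources on every advance, so each chunk re-opens the underlying S3 object stream from the beginning and closes it again. A hedged sketch of a variant that opens the stream once and reuses it across chunks (it reuses the class's existing imports and LOGGER; this is a sketch under those assumptions, not the committed code):

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
        final int topicPartition, final S3SourceConfig s3SourceConfig) {
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
        final byte[] buffer = new byte[4096];
        InputStream inputStream; // opened on the first advance, then reused for every subsequent chunk

        @Override
        public boolean tryAdvance(final java.util.function.Consumer<? super Object> action) {
            try {
                if (inputStream == null) {
                    inputStream = inputStreamIOSupplier.get();
                }
                final int bytesRead = inputStream.read(buffer);
                if (bytesRead == -1) {
                    inputStream.close(); // end of object: release the underlying stream
                    return false;
                }
                final byte[] chunk = new byte[bytesRead];
                System.arraycopy(buffer, 0, chunk, 0, bytesRead);
                action.accept(chunk);
                return true;
            } catch (IOException e) {
                LOGGER.error("Error trying to advance byte stream: {}", e.getMessage(), e);
                return false;
            }
        }
    }, false);
}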
@@ -23,15 +23,18 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -47,30 +50,9 @@ public void configureValueConverter(final Map<String, String> config, final S3So
}

@Override
public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
final S3SourceConfig s3SourceConfig) {
final List<Object> jsonNodeList = new ArrayList<>();
JsonNode jsonNode;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line = reader.readLine();
while (line != null) {
line = line.trim();
if (!line.isEmpty()) {
try {
// Parse each line as a separate JSON object
jsonNode = objectMapper.readTree(line.trim()); // Parse the current line into a JsonNode
jsonNodeList.add(jsonNode); // Add parsed JSON object to the list
} catch (IOException e) {
LOGGER.error("Error parsing JSON record from S3 input stream: {}", e.getMessage(), e);
}
}

line = reader.readLine();
}
} catch (IOException e) {
LOGGER.error("Error reading S3 object stream: {}", e.getMessage());
}
return jsonNodeList;
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {
return readJsonRecordsAsStream(inputStreamIOSupplier);
}

@Override
@@ -82,4 +64,63 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
return new byte[0];
}
}

private Stream<Object> readJsonRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier) {
// Use a Stream that lazily processes each line as a JSON object
CustomSpliterator customSpliteratorParam;
try {
customSpliteratorParam = new CustomSpliterator(inputStreamIOSupplier);
} catch (IOException e) {
LOGGER.error("Error creating Json transformer CustomSpliterator: {}", e.getMessage(), e);
return Stream.empty();
}
return StreamSupport.stream(customSpliteratorParam, false).onClose(() -> {
try {
customSpliteratorParam.reader.close(); // Ensure the reader is closed after streaming
} catch (IOException e) {
LOGGER.error("Error closing BufferedReader: {}", e.getMessage(), e);
}
});
}

    /*
     * This CustomSpliterator class exists so that the BufferedReader is not closed before all the records from the
     * stream have been consumed; the reader is closed by the onClose handler registered in readJsonRecordsAsStream.
     */
final class CustomSpliterator extends Spliterators.AbstractSpliterator<Object> {
BufferedReader reader;
String line;
CustomSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier) throws IOException {
super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
reader = new BufferedReader(new InputStreamReader(inputStreamIOSupplier.get(), StandardCharsets.UTF_8));
}

@Override
public boolean tryAdvance(final java.util.function.Consumer<? super Object> action) {
try {
if (line == null) {
line = reader.readLine();
}
while (line != null) {
line = line.trim();
if (!line.isEmpty()) {
try {
final JsonNode jsonNode = objectMapper.readTree(line); // Parse the JSON
// line
action.accept(jsonNode); // Provide the parsed JSON node to the stream
} catch (IOException e) {
LOGGER.error("Error parsing JSON record: {}", e.getMessage(), e);
}
line = null; // NOPMD
return true;
}
line = reader.readLine();
}
return false; // End of file
} catch (IOException e) {
LOGGER.error("Error reading S3 object stream: {}", e.getMessage(), e);
return false;
}
}
}
}