Lazy file reader iterator, review changes
muralibasani committed Oct 31, 2024
1 parent 337191b commit 399c883
Showing 9 changed files with 94 additions and 85 deletions.
File: integration test
@@ -21,8 +21,8 @@
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_ENDPOINT_CONFIG;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.INPUT_FORMAT_KEY;
-import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_POLL_RECORDS;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.SCHEMA_REGISTRY_URL;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPICS;
@@ -181,8 +181,9 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo)
         final var topicName = IntegrationBase.topicName(testInfo);
         final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
-        connectorConfig.put(MAX_MESSAGE_BYTES_SIZE, "2"); // For above test data of 10 bytes length, with 2 bytes each
-        // in source record, we expect 5 records.
+        connectorConfig.put(EXPECTED_MAX_MESSAGE_BYTES, "2"); // For above test data of 10 bytes length, with 2 bytes
+        // each
+        // in source record, we expect 5 records.
         connectorConfig.put(MAX_POLL_RECORDS, "2"); // In 3 polls all the 5 records should be processed

         connectRunner.createConnector(connectorConfig);
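
The arithmetic behind the comments: 10 bytes of test data emitted in 2-byte chunks gives 10 / 2 = 5 source records, and with max.poll.records set to 2 the five records drain in ceil(5 / 2) = 3 polls (2 + 2 + 1).
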
File: source task
@@ -120,11 +120,13 @@ public void start(final Map<String, String> props) {
     private void initializeConverters() {
         try {
             keyConverter = Optional
-                    .of((Converter) s3SourceConfig.getClass("key.converter").getDeclaredConstructor().newInstance());
-            valueConverter = (Converter) s3SourceConfig.getClass("value.converter")
+                    .of((Converter) Class.forName((String) s3SourceConfig.originals().get("key.converter"))
+                            .getDeclaredConstructor()
+                            .newInstance());
+            valueConverter = (Converter) Class.forName((String) s3SourceConfig.originals().get("value.converter"))
                     .getDeclaredConstructor()
                     .newInstance();
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | InvocationTargetException
                 | NoSuchMethodException e) {
             throw new ConnectException("Connect converters could not be instantiated.", e);
         }
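
With the connector-level key.converter and value.converter definitions removed from S3SourceConfig (see that file below), the task now resolves converters reflectively from the raw originals() map; unlike getClass(...), which only resolves keys declared in the ConfigDef, originals() exposes every property the user passed, which is why ClassNotFoundException joins the multi-catch. A minimal, self-contained sketch of the same pattern, assuming the property holds a fully qualified class name; the configure(...) call is something real Connect converters typically need and is not part of this diff:

import java.util.Map;

import org.apache.kafka.connect.storage.Converter;

final class ConverterFactory {

    // Instantiate a Converter named by a raw config property, e.g.
    // "value.converter" -> "org.apache.kafka.connect.converters.ByteArrayConverter".
    static Converter create(final Map<String, Object> originals, final String propertyKey, final boolean isKey)
            throws ReflectiveOperationException {
        final String className = (String) originals.get(propertyKey); // NPE if the property is missing
        final Converter converter = (Converter) Class.forName(className).getDeclaredConstructor().newInstance();
        converter.configure(originals, isKey); // let the converter read its own settings
        return converter;
    }
}
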
@@ -152,7 +154,8 @@ public List<SourceRecord> poll() throws InterruptedException {

         while (!connectorStopped.get()) {
             try {
-                return extractSourceRecords(results);
+                LOGGER.info("Number of records sent {}", extractSourceRecords(results).size());
+                return results;
             } catch (AmazonS3Exception | DataException exception) {
                 if (handleException(exception)) {
                     return null; // NOPMD
@@ -172,6 +175,7 @@ private boolean handleException(final RuntimeException exception) throws Interru
         if (((AmazonS3Exception) exception).isRetryable()) {
             LOGGER.warn("Retryable error while polling. Will sleep and try again.", exception);
             Thread.sleep(ERROR_BACKOFF);
+
             prepareReaderFromOffsetStorageReader();
         } else {
             return true;
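
Returning results after logging assumes that extractSourceRecords(results) populates the same results list it returns; under that assumption the new code is equivalent to the old return. The "Number of records sent" log line moves here from the record processor (its deletion appears further down), so the count is emitted once per poll.
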
File: S3SourceConfig
@@ -79,9 +79,7 @@ final public class S3SourceConfig extends AbstractConfig {
     public static final String TARGET_TOPICS = "topics";
     public static final String FETCH_PAGE_SIZE = "aws.s3.fetch.page.size";
     public static final String MAX_POLL_RECORDS = "max.poll.records";
-    public static final String MAX_MESSAGE_BYTES_SIZE = "max.message.bytes";
-    public static final String KEY_CONVERTER = "key.converter";
-    public static final String VALUE_CONVERTER = "value.converter";
+    public static final String EXPECTED_MAX_MESSAGE_BYTES = "expected.max.message.bytes";
     public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3;
     public static final String INPUT_FORMAT_KEY = "input.format";
     public static final String SCHEMAS_ENABLE = "schemas.enable";
@@ -164,20 +162,11 @@ private static void addOtherConfig(final S3SourceConfigDef configDef) {
                 ConfigDef.Importance.MEDIUM, "Max poll records", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
                 // UnusedAssignment
                 ConfigDef.Width.NONE, MAX_POLL_RECORDS);
-        configDef.define(KEY_CONVERTER, ConfigDef.Type.CLASS, "org.apache.kafka.connect.converters.ByteArrayConverter",
-                ConfigDef.Importance.MEDIUM, "Key converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
-                // UnusedAssignment
-                ConfigDef.Width.NONE, KEY_CONVERTER);
-        configDef.define(VALUE_CONVERTER, ConfigDef.Type.CLASS,
-                "org.apache.kafka.connect.converters.ByteArrayConverter", ConfigDef.Importance.MEDIUM,
-                "Value converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
-                // UnusedAssignment
-                ConfigDef.Width.NONE, VALUE_CONVERTER);
-        configDef.define(MAX_MESSAGE_BYTES_SIZE, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
+        configDef.define(EXPECTED_MAX_MESSAGE_BYTES, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
                 "The largest record batch size allowed by Kafka config max.message.bytes", GROUP_OTHER,
                 awsOtherGroupCounter++, // NOPMD
                 // UnusedAssignment
-                ConfigDef.Width.NONE, MAX_MESSAGE_BYTES_SIZE);
+                ConfigDef.Width.NONE, EXPECTED_MAX_MESSAGE_BYTES);
     }

     private static void addAwsStsConfigGroup(final ConfigDef configDef) {
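
Two changes meet in this file: key.converter and value.converter are no longer declared by the connector (they are standard Kafka Connect properties, now read from originals() in the task as shown above), and max.message.bytes becomes expected.max.message.bytes, so the source's chunk-size limit can no longer be mistaken for the broker setting it is meant to match. The 1_048_588 default mirrors the Kafka broker's default message.max.bytes.
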
File: ByteArrayTransformer
@@ -16,7 +16,7 @@

 package io.aiven.kafka.connect.s3.source.input;

-import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;

 import java.io.IOException;
 import java.io.InputStream;
@@ -42,7 +42,7 @@ public void configureValueConverter(final Map<String, String> config, final S3So
     public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
             final S3SourceConfig s3SourceConfig) {

-        final int maxMessageBytesSize = s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE);
+        final int maxMessageBytesSize = s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES);
         final byte[] buffer = new byte[maxMessageBytesSize];
         int bytesRead;

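
Only the constant changes in this hunk; the chunking logic is untouched. For context, a self-contained sketch of fixed-size chunking consistent with the buffer and bytesRead declarations above (an illustration, not the file's verbatim code):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkReader {

    // Split a stream into records of at most maxMessageBytes bytes each.
    static List<byte[]> readChunks(final InputStream in, final int maxMessageBytes) throws IOException {
        final List<byte[]> records = new ArrayList<>();
        final byte[] buffer = new byte[maxMessageBytes];
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            records.add(Arrays.copyOf(buffer, bytesRead)); // copy only the bytes actually read
        }
        return records;
    }

    public static void main(final String[] args) throws IOException {
        // 10 bytes with a 2-byte limit -> 5 records, matching the integration test above.
        final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        System.out.println(readChunks(new ByteArrayInputStream(data), 2).size()); // prints 5
    }
}
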
File: AivenS3SourceRecord
@@ -16,7 +16,6 @@

 package io.aiven.kafka.connect.s3.source.utils;

-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,8 +42,9 @@ public AivenS3SourceRecord(final Map<String, Object> partitionMap, final Map<Str

         this.topic = topic;
         this.topicPartition = topicPartition;
-        this.recordKey = Arrays.copyOf(recordKey, recordKey.length);
-        this.recordValue = Arrays.copyOf(recordValue, recordValue.length);
+        this.recordKey = recordKey.clone(); // Defensive copy
+        this.recordValue = recordValue.clone(); // Defensive copy
+
         this.objectKey = objectKey;
     }

@@ -65,11 +65,11 @@ public Integer partition() {
     }

     public byte[] key() {
-        return recordKey.clone();
+        return (recordKey == null) ? null : recordKey.clone(); // Return a defensive copy
     }

     public byte[] value() {
-        return recordValue.clone();
+        return (recordValue == null) ? null : recordValue.clone(); // Return a defensive copy
     }

     public String getObjectKey() {
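
The accessors now tolerate a null key or value, while the constructor still dereferences recordKey and recordValue unconditionally; in this commit that stays safe because SourceRecordIterator (below) always passes the object key's bytes as the record key. For byte arrays, clone() is equivalent to the removed Arrays.copyOf(array, array.length), which is why the java.util.Arrays import can go.
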
File: FileReader
@@ -18,7 +18,6 @@

 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.FETCH_PAGE_SIZE;

-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -52,37 +51,59 @@ public FileReader(final S3SourceConfig s3SourceConfig, final String bucketName,
     }

     @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
-    Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) throws IOException {
-        final List<S3ObjectSummary> allSummaries = new ArrayList<>();
-        String continuationToken = null;
-        ListObjectsV2Result objectListing;
-        do {
-            // Create the request for listing objects
-            final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
-                    .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR)
-                    .withContinuationToken(continuationToken); // Set continuation token for pagination
-
-            // List objects from S3
-            objectListing = s3Client.listObjectsV2(request);
-
-            // Filter out zero-byte objects and add to the list
-            final List<S3ObjectSummary> filteredSummaries = objectListing.getObjectSummaries()
-                    .stream()
-                    .filter(objectSummary -> objectSummary.getSize() > 0)
-                    .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))
-                    .collect(Collectors.toList());
-
-            allSummaries.addAll(filteredSummaries); // Add the filtered summaries to the main list
-
-            allSummaries.forEach(objSummary -> LOGGER.debug("Objects to be processed {} ", objSummary.getKey()));
-
-            // Check if there are more objects to fetch
-            continuationToken = objectListing.getNextContinuationToken();
-        } while (objectListing.isTruncated()); // Continue fetching if the result is truncated
-
-        return allSummaries.iterator();
+    Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) {
+        return new Iterator<>() {
+            private String continuationToken = null; // NOPMD
+            private List<S3ObjectSummary> currentBatch = new ArrayList<>();
+            private int currentIndex = 0; // NOPMD
+            private boolean isTruncated = true;
+
+            @Override
+            public boolean hasNext() {
+                // If there are unprocessed objects in the current batch, we return true
+                if (currentIndex < currentBatch.size()) {
+                    return true;
+                }
+
+                if (isTruncated) {
+                    fetchNextBatch();
+                    return !currentBatch.isEmpty();
+                }
+
+                return false;
+            }
+
+            @Override
+            public S3ObjectSummary next() {
+                if (!hasNext()) {
+                    return null;
+                }
+
+                return currentBatch.get(currentIndex++);
+            }
+
+            private void fetchNextBatch() {
+                currentBatch.clear();
+                currentIndex = 0;
+
+                final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
+                        .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR)
+                        .withContinuationToken(continuationToken);
+
+                final ListObjectsV2Result objectListing = s3Client.listObjectsV2(request);
+                currentBatch = objectListing.getObjectSummaries()
+                        .stream()
+                        .filter(objectSummary -> objectSummary.getSize() > 0)
+                        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))
+                        .collect(Collectors.toList());
+
+                continuationToken = objectListing.getNextContinuationToken();
+                isTruncated = objectListing.isTruncated();
+
+                currentBatch.forEach(objSummary -> LOGGER.debug("Objects to be processed {} ", objSummary.getKey()));
+            }
+        };
     }

     public void addFailedObjectKeys(final String objectKey) {
         this.failedObjectKeys.add(objectKey);
     }
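
This is the heart of the commit: fetchObjectSummaries no longer walks every ListObjectsV2 page eagerly into one list, but returns an anonymous iterator that lists a page from S3 only when the previous page is exhausted, so a poll that touches few objects triggers few list calls. Two behaviors are worth noting: next() returns null at exhaustion instead of throwing NoSuchElementException, and a page whose contents are entirely filtered out (zero-byte or previously failed objects) ends iteration even if the listing is still truncated. Below is a self-contained sketch of the same lazy-pagination pattern over a generic paged source; the names are illustrative, not from the codebase:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntFunction;

final class LazyPages {

    // Build an iterator that fetches page n only when the previous page is exhausted.
    static Iterator<String> lazyIterator(final IntFunction<List<String>> fetchPage) {
        return new Iterator<>() {
            private List<String> currentBatch = new ArrayList<>();
            private int currentIndex = 0;
            private int nextPage = 0;
            private boolean morePages = true;

            @Override
            public boolean hasNext() {
                if (currentIndex < currentBatch.size()) {
                    return true;
                }
                if (morePages) {
                    currentBatch = fetchPage.apply(nextPage++);
                    currentIndex = 0;
                    morePages = !currentBatch.isEmpty();
                    return morePages;
                }
                return false;
            }

            @Override
            public String next() {
                // Mirrors FileReader: null at exhaustion, not NoSuchElementException.
                return hasNext() ? currentBatch.get(currentIndex++) : null;
            }
        };
    }

    public static void main(final String[] args) {
        // Three pages of two keys each; each page is "listed" only on demand.
        final Iterator<String> it = lazyIterator(page -> {
            System.out.println("fetching page " + page); // observe the lazy fetches
            return page < 3 ? List.of("key-" + (2 * page), "key-" + (2 * page + 1)) : List.of();
        });
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}

Running it interleaves each "fetching page n" line with that page's keys, showing that no page is listed before it is needed.
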
File: record processor
@@ -60,7 +60,6 @@ public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceReco
             }
         }

-        LOGGER.info("Number of records sent {}", results.size());
         return results;
     }

File: SourceRecordIterator
@@ -26,7 +26,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

@@ -54,7 +53,7 @@ public final class SourceRecordIterator implements Iterator<AivenS3SourceRecord>
             + "(?<partitionId>\\d{5})-" + "(?<uniqueId>[a-zA-Z0-9]+)" + "\\.(?<fileExtension>[^.]+)$"); // topic-00001.txt
     private String currentObjectKey;

-    private Iterator<S3ObjectSummary> nextFileIterator;
+    private final Iterator<S3ObjectSummary> s3ObjectSummaryIterator;
     private Iterator<AivenS3SourceRecord> recordIterator = Collections.emptyIterator();

     private final OffsetManager offsetManager;
@@ -75,23 +74,21 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final AmazonS3
         this.bucketName = bucketName;
        this.transformer = transformer;
         this.fileReader = fileReader;
-        try {
-            nextFileIterator = fileReader.fetchObjectSummaries(s3Client);
-        } catch (IOException e) {
-            throw new AmazonClientException("Failed to initialize S3 file reader", e);
-        }
+        s3ObjectSummaryIterator = fileReader.fetchObjectSummaries(s3Client);
     }

     private void nextS3Object() {
-        if (!nextFileIterator.hasNext()) {
+        if (!s3ObjectSummaryIterator.hasNext()) {
             recordIterator = Collections.emptyIterator();
             return;
         }

         try {
-            final S3ObjectSummary file = nextFileIterator.next();
-            currentObjectKey = file.getKey();
-            recordIterator = createIteratorForCurrentFile();
+            final S3ObjectSummary file = s3ObjectSummaryIterator.next();
+            if (file != null) {
+                currentObjectKey = file.getKey();
+                recordIterator = createIteratorForCurrentFile();
+            }
         } catch (IOException e) {
             throw new AmazonClientException(e);
         }
@@ -134,9 +131,7 @@ private Iterator<AivenS3SourceRecord> getObjectIterator(final InputStream valueI
             private final Iterator<AivenS3SourceRecord> internalIterator = readNext().iterator();

             private List<AivenS3SourceRecord> readNext() {
-
-                final Optional<byte[]> optionalKeyBytes = Optional.ofNullable(currentObjectKey)
-                        .map(k -> k.getBytes(StandardCharsets.UTF_8));
+                final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
                 final List<AivenS3SourceRecord> sourceRecords = new ArrayList<>();

                 int numOfProcessedRecs = 1;
@@ -151,8 +146,8 @@

                     final byte[] valueBytes = transformer.getValueBytes(record, topic, s3SourceConfig);
                     checkOffsetMap = false;
-                    sourceRecords.add(getSourceRecord(optionalKeyBytes, valueBytes, topic, topicPartition,
-                            offsetManager, startOffset, partitionMap));
+                    sourceRecords.add(getSourceRecord(keyBytes, valueBytes, topic, topicPartition, offsetManager,
+                            startOffset, partitionMap));
                     if (sourceRecords.size() >= s3SourceConfig.getInt(MAX_POLL_RECORDS)) {
                         break;
                     }
@@ -162,9 +157,9 @@
                 return sourceRecords;
             }

-            private AivenS3SourceRecord getSourceRecord(final Optional<byte[]> key, final byte[] value,
-                    final String topic, final int topicPartition, final OffsetManager offsetManager,
-                    final long startOffset, final Map<String, Object> partitionMap) {
+            private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic,
+                    final int topicPartition, final OffsetManager offsetManager, final long startOffset,
+                    final Map<String, Object> partitionMap) {

                 long currentOffset;

@@ -180,7 +175,7 @@ private AivenS3SourceRecord getSourceRecord(final Optional<byte[]> key, final by

                 final Map<String, Object> offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset);

-                return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key.orElse(null), value,
+                return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value,
                         currentObjectKey);
             }


Expand All @@ -198,7 +193,7 @@ public AivenS3SourceRecord next() {

@Override
public boolean hasNext() {
return recordIterator.hasNext() || nextFileIterator.hasNext();
return recordIterator.hasNext() || s3ObjectSummaryIterator.hasNext();
}

@Override
Expand Down
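
Because the iterator is now created lazily, the constructor can no longer throw IOException and the try/catch around fetchObjectSummaries disappears; the new null check in nextS3Object() compensates for FileReader's next() returning null at exhaustion. Dropping the Optional is safe since readNext() only runs while an object is current, so currentObjectKey is non-null there; null handling for keys now lives in AivenS3SourceRecord's accessors.
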
File: ByteArrayTransformer tests
@@ -16,7 +16,7 @@

 package io.aiven.kafka.connect.s3.source.input;

-import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.when;
@@ -52,7 +52,7 @@ void testGetRecordsSingleChunk() {
         final byte[] data = { 1, 2, 3, 4, 5 };
         final InputStream inputStream = new ByteArrayInputStream(data);

-        when(s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE)).thenReturn(10_000); // Larger than data size
+        when(s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES)).thenReturn(10_000); // Larger than data size

         final List<Object> records = byteArrayTransformer.getRecords(inputStream, "test-topic", 0, s3SourceConfig);

@@ -65,7 +65,7 @@ void testGetRecordsMultipleChunks() {
         final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
         final InputStream inputStream = new ByteArrayInputStream(data);

-        when(s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE)).thenReturn(5); // Smaller than data size
+        when(s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES)).thenReturn(5); // Smaller than data size

         final List<Object> records = byteArrayTransformer.getRecords(inputStream, "test-topic", 0, s3SourceConfig);

@@ -78,7 +78,7 @@
     void testGetRecordsEmptyInputStream() throws IOException {
         final InputStream inputStream = new ByteArrayInputStream(new byte[] {});

-        when(s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE)).thenReturn(5);
+        when(s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES)).thenReturn(5);

         final List<Object> records = byteArrayTransformer.getRecords(inputStream, "test-topic", 0, s3SourceConfig);

