diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
index 966fe92bc..0f4a3bf61 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
@@ -21,8 +21,8 @@
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_ENDPOINT_CONFIG;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.INPUT_FORMAT_KEY;
-import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_POLL_RECORDS;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.SCHEMA_REGISTRY_URL;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPICS;
@@ -181,8 +181,9 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo)
         final var topicName = IntegrationBase.topicName(testInfo);
         final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
-        connectorConfig.put(MAX_MESSAGE_BYTES_SIZE, "2"); // For above test data of 10 bytes length, with 2 bytes each
-                                                          // in source record, we expect 5 records.
+        connectorConfig.put(EXPECTED_MAX_MESSAGE_BYTES, "2"); // For above test data of 10 bytes length, with 2 bytes
+                                                              // each
+                                                              // in source record, we expect 5 records.
         connectorConfig.put(MAX_POLL_RECORDS, "2"); // In 3 polls all the 5 records should be processed

         connectRunner.createConnector(connectorConfig);
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
index 4f2e54a52..eee1b36a2 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
@@ -120,11 +120,13 @@ public void start(final Map<String, String> props) {
     private void initializeConverters() {
         try {
             keyConverter = Optional
-                    .of((Converter) s3SourceConfig.getClass("key.converter").getDeclaredConstructor().newInstance());
-            valueConverter = (Converter) s3SourceConfig.getClass("value.converter")
+                    .of((Converter) Class.forName((String) s3SourceConfig.originals().get("key.converter"))
+                            .getDeclaredConstructor()
+                            .newInstance());
+            valueConverter = (Converter) Class.forName((String) s3SourceConfig.originals().get("value.converter"))
                     .getDeclaredConstructor()
                     .newInstance();
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | InvocationTargetException
                 | NoSuchMethodException e) {
             throw new ConnectException("Connect converters could not be instantiated.", e);
         }
@@ -152,7 +154,8 @@ public List<SourceRecord> poll() throws InterruptedException {

         while (!connectorStopped.get()) {
             try {
-                return extractSourceRecords(results);
+                LOGGER.info("Number of records sent {}", extractSourceRecords(results).size());
+                return results;
             } catch (AmazonS3Exception | DataException exception) {
                 if (handleException(exception)) {
                     return null; // NOPMD
@@ -172,6 +175,7 @@ private boolean handleException(final RuntimeException exception) throws Interru
         if (((AmazonS3Exception) exception).isRetryable()) {
             LOGGER.warn("Retryable error while polling. Will sleep and try again.", exception);
             Thread.sleep(ERROR_BACKOFF);
+            prepareReaderFromOffsetStorageReader();
         } else {
             return true;
         }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
index 1db2ba9aa..77241348e 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
@@ -79,9 +79,7 @@ final public class S3SourceConfig extends AbstractConfig {
     public static final String TARGET_TOPICS = "topics";
     public static final String FETCH_PAGE_SIZE = "aws.s3.fetch.page.size";
     public static final String MAX_POLL_RECORDS = "max.poll.records";
-    public static final String MAX_MESSAGE_BYTES_SIZE = "max.message.bytes";
-    public static final String KEY_CONVERTER = "key.converter";
-    public static final String VALUE_CONVERTER = "value.converter";
+    public static final String EXPECTED_MAX_MESSAGE_BYTES = "expected.max.message.bytes";
     public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3;
     public static final String INPUT_FORMAT_KEY = "input.format";
     public static final String SCHEMAS_ENABLE = "schemas.enable";
@@ -164,20 +162,11 @@ private static void addOtherConfig(final S3SourceConfigDef configDef) {
                 ConfigDef.Importance.MEDIUM, "Max poll records", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
                 // UnusedAssignment
                 ConfigDef.Width.NONE, MAX_POLL_RECORDS);
-        configDef.define(KEY_CONVERTER, ConfigDef.Type.CLASS, "org.apache.kafka.connect.converters.ByteArrayConverter",
-                ConfigDef.Importance.MEDIUM, "Key converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
-                // UnusedAssignment
-                ConfigDef.Width.NONE, KEY_CONVERTER);
-        configDef.define(VALUE_CONVERTER, ConfigDef.Type.CLASS,
-                "org.apache.kafka.connect.converters.ByteArrayConverter", ConfigDef.Importance.MEDIUM,
-                "Value converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
-                // UnusedAssignment
-                ConfigDef.Width.NONE, VALUE_CONVERTER);
-        configDef.define(MAX_MESSAGE_BYTES_SIZE, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
+        configDef.define(EXPECTED_MAX_MESSAGE_BYTES, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
                 "The largest record batch size allowed by Kafka config max.message.bytes", GROUP_OTHER,
                 awsOtherGroupCounter++, // NOPMD
                 // UnusedAssignment
-                ConfigDef.Width.NONE, MAX_MESSAGE_BYTES_SIZE);
+                ConfigDef.Width.NONE, EXPECTED_MAX_MESSAGE_BYTES);
     }

     private static void addAwsStsConfigGroup(final ConfigDef configDef) {
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java
index 472d8b93a..bc53e6330 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java
@@ -16,7 +16,7 @@

 package io.aiven.kafka.connect.s3.source.input;

-import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;

 import java.io.IOException;
 import java.io.InputStream;
@@ -42,7 +42,7 @@ public void configureValueConverter(final Map<String, String> config, final S3So
     @Override
     public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
             final S3SourceConfig s3SourceConfig) {
-        final int maxMessageBytesSize = s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE);
+        final int maxMessageBytesSize = s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES);
         final byte[] buffer = new byte[maxMessageBytesSize];
         int bytesRead;
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java
index d3008fc25..87803c636 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AivenS3SourceRecord.java
@@ -16,7 +16,6 @@

 package io.aiven.kafka.connect.s3.source.utils;

-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,8 +42,9 @@ public AivenS3SourceRecord(final Map<String, Object> partitionMap, final Map<St
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/FileReader.java
-    Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) throws IOException {
-        final List<S3ObjectSummary> allSummaries = new ArrayList<>();
-        String continuationToken = null;
-        ListObjectsV2Result objectListing;
-        do {
-            // Create the request for listing objects
-            final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
-                    .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR)
-                    .withContinuationToken(continuationToken); // Set continuation token for pagination
-
-            // List objects from S3
-            objectListing = s3Client.listObjectsV2(request);
-
-            // Filter out zero-byte objects and add to the list
-            final List<S3ObjectSummary> filteredSummaries = objectListing.getObjectSummaries()
-                    .stream()
-                    .filter(objectSummary -> objectSummary.getSize() > 0)
-                    .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))
-                    .collect(Collectors.toList());
-
-            allSummaries.addAll(filteredSummaries); // Add the filtered summaries to the main list
-
-            allSummaries.forEach(objSummary -> LOGGER.debug("Objects to be processed {} ", objSummary.getKey()));
-
-            // Check if there are more objects to fetch
-            continuationToken = objectListing.getNextContinuationToken();
-        } while (objectListing.isTruncated()); // Continue fetching if the result is truncated
-
-        return allSummaries.iterator();
+    Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) {
+        return new Iterator<>() {
+            private String continuationToken = null; // NOPMD
+            private List<S3ObjectSummary> currentBatch = new ArrayList<>();
+            private int currentIndex = 0; // NOPMD
+            private boolean isTruncated = true;
+
+            @Override
+            public boolean hasNext() {
+                // If there are unprocessed objects in the current batch, we return true
+                if (currentIndex < currentBatch.size()) {
+                    return true;
+                }
+
+                if (isTruncated) {
+                    fetchNextBatch();
+                    return !currentBatch.isEmpty();
+                }
+
+                return false;
+            }
+
+            @Override
+            public S3ObjectSummary next() {
+                if (!hasNext()) {
+                    return null;
+                }
+
+                return currentBatch.get(currentIndex++);
+            }
+
+            private void fetchNextBatch() {
+                currentBatch.clear();
+                currentIndex = 0;
+
+                final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
+                        .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR)
+                        .withContinuationToken(continuationToken);
+
+                final ListObjectsV2Result objectListing = s3Client.listObjectsV2(request);
+                currentBatch = objectListing.getObjectSummaries()
+                        .stream()
+                        .filter(objectSummary -> objectSummary.getSize() > 0)
+                        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))
+                        .collect(Collectors.toList());
+
+                continuationToken = objectListing.getNextContinuationToken();
+                isTruncated = objectListing.isTruncated();
+
+                currentBatch.forEach(objSummary -> LOGGER.debug("Objects to be processed {} ", objSummary.getKey()));
+            }
+        };
     }
-
     public void addFailedObjectKeys(final String objectKey) {
         this.failedObjectKeys.add(objectKey);
     }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
index 36954b542..337870a9f 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
@@ -60,7 +60,6 @@ public static List<SourceRecord> processRecords(final Iterator
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
             + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // topic-00001.txt
     private String currentObjectKey;
-    private Iterator<S3ObjectSummary> nextFileIterator;
+    private final Iterator<S3ObjectSummary> s3ObjectSummaryIterator;
     private Iterator<AivenS3SourceRecord> recordIterator = Collections.emptyIterator();

     private final OffsetManager offsetManager;
@@ -75,23 +74,21 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final AmazonS3
         this.bucketName = bucketName;
         this.transformer = transformer;
         this.fileReader = fileReader;
-        try {
-            nextFileIterator = fileReader.fetchObjectSummaries(s3Client);
-        } catch (IOException e) {
-            throw new AmazonClientException("Failed to initialize S3 file reader", e);
-        }
+        s3ObjectSummaryIterator = fileReader.fetchObjectSummaries(s3Client);
     }

     private void nextS3Object() {
-        if (!nextFileIterator.hasNext()) {
+        if (!s3ObjectSummaryIterator.hasNext()) {
             recordIterator = Collections.emptyIterator();
             return;
         }

         try {
-            final S3ObjectSummary file = nextFileIterator.next();
-            currentObjectKey = file.getKey();
-            recordIterator = createIteratorForCurrentFile();
+            final S3ObjectSummary file = s3ObjectSummaryIterator.next();
+            if (file != null) {
+                currentObjectKey = file.getKey();
+                recordIterator = createIteratorForCurrentFile();
+            }
         } catch (IOException e) {
             throw new AmazonClientException(e);
         }
@@ -134,9 +131,7 @@ private Iterator<AivenS3SourceRecord> getObjectIterator(final InputStream valueI
            private final Iterator<AivenS3SourceRecord> internalIterator = readNext().iterator();

            private List<AivenS3SourceRecord> readNext() {
-
-                final Optional<byte[]> optionalKeyBytes = Optional.ofNullable(currentObjectKey)
-                        .map(k -> k.getBytes(StandardCharsets.UTF_8));
+                final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
                 final List<AivenS3SourceRecord> sourceRecords = new ArrayList<>();

                 int numOfProcessedRecs = 1;
@@ -151,8 +146,8 @@ private List<AivenS3SourceRecord> readNext() {

                     final byte[] valueBytes = transformer.getValueBytes(record, topic, s3SourceConfig);
                     checkOffsetMap = false;
-                    sourceRecords.add(getSourceRecord(optionalKeyBytes, valueBytes, topic, topicPartition,
-                            offsetManager, startOffset, partitionMap));
+                    sourceRecords.add(getSourceRecord(keyBytes, valueBytes, topic, topicPartition, offsetManager,
+                            startOffset, partitionMap));
                     if (sourceRecords.size() >= s3SourceConfig.getInt(MAX_POLL_RECORDS)) {
                         break;
                     }
@@ -162,9 +157,9 @@ private List<AivenS3SourceRecord> readNext() {
                 return sourceRecords;
             }

-            private AivenS3SourceRecord getSourceRecord(final Optional<byte[]> key, final byte[] value,
-                    final String topic, final int topicPartition, final OffsetManager offsetManager,
-                    final long startOffset, final Map<String, Object> partitionMap) {
+            private AivenS3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic,
+                    final int topicPartition, final OffsetManager offsetManager, final long startOffset,
+                    final Map<String, Object> partitionMap) {

                 long currentOffset;

@@ -180,7 +175,7 @@ private AivenS3SourceRecord getSourceRecord(final Optional<byte[]> key, final by
                 final Map<String, Object> offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset);

-                return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key.orElse(null), value,
+                return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key, value,
                         currentObjectKey);
             }

@@ -198,7 +193,7 @@ public AivenS3SourceRecord next() {

         @Override
         public boolean hasNext() {
-            return recordIterator.hasNext() || nextFileIterator.hasNext();
+            return recordIterator.hasNext() || s3ObjectSummaryIterator.hasNext();
         }

         @Override
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java
index 4c2bb0099..db743748f 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java
@@ -16,7 +16,7 @@

 package io.aiven.kafka.connect.s3.source.input;

-import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.EXPECTED_MAX_MESSAGE_BYTES;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.when;
@@ -52,7 +52,7 @@ void testGetRecordsSingleChunk() {
         final byte[] data = { 1, 2, 3, 4, 5 };
         final InputStream inputStream = new ByteArrayInputStream(data);

-        when(s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE)).thenReturn(10_000); // Larger than data size
+        when(s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES)).thenReturn(10_000); // Larger than data size

         final List<Object> records = byteArrayTransformer.getRecords(inputStream, "test-topic", 0, s3SourceConfig);

@@ -65,7 +65,7 @@ void testGetRecordsMultipleChunks() {
         final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
         final InputStream inputStream = new ByteArrayInputStream(data);

-        when(s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE)).thenReturn(5); // Smaller than data size
+        when(s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES)).thenReturn(5); // Smaller than data size

         final List<Object> records = byteArrayTransformer.getRecords(inputStream, "test-topic", 0, s3SourceConfig);

@@ -78,7 +78,7 @@ void testGetRecordsMultipleChunks() {
     void testGetRecordsEmptyInputStream() throws IOException {
         final InputStream inputStream = new ByteArrayInputStream(new byte[] {});

-        when(s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE)).thenReturn(5);
+        when(s3SourceConfig.getInt(EXPECTED_MAX_MESSAGE_BYTES)).thenReturn(5);

         final List<Object> records = byteArrayTransformer.getRecords(inputStream, "test-topic", 0, s3SourceConfig);
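
The renamed expected.max.message.bytes setting caps how many bytes of an S3 object go into a single source record, which is why the integration test above expects 5 records from a 10-byte payload with the limit set to 2 (drained over 3 polls with max.poll.records=2). The following is a minimal, self-contained sketch of that chunking idea; it is not the connector's actual ByteArrayTransformer (whose full read loop is not shown in this diff), and the class and method names here are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkingSketch {

    // Split an input stream into records of at most maxMessageBytes bytes each,
    // analogous to reading into "new byte[maxMessageBytesSize]" in the transformer.
    static List<byte[]> chunk(final InputStream inputStream, final int maxMessageBytes) throws IOException {
        final List<byte[]> records = new ArrayList<>();
        final byte[] buffer = new byte[maxMessageBytes];
        int bytesRead;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            // Copy only the bytes actually read so the last chunk is not padded.
            records.add(Arrays.copyOf(buffer, bytesRead));
        }
        return records;
    }

    public static void main(final String[] args) throws IOException {
        final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        // expected.max.message.bytes = 2 -> 5 records for the 10-byte test payload.
        System.out.println(chunk(new ByteArrayInputStream(data), 2).size()); // prints 5
    }
}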