diff --git a/commons/build.gradle.kts b/commons/build.gradle.kts index 232404466..f541be1ba 100644 --- a/commons/build.gradle.kts +++ b/commons/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { implementation(logginglibs.slf4j) implementation(apache.commons.text) + implementation(apache.commons.collection4) implementation(apache.parquet.avro) { exclude(group = "org.xerial.snappy", module = "snappy-java") diff --git a/s3-source-connector/build.gradle.kts b/s3-source-connector/build.gradle.kts index 3530724e0..24c9cc9cf 100644 --- a/s3-source-connector/build.gradle.kts +++ b/s3-source-connector/build.gradle.kts @@ -65,6 +65,7 @@ dependencies { compileOnly(apache.kafka.connect.api) compileOnly(apache.kafka.connect.runtime) + implementation(apache.commons.collection4) implementation(project(":commons")) implementation(project(":s3-commons")) implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version") diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 1689ec9fa..18ab600b1 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -18,10 +18,8 @@ import java.util.HashSet; import java.util.Iterator; -import java.util.Objects; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Stream; import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; @@ -30,6 +28,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.collections4.IteratorUtils; import org.codehaus.plexus.util.StringUtils; /** @@ -53,11 +52,7 @@ public class 
AWSV2SourceClient { * all objectKeys which have already been tried but have been unable to process. */ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { - this.s3SourceConfig = s3SourceConfig; - final S3ClientFactory s3ClientFactory = new S3ClientFactory(); - this.s3Client = s3ClientFactory.createAmazonS3Client(s3SourceConfig); - this.bucketName = s3SourceConfig.getAwsS3BucketName(); - this.failedObjectKeys = new HashSet<>(failedObjectKeys); + this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig, failedObjectKeys); } /** @@ -76,58 +71,86 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set this.s3Client = s3Client; this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.failedObjectKeys = new HashSet<>(failedObjectKeys); + this.filterPredicate = filterPredicate.and(new FailedObjectFilter()); } - public Iterator getListOfObjectKeys(final String startToken) { + /** + * Gets an iterator of S3Objects. Performs the filtering based on the filters provided. Always filters Objects with + * a size of 0 (zero), and objects that have been added to the failed objects list. + * + * @param startToken + * the token (key) to start from. + * @return an iterator of S3Objects. 
+ */ + public Iterator getObjectIterator(final String startToken) { final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName) .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR); - if (StringUtils.isNotBlank(startToken)) { request.withStartAfter(startToken); } + // perform the filtering of S3ObjectSummaries + final Iterator s3ObjectSummaryIterator = IteratorUtils + .filteredIterator(new S3ObjectSummaryIterator(s3Client, request), filterPredicate::test); + // transform S3ObjectSummary to S3Object + return IteratorUtils.transformedIterator(s3ObjectSummaryIterator, + objectSummary -> getObject(objectSummary.getKey())); + } - final Stream s3ObjectKeyStream = Stream - .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { - // This is called every time next() is called on the iterator. - if (response.isTruncated()) { - return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName) - .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) - .withContinuationToken(response.getNextContinuationToken())); - } else { - return null; - } - - }) - .flatMap(response -> response.getObjectSummaries() - .stream() - .filter(filterPredicate) - .filter(objectSummary -> assignObjectToTask(objectSummary.getKey())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))) - .map(S3ObjectSummary::getKey); - return s3ObjectKeyStream.iterator(); + /** + * Adds a filter by "AND"ing it to the existing filters. + * + * @param other + * the filter to add. + */ + public void andFilter(final Predicate other) { + filterPredicate = filterPredicate.and(other); + } + + /** + * Adds a filter by "OR"ing it with the existing filters. + * + * @param other + * the filter to add. + */ + public void orFilter(final Predicate other) { + filterPredicate = filterPredicate.or(other); } + /** + * Get the S3Object from the source. 
+ * + * @param objectKey + * the object key to retrieve. + * @return the S3Object. + */ public S3Object getObject(final String objectKey) { return s3Client.getObject(bucketName, objectKey); } + /** + * Add an object key to the list of failed keys. These will be ignored during re-reads of the data stream. + * + * @param objectKey + * the key to ignore + */ public void addFailedObjectKeys(final String objectKey) { this.failedObjectKeys.add(objectKey); } - public void setFilterPredicate(final Predicate predicate) { - filterPredicate = predicate; - } - - private boolean assignObjectToTask(final String objectKey) { - final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); - final int taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; - final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); - return taskAssignment == taskId; - } - + /** + * Shuts down the system + */ public void shutdown() { s3Client.shutdown(); } + /** + * Filter to remove objects that are in the failed object keys list. + */ + class FailedObjectFilter implements Predicate { + @Override + public boolean test(final S3ObjectSummary objectSummary) { + return !failedObjectKeys.contains(objectSummary.getKey()); + } + } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java new file mode 100644 index 000000000..d453d8a78 --- /dev/null +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3.source.utils; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * Implements a ObjectSummaryIterator on an S3 bucket. Implementation reads summaries in blocks and iterates over each + * block. When block is empty a new block is retrieved and processed until no more data is available. + */ +public class S3ObjectSummaryIterator implements Iterator { + /** The client we are using */ + private final AmazonS3 s3Client; + /** The object listing from the last call to the client */ + private ListObjectsV2Result objectListing; + /** The inner iterator on the object summaries. When it is empty a new one is read from object listing. */ + private Iterator innerIterator; + + /** The request used initially to start the iteration and later to retrieve more records */ + private final ListObjectsV2Request request; + + /** The last key seen by this process. This allows us to restart when a new file is dropped in the directory */ + private String lastObjectSummaryKey; + + /** + * Constructs the s3ObjectSummaryIterator based on the Amazon S3 client. + * + * @param s3Client + * the Amazon client to use for access. 
+ * @param request + * the request object that defines the starting position for the object summary retrieval. + */ + @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "stores mutable AmazeonS3 and ListObjectsV2Request objects") + public S3ObjectSummaryIterator(final AmazonS3 s3Client, final ListObjectsV2Request request) { + this.s3Client = s3Client; + this.request = request; + } + + @Override + public boolean hasNext() { + // delay creating objectListing until we need it. + if (objectListing == null) { + objectListing = s3Client.listObjectsV2(request); + innerIterator = objectListing.getObjectSummaries().iterator(); + } + if (!this.innerIterator.hasNext()) { + if (objectListing.isTruncated()) { + // the listing is truncated: ask for the next page using the token returned with this page. + request.withContinuationToken(objectListing.getNextContinuationToken()); + objectListing = s3Client.listObjectsV2(request); + } else { + // there is no more data -- reread the bucket + request.withContinuationToken(null); + if (lastObjectSummaryKey != null) { + request.withStartAfter(lastObjectSummaryKey); + } + objectListing = s3Client.listObjectsV2(request); + } + innerIterator = objectListing.getObjectSummaries().iterator(); + } + // innerIterator is configured. Does it have more? 
+ return innerIterator.hasNext(); + } + + @Override + public S3ObjectSummary next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final S3ObjectSummary result = innerIterator.next(); + lastObjectSummaryKey = result.getKey(); + return result; + } +} diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 6cab0d12f..22982213e 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -16,13 +16,12 @@ package io.aiven.kafka.connect.s3.source.utils; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -31,8 +30,9 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.collections4.IteratorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,185 +42,126 @@ */ public final class SourceRecordIterator implements Iterator { private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class); - public static final String PATTERN_TOPIC_KEY = "topicName"; - public static final String PATTERN_PARTITION_KEY = "partitionId"; - public static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?[^/]+?)-" - + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // 
topic-00001.txt public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L; - private String currentObjectKey; - - private Iterator objectListIterator; - private Iterator recordIterator = Collections.emptyIterator(); private final OffsetManager offsetManager; private final S3SourceConfig s3SourceConfig; - private final String bucketName; private final Transformer transformer; - // Once we decouple the S3Object from the Source Iterator we can change this to be the SourceApiClient - // At which point it will work for al our integrations. - private final AWSV2SourceClient sourceClient; // NOPMD + + private final FileMatcherFilter fileMatcherFilter; + + private final Iterator> iteratorIterator; + + private Iterator workingIterator; public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, final Transformer transformer, final AWSV2SourceClient sourceClient) { this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; - - this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.transformer = transformer; - this.sourceClient = sourceClient; - objectListIterator = sourceClient.getListOfObjectKeys(null); + this.fileMatcherFilter = new FileMatcherFilter(); + // add the fileMatcher and task assignment filters. + sourceClient.andFilter(fileMatcherFilter.and(new TaskAssignmentFilter(s3SourceConfig))); + iteratorIterator = IteratorUtils.transformedIterator(sourceClient.getObjectIterator(null), + s3Object -> createIteratorForS3Object(s3Object)); + workingIterator = IteratorUtils.emptyIterator(); } - private void nextS3Object() { - if (!objectListIterator.hasNext()) { - // Start after the object Key we have just finished with. 
- objectListIterator = sourceClient.getListOfObjectKeys(currentObjectKey); - if (!objectListIterator.hasNext()) { - recordIterator = Collections.emptyIterator(); - return; - } - } + // For bytes transformation, read whole file as 1 record + private boolean checkBytesTransformation(final Transformer transformer, final long numberOfRecsAlreadyProcessed) { + return transformer instanceof ByteArrayTransformer + && numberOfRecsAlreadyProcessed == BYTES_TRANSFORMATION_NUM_OF_RECS; + } - try { - currentObjectKey = objectListIterator.next(); - if (currentObjectKey != null) { - recordIterator = createIteratorForCurrentFile(); + /** + * Creates an Iterator of S3SourceRecords from an s3Object. package private for testing + * + * @param s3Object + * the object to get the S3SourceRecords from. + * @return an Iterator of S3SourceRecords. + */ + Iterator createIteratorForS3Object(final S3Object s3Object) { + final Map partitionMap = ConnectUtils.getPartitionMap(fileMatcherFilter.getTopicName(), + fileMatcherFilter.getPartitionId(), s3SourceConfig.getAwsS3BucketName()); + final long numberOfRecsAlreadyProcessed = offsetManager.recordsProcessedForObjectKey(partitionMap, + s3Object.getKey()); + // Optimizing without reading stream again. 
+ if (checkBytesTransformation(transformer, numberOfRecsAlreadyProcessed)) { + return IteratorUtils.emptyIterator(); + } + final long startOffset = 1L; + final byte[] keyBytes = s3Object.getKey().getBytes(StandardCharsets.UTF_8); + final List sourceRecords = new ArrayList<>(); + try (Stream recordStream = transformer.getRecords(s3Object::getObjectContent, + fileMatcherFilter.getTopicName(), fileMatcherFilter.getPartitionId(), s3SourceConfig, + numberOfRecsAlreadyProcessed)) { + final Iterator recordIterator = recordStream.iterator(); + while (recordIterator.hasNext()) { + final Object record = recordIterator.next(); + final byte[] valueBytes = transformer.getValueBytes(record, fileMatcherFilter.getTopicName(), + s3SourceConfig); + sourceRecords.add(getSourceRecord(s3Object.getKey(), keyBytes, valueBytes, offsetManager, startOffset, + partitionMap)); } - } catch (IOException e) { - throw new AmazonClientException(e); } - } - - private Iterator createIteratorForCurrentFile() throws IOException { - - final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(currentObjectKey); - String topicName; - int defaultPartitionId; - - if (fileMatcher.find()) { - // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic - try (S3Object s3Object = sourceClient.getObject(currentObjectKey);) { - - topicName = fileMatcher.group(PATTERN_TOPIC_KEY); - defaultPartitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); - - final long defaultStartOffsetId = 1L; - final String finalTopic = topicName; - final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, - bucketName); + return sourceRecords.iterator(); + } - return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, - partitionMap); - } + /** + * Creates an S3SourceRecord. Package private for testing. + * + * @param objectKey + * the key for the object we read this record from. 
+ * @param key + * the key for this record + * @param value + * the value for this record. + * @param offsetManager + * the offsetManager. + * @param startOffset + * the starting offset. + * @param partitionMap + * the partition map for this object. + * @return the S3SourceRecord. + */ + S3SourceRecord getSourceRecord(final String objectKey, final byte[] key, final byte[] value, + final OffsetManager offsetManager, final long startOffset, final Map partitionMap) { + + long currentOffset; + + if (offsetManager.getOffsets().containsKey(partitionMap)) { + LOGGER.info("***** offsetManager.getOffsets() ***** {}", offsetManager.getOffsets()); + currentOffset = offsetManager.incrementAndUpdateOffsetMap(partitionMap, objectKey, startOffset); } else { - LOGGER.error("File naming doesn't match to any topic. {}", currentObjectKey); - return Collections.emptyIterator(); + LOGGER.info("Into else block ..."); + currentOffset = startOffset; + offsetManager.createNewOffsetMap(partitionMap, objectKey, currentOffset); } - } - - @SuppressWarnings("PMD.CognitiveComplexity") - private Iterator getObjectIterator(final S3Object s3Object, final String topic, - final int topicPartition, final long startOffset, final Transformer transformer, - final Map partitionMap) { - return new Iterator<>() { - private final Iterator internalIterator = readNext().iterator(); - - private List readNext() { - - final List sourceRecords = new ArrayList<>(); - - final long numberOfRecsAlreadyProcessed = offsetManager.recordsProcessedForObjectKey(partitionMap, - currentObjectKey); - - // Optimizing without reading stream again. 
- if (checkBytesTransformation(transformer, numberOfRecsAlreadyProcessed)) { - return sourceRecords; - } - - final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8); - - try (Stream recordStream = transformer.getRecords(s3Object::getObjectContent, topic, - topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) { - final Iterator recordIterator = recordStream.iterator(); - while (recordIterator.hasNext()) { - final Object record = recordIterator.next(); - - final byte[] valueBytes = transformer.getValueBytes(record, topic, s3SourceConfig); - - sourceRecords.add(getSourceRecord(keyBytes, valueBytes, topic, topicPartition, offsetManager, - startOffset, partitionMap)); - - // Break if we have reached the max records per poll - if (sourceRecords.size() >= s3SourceConfig.getMaxPollRecords()) { - break; - } - } - } - - return sourceRecords; - } - - // For bytes transformation, read whole file as 1 record - private boolean checkBytesTransformation(final Transformer transformer, - final long numberOfRecsAlreadyProcessed) { - return transformer instanceof ByteArrayTransformer - && numberOfRecsAlreadyProcessed == BYTES_TRANSFORMATION_NUM_OF_RECS; - } - - private S3SourceRecord getSourceRecord(final byte[] key, final byte[] value, final String topic, - final int topicPartition, final OffsetManager offsetManager, final long startOffset, - final Map partitionMap) { - - long currentOffset; - - if (offsetManager.getOffsets().containsKey(partitionMap)) { - LOGGER.info("***** offsetManager.getOffsets() ***** {}", offsetManager.getOffsets()); - currentOffset = offsetManager.incrementAndUpdateOffsetMap(partitionMap, currentObjectKey, - startOffset); - } else { - LOGGER.info("Into else block ..."); - currentOffset = startOffset; - offsetManager.createNewOffsetMap(partitionMap, currentObjectKey, currentOffset); - } - - final Map offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset); - return new S3SourceRecord(partitionMap, offsetMap, 
topic, topicPartition, key, value, currentObjectKey); - } - - @Override - public boolean hasNext() { - return internalIterator.hasNext(); - } + final Map offsetMap = offsetManager.getOffsetValueMap(objectKey, currentOffset); - @Override - public S3SourceRecord next() { - return internalIterator.next(); - } - }; + return new S3SourceRecord(partitionMap, offsetMap, fileMatcherFilter.getTopicName(), + fileMatcherFilter.getPartitionId(), key, value, objectKey); } @Override public boolean hasNext() { - return recordIterator.hasNext() || objectListIterator.hasNext(); + while (!workingIterator.hasNext() && iteratorIterator.hasNext()) { + workingIterator = iteratorIterator.next(); + } + return workingIterator.hasNext(); } @Override public S3SourceRecord next() { - if (!recordIterator.hasNext()) { - nextS3Object(); - } - - if (!recordIterator.hasNext()) { - // If there are still no records, return null or throw an exception - return null; // Or throw new NoSuchElementException(); + if (!hasNext()) { + throw new java.util.NoSuchElementException(); } - - return recordIterator.next(); + return workingIterator.next(); } @Override @@ -228,4 +169,101 @@ public void remove() { throw new UnsupportedOperationException("This iterator is unmodifiable"); } + /** + * A filter for S3ObjectSummaries that extracts the topic name and partition id. Package private for testing. + */ + static class FileMatcherFilter implements Predicate { + private static final String PATTERN_TOPIC_KEY = "topicName"; + private static final String PATTERN_PARTITION_KEY = "partitionId"; + + private static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?[^/]+?)-" + + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // topic-00001.txt + + private static final String NOT_ASSIGNED = null; + /** The extracted topic name or null if the last {@link #test(S3ObjectSummary)} returned false. */ + private String topicName; + /** The extracted partition id or -1 if the last {@link #test(S3ObjectSummary)} returned false. 
*/ + private int partitionId = -1; + + @Override + public boolean test(final S3ObjectSummary s3ObjectSummary) { + final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(s3ObjectSummary.getKey()); + if (fileMatcher.find()) { + topicName = fileMatcher.group(PATTERN_TOPIC_KEY); + partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); + return true; + } + LOGGER.error("File naming doesn't match to any topic. {}", s3ObjectSummary.getKey()); + topicName = NOT_ASSIGNED; + partitionId = -1; + return false; + } + + /** + * Gets the extracted topic name. + * + * @return the topic name or {@code null} if the topic has not been set. + */ + public String getTopicName() { + return topicName; + } + + /** + * Gets the extracted partition id. + * + * @return the partition id or -1 if the value has not been set. + */ + public int getPartitionId() { + return partitionId; + } + } + + /** + * A filter that determines if the S3ObjectSummary belongs to this task. Package private for testing. TODO: Should + * be replaced with actual task assignment predicate. + */ + static class TaskAssignmentFilter implements Predicate { + /** The maximum number of tasks */ + final int maxTasks; + /** The task ID for this task */ + final int taskId; + + /** + * Extracts integer values from original in config. TODO create actual properties in the configuration. + * + * @param config + * the config to extract from. + * @param key + * the key to find. + * @param dfltValue + * the default value on error. + * @return the integer value parsed or default on error. + */ + private static int fromOriginals(final S3SourceConfig config, final String key, final int dfltValue) { + final Object obj = config.originals().get(key); + if (obj != null) { + try { + return Integer.parseInt(obj.toString()); + } catch (NumberFormatException e) { + return dfltValue; + } + } + return dfltValue; + } + /** + * Constructor. + * + * @param config + * the source config to get "tasks.max" and "task.id" values from. 
+ */ + TaskAssignmentFilter(final S3SourceConfig config) { + maxTasks = fromOriginals(config, "tasks.max", 1); + taskId = Math.floorMod(fromOriginals(config, "task.id", 0), maxTasks); // keep id within [0, maxTasks) + } + + @Override + public boolean test(final S3ObjectSummary s3ObjectSummary) { + return taskId == Math.floorMod(s3ObjectSummary.getKey().hashCode(), maxTasks); + } + } } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java new file mode 100644 index 000000000..96076300b --- /dev/null +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3ObjectsUtils.java @@ -0,0 +1,132 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3.source.testutils; + +import static org.mockito.Mockito.when; + +import java.util.List; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; + +/** + * Standard utilities to create objects from S3 for testing. + */ +public final class S3ObjectsUtils { + + private S3ObjectsUtils() { + // do not instantiate. + } + + /** + * Create a ListObjectV2Result from a list of summaries and a next token. 
+ * + * @param summaries + * the list of object summaries to create the result from. + * @param nextToken + * the next token (may be {@code null}). + * @return the ListObjectV2Result from a list of summaries and an next token. + */ + public static ListObjectsV2Result createListObjectsV2Result(final List summaries, + final String nextToken) { + final ListObjectsV2Result result = new ListObjectsV2Result() { + @Override + public List getObjectSummaries() { + return summaries; + } + }; + result.setContinuationToken(nextToken); + result.setTruncated(nextToken != null); + return result; + } + + /** + * Creates an object summary with the specified key. The summary will have a size of 1. + * + * @param bucket + * the bucket name. + * @param objectKey + * the key to create the summary for. + * @return an object summary with the specified key.. + */ + public static S3ObjectSummary createObjectSummary(final String bucket, final String objectKey) { + return createObjectSummary(1, bucket, objectKey); + } + + /** + * Create an S3ObjectSummary with the specified size and object key. + * + * @param sizeOfObject + * the size for the object summary. + * @param bucket + * the bucket name + * @param objectKey + * the key for the object summary. + * @return an S3ObjectSummary with the specified size and object key. + */ + public static S3ObjectSummary createObjectSummary(final long sizeOfObject, final String bucket, + final String objectKey) { + final S3ObjectSummary summary = new S3ObjectSummary(); + summary.setSize(sizeOfObject); + summary.setKey(objectKey); + summary.setBucketName(bucket); + return summary; + } + + /** + * Create an S3Object for a key. + * + * @param bucket + * the bucket for the object. + * @param key + * the key to create the object for + * @return the S3Object. 
+ */ + public static S3Object createS3Object(final String bucket, final String key) { + return createS3Object(createObjectSummary(bucket, key)); + } + + /** + * Creates an S43Object from the object summary. + * + * @param summary + * the object summary. + * @return the S3Object for the summary + */ + public static S3Object createS3Object(final S3ObjectSummary summary) { + final S3Object s3Object = new S3Object(); + s3Object.setKey(summary.getKey()); + s3Object.setBucketName(summary.getBucketName()); + return s3Object; + } + + /** + * Add the result the S3Client so that it will return them. + * + * @param s3Client + * the mock S3Client to add result to. + * @param result + * the list of S3ObjectSummary objects to place in the S3Client. + */ + public static void populateS3Client(final AmazonS3 s3Client, final ListObjectsV2Result result) { + for (final S3ObjectSummary summary : result.getObjectSummaries()) { + when(s3Client.getObject(summary.getBucketName(), summary.getKey())).thenReturn(createS3Object(summary)); + } + } +} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/TestingTransformer.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/TestingTransformer.java new file mode 100644 index 000000000..5153dcf24 --- /dev/null +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/TestingTransformer.java @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3.source.testutils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import org.apache.kafka.common.config.AbstractConfig; + +import io.aiven.kafka.connect.common.source.input.Transformer; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.function.IOSupplier; + +/** + * Helper class to transform bytes to the same bytes with "TESTING: " prefixed. + */ +public class TestingTransformer implements Transformer { // NOPMD not test class but a utility + @Override + public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { + config.put("TestingTransformer", "Operational"); + } + + @Override + public Stream getRecords(final IOSupplier inputStreamIOSupplier, final String topic, + final int topicPartition, final AbstractConfig sourceConfig, final long skipRecords) { + try (InputStream input = inputStreamIOSupplier.get(); + InputStreamReader reader = new InputStreamReader(input, StandardCharsets.UTF_8)) { + final List lines = IOUtils.readLines(reader); + final List result = new ArrayList<>(); + for (int i = 0; i < lines.size(); i++) { + if (i >= skipRecords) { + result.add(("TESTING: " + lines.get(i)).getBytes(StandardCharsets.UTF_8)); + } + } + return result.stream(); + } catch (IOException e) { + throw new RuntimeException(e); // NOPMD allow RuntimeExeption + } + } + + @Override + public byte[] getValueBytes(final Object record, final String topic, final AbstractConfig sourceConfig) { + return (byte[]) record; + } +} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java 
b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index 5b5176690..001745f0f 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -20,8 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -32,142 +30,157 @@ import java.util.Map; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; +import io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; class AWSV2SourceClientTest { + private static final String BUCKET_NAME = "test-bucket"; + private AmazonS3 s3Client; private AWSV2SourceClient awsv2SourceClient; - private static Map getConfigMap(final int maxTasks, final int taskId) { + private static Map getConfigMap() { final Map configMap = new HashMap<>(); - configMap.put("tasks.max", String.valueOf(maxTasks)); - configMap.put("task.id", String.valueOf(taskId)); - - configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "test-bucket"); + configMap.put(AWS_S3_BUCKET_NAME_CONFIG, BUCKET_NAME); return configMap; } - @ParameterizedTest - @CsvSource({ "3, 1" }) - void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) { - 
initializeWithTaskConfigs(maxTasks, taskId); - final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(Collections.emptyList(), null); + @BeforeEach + public void initializeSourceClient() { + final S3SourceConfig s3SourceConfig = new S3SourceConfig(getConfigMap()); + s3Client = mock(AmazonS3.class); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + } + + @Test + void testFetchObjectSummariesWithNoObjects() { + initializeSourceClient(); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(Collections.emptyList(), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); - final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); + final Iterator summaries = awsv2SourceClient.getObjectIterator(null); assertThat(summaries).isExhausted(); } - @ParameterizedTest - @CsvSource({ "1, 0" }) - void testFetchObjectSummariesWithOneObjectWithBasicConfig(final int maxTasks, final int taskId) { + @Test + void testFetchObjectSummariesWithOneObject() throws IOException { final String objectKey = "any-key"; - - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); - assertThat(summaries).hasNext(); + initializeSourceClient(); + final S3ObjectSummary objectSummary = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, objectKey); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(Collections.singletonList(objectSummary), null); + S3ObjectsUtils.populateS3Client(s3Client, listObjectsV2Result); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + + final Iterator s3ObjectIterator = awsv2SourceClient.getObjectIterator(null); + + assertThat(s3ObjectIterator).hasNext(); + try (S3Object object = s3ObjectIterator.next()) { + 
assertThat(object.getKey()).isEqualTo(objectKey); + } + assertThat(s3ObjectIterator).isExhausted(); } - @ParameterizedTest - @CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, - final String objectKey) { - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); - assertThat(summaries).hasNext(); - } + @Test + void testFetchObjectSummariesFiltersOutZeroByteObject() throws IOException { + initializeSourceClient(); - @ParameterizedTest - @CsvSource({ "4, 1, key1", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", - "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, - final String objectKey) { - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); + final S3ObjectSummary zeroByteObject = S3ObjectsUtils.createObjectSummary(0, BUCKET_NAME, "key1"); + final S3ObjectSummary nonZeroByteObject1 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key2"); + final S3ObjectSummary nonZeroByteObject2 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key3"); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); - assertThat(summaries).isExhausted(); - } + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + S3ObjectsUtils.populateS3Client(s3Client, listObjectsV2Result); - @ParameterizedTest - @CsvSource({ "4, 3", "4, 0" }) - void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int taskId) { - initializeWithTaskConfigs(maxTasks, taskId); - final 
ListObjectsV2Result listObjectsV2Result = getListObjectsV2Result(); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); + final Iterator s3ObjectIterator = awsv2SourceClient.getObjectIterator(null); - final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); + assertThat(s3ObjectIterator).hasNext(); + try (S3Object s3Object = s3ObjectIterator.next()) { + assertThat(s3Object.getKey()).isEqualTo("key2"); + } - // assigned 1 object to taskid - assertThat(summaries).hasNext(); - assertThat(summaries.next()).isNotBlank(); - assertThat(summaries).isExhausted(); + assertThat(s3ObjectIterator).hasNext(); + try (S3Object s3Object = s3ObjectIterator.next()) { + assertThat(s3Object.getKey()).isEqualTo("key3"); + } + + assertThat(s3ObjectIterator).isExhausted(); } @Test - void testFetchObjectSummariesWithPagination() throws IOException { - initializeWithTaskConfigs(4, 3); - final S3ObjectSummary object1 = createObjectSummary(1, "key1"); - final S3ObjectSummary object2 = createObjectSummary(2, "key2"); - final List firstBatch = List.of(object1); - final List secondBatch = List.of(object2); + void testFetchObjectSummariesFiltersOutFailedObject() throws IOException { + initializeSourceClient(); - final ListObjectsV2Result firstResult = createListObjectsV2Result(firstBatch, "nextToken"); - final ListObjectsV2Result secondResult = createListObjectsV2Result(secondBatch, null); + final S3ObjectSummary zeroByteObject = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key1"); + final S3ObjectSummary nonZeroByteObject1 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key2"); + final S3ObjectSummary nonZeroByteObject2 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key3"); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); - 
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + S3ObjectsUtils.populateS3Client(s3Client, listObjectsV2Result); - final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); - verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class)); - assertThat(summaries.next()).isNotNull(); - assertThat(summaries).isExhausted(); - } + awsv2SourceClient.addFailedObjectKeys("key2"); + final Iterator s3ObjectIterator = awsv2SourceClient.getObjectIterator(null); - private ListObjectsV2Result createListObjectsV2Result(final List summaries, - final String nextToken) { - final ListObjectsV2Result result = mock(ListObjectsV2Result.class); - when(result.getObjectSummaries()).thenReturn(summaries); - when(result.getNextContinuationToken()).thenReturn(nextToken); - when(result.isTruncated()).thenReturn(nextToken != null); - return result; - } + assertThat(s3ObjectIterator).hasNext(); + try (S3Object s3Object = s3ObjectIterator.next()) { + assertThat(s3Object.getKey()).isEqualTo("key1"); + } + + assertThat(s3ObjectIterator).hasNext(); + try (S3Object s3Object = s3ObjectIterator.next()) { + assertThat(s3Object.getKey()).isEqualTo("key3"); + } - private S3ObjectSummary createObjectSummary(final long sizeOfObject, final String objectKey) { - final S3ObjectSummary summary = mock(S3ObjectSummary.class); - when(summary.getSize()).thenReturn(sizeOfObject); - when(summary.getKey()).thenReturn(objectKey); - return summary; + assertThat(s3ObjectIterator).isExhausted(); } - private Iterator getS3ObjectKeysIterator(final String objectKey) { - final S3ObjectSummary objectSummary = createObjectSummary(1, objectKey); - final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result( - Collections.singletonList(objectSummary), null); - 
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); + @Test + void testFetchObjectSummariesWithPagination() throws IOException { + initializeSourceClient(); + final S3ObjectSummary object1 = S3ObjectsUtils.createObjectSummary(1, BUCKET_NAME, "key1"); + final S3ObjectSummary object2 = S3ObjectsUtils.createObjectSummary(2, BUCKET_NAME, "key2"); + final List firstBatch = List.of(object1); + final List secondBatch = List.of(object2); - return awsv2SourceClient.getListOfObjectKeys(null); - } + final ListObjectsV2Result firstResult = S3ObjectsUtils.createListObjectsV2Result(firstBatch, "nextToken"); + final ListObjectsV2Result secondResult = S3ObjectsUtils.createListObjectsV2Result(secondBatch, null); - public void initializeWithTaskConfigs(final int maxTasks, final int taskId) { - final Map configMap = getConfigMap(maxTasks, taskId); - final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); - s3Client = mock(AmazonS3.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult) + .thenReturn(secondResult) + .thenReturn(new ListObjectsV2Result()); + S3ObjectsUtils.populateS3Client(s3Client, firstResult); + S3ObjectsUtils.populateS3Client(s3Client, secondResult); - } + final Iterator s3ObjectIterator = awsv2SourceClient.getObjectIterator(null); + + assertThat(s3ObjectIterator).hasNext(); + try (S3Object s3Object = s3ObjectIterator.next()) { + assertThat(s3Object.getKey()).isEqualTo("key1"); + } - private ListObjectsV2Result getListObjectsV2Result() { - final S3ObjectSummary zeroByteObject = createObjectSummary(0, "key1"); - final S3ObjectSummary nonZeroByteObject1 = createObjectSummary(1, "key2"); - final S3ObjectSummary nonZeroByteObject2 = createObjectSummary(1, "key3"); - return createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); + 
assertThat(s3ObjectIterator).hasNext(); + try (S3Object s3Object = s3ObjectIterator.next()) { + assertThat(s3Object.getKey()).isEqualTo("key2"); + } + + assertThat(s3ObjectIterator).isExhausted(); } + } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIteratorTest.java new file mode 100644 index 000000000..5ab38a37c --- /dev/null +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIteratorTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.s3.source.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class S3ObjectSummaryIteratorTest { + + private static final String BUCKET_NAME = "test-bucket"; + + private AmazonS3 s3Client; + + @BeforeEach + public void initializeSourceClient() { + s3Client = mock(AmazonS3.class); + } + + @Test + void testNoSummaries() { + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(new ListObjectsV2Result()); + final S3ObjectSummaryIterator underTest = new S3ObjectSummaryIterator(s3Client, new ListObjectsV2Request()); + assertThat(underTest).isExhausted(); + } + + @Test + void testOneSummary() { + final String objectKey = "any-key"; + initializeSourceClient(); + final S3ObjectSummary objectSummary = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, objectKey); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(Collections.singletonList(objectSummary), null); + S3ObjectsUtils.populateS3Client(s3Client, listObjectsV2Result); + + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + + final S3ObjectSummaryIterator underTest = new S3ObjectSummaryIterator(s3Client, new ListObjectsV2Request()); + + assertThat(underTest).hasNext(); + final S3ObjectSummary summary = underTest.next(); + 
assertThat(summary.getKey()).isEqualTo(objectKey); + + assertThat(underTest).isExhausted(); + } + + @Test + void testSummariesWithPagination() throws IOException { + initializeSourceClient(); + final S3ObjectSummary object1 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key1"); + final S3ObjectSummary object2 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, "key2"); + final List firstBatch = List.of(object1); + final List secondBatch = List.of(object2); + + final ListObjectsV2Result firstResult = S3ObjectsUtils.createListObjectsV2Result(firstBatch, "nextToken"); + final ListObjectsV2Result secondResult = S3ObjectsUtils.createListObjectsV2Result(secondBatch, null); + + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult) + .thenReturn(secondResult) + .thenReturn(new ListObjectsV2Result()); + S3ObjectsUtils.populateS3Client(s3Client, firstResult); + S3ObjectsUtils.populateS3Client(s3Client, secondResult); + + final S3ObjectSummaryIterator underTest = new S3ObjectSummaryIterator(s3Client, new ListObjectsV2Request()); + + assertThat(underTest).hasNext(); + S3ObjectSummary summary = underTest.next(); + assertThat(summary.getKey()).isEqualTo("key1"); + + assertThat(underTest).hasNext(); + summary = underTest.next(); + assertThat(summary.getKey()).isEqualTo("key2"); + + assertThat(underTest).isExhausted(); + } +} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index d73068bfd..8480a0c3c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -16,135 +16,268 @@ package io.aiven.kafka.connect.s3.source.utils; +import static 
io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; -import java.util.stream.Stream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import io.aiven.kafka.connect.common.source.input.AvroTransformer; import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; +import io.aiven.kafka.connect.s3.source.testutils.S3ObjectsUtils; +import io.aiven.kafka.connect.s3.source.testutils.TestingTransformer; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; -import org.junit.jupiter.api.BeforeEach; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.http.client.methods.HttpRequestBase; import org.junit.jupiter.api.Test; final class SourceRecordIteratorTest { - private S3SourceConfig mockConfig; - 
private OffsetManager mockOffsetManager; - private Transformer mockTransformer; + private final static String BUCKET_NAME = "record-test-bucket"; - private AWSV2SourceClient mockSourceApiClient; + private S3SourceConfig s3SourceConfig; + private OffsetManager offsetManager; + private Transformer transformer; - @BeforeEach - public void setUp() { - mockConfig = mock(S3SourceConfig.class); - mockOffsetManager = mock(OffsetManager.class); - mockTransformer = mock(Transformer.class); - mockSourceApiClient = mock(AWSV2SourceClient.class); + private AmazonS3 s3Client; + + private AWSV2SourceClient sourceClient; + + private Map standardConfigurationData() { + return Map.of(AWS_S3_BUCKET_NAME_CONFIG, BUCKET_NAME, "tasks.max", "1", "task.id", "0"); + } + + private S3Object addInputStream(final S3Object s3Object, final byte[] data) { + final S3ObjectInputStream stream = new S3ObjectInputStream(new ByteArrayInputStream(data), + mock(HttpRequestBase.class), false); + s3Object.setObjectContent(stream); + return s3Object; } @Test - void testIteratorProcessesS3Objects() throws Exception { + void testSingleS3Object() throws IOException { + s3SourceConfig = new S3SourceConfig(standardConfigurationData()); + offsetManager = mock(OffsetManager.class); + when(offsetManager.recordsProcessedForObjectKey(anyMap(), anyString())).thenReturn(0L); + transformer = new TestingTransformer(); + s3Client = mock(AmazonS3.class); final String key = "topic-00001-abc123.txt"; + final S3ObjectSummary summary = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, key); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(Collections.singletonList(summary), null); + // create S3Object with content. 
+ try (S3Object s3Object = addInputStream(S3ObjectsUtils.createS3Object(summary), + "Hello World".getBytes(StandardCharsets.UTF_8))) { + when(s3Client.getObject(summary.getBucketName(), summary.getKey())).thenReturn(s3Object); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + sourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + + final SourceRecordIterator underTest = new SourceRecordIterator(s3SourceConfig, offsetManager, transformer, + sourceClient); + + assertThat(underTest).hasNext(); + final S3SourceRecord record = underTest.next(); + assertThat(record.getObjectKey()).isEqualTo(key); + assertThat(record.key()).isEqualTo(key.getBytes(StandardCharsets.UTF_8)); + assertThat(record.value()).isEqualTo("TESTING: Hello World".getBytes(StandardCharsets.UTF_8)); + assertThat(record.getTopic()).isEqualTo("topic"); + assertThat(record.partition()).isEqualTo(1); + + assertThat(underTest).isExhausted(); + } + } - // Mock S3Object and InputStream - try (S3Object mockS3Object = mock(S3Object.class); - S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { - when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object); - when(mockS3Object.getObjectContent()).thenReturn(mockInputStream); + @Test + void testMultiple3Object() throws IOException { + s3SourceConfig = new S3SourceConfig(standardConfigurationData()); + offsetManager = mock(OffsetManager.class); + when(offsetManager.recordsProcessedForObjectKey(anyMap(), anyString())).thenReturn(0L); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) - .thenReturn(Stream.of(new Object())); + transformer = new TestingTransformer(); + s3Client = mock(AmazonS3.class); - final String outStr = "this is a test"; - when(mockTransformer.getValueBytes(any(), anyString(), any())) - 
.thenReturn(outStr.getBytes(StandardCharsets.UTF_8)); + final List summaries = new ArrayList<>(); - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final String key1 = "topic-00001-abc123.txt"; + final S3ObjectSummary summary1 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, key1); + summaries.add(summary1); - when(mockSourceApiClient.getListOfObjectKeys(any())).thenReturn(Collections.emptyIterator()); - SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient); + final String key2 = "topic-00002-abc123.txt"; + final S3ObjectSummary summary2 = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, key2); + summaries.add(summary2); - assertThat(iterator.hasNext()).isFalse(); - assertThat(iterator.next()).isNull(); + try (S3Object s3Object1 = addInputStream(S3ObjectsUtils.createS3Object(summary1), + "Hello World1".getBytes(StandardCharsets.UTF_8)); + S3Object s3Object2 = addInputStream(S3ObjectsUtils.createS3Object(summary2), + "Hello World2".getBytes(StandardCharsets.UTF_8))) { - when(mockSourceApiClient.getListOfObjectKeys(any())) - .thenReturn(Collections.singletonList(key).listIterator()); + when(s3Client.getObject(summary1.getBucketName(), summary1.getKey())).thenReturn(s3Object1); - iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient); + when(s3Client.getObject(summary2.getBucketName(), summary2.getKey())).thenReturn(s3Object2); - assertThat(iterator.hasNext()).isTrue(); - assertThat(iterator.next()).isNotNull(); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils.createListObjectsV2Result(summaries, null); + + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + sourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + + final SourceRecordIterator underTest = new 
SourceRecordIterator(s3SourceConfig, offsetManager, transformer, + sourceClient); + + assertThat(underTest).hasNext(); + S3SourceRecord record = underTest.next(); + assertThat(record.getObjectKey()).isEqualTo(key1); + assertThat(record.key()).isEqualTo(key1.getBytes(StandardCharsets.UTF_8)); + assertThat(record.value()).isEqualTo("TESTING: Hello World1".getBytes(StandardCharsets.UTF_8)); + assertThat(record.getTopic()).isEqualTo("topic"); + assertThat(record.partition()).isEqualTo(1); + + assertThat(underTest).hasNext(); + record = underTest.next(); + assertThat(record.getObjectKey()).isEqualTo(key2); + assertThat(record.key()).isEqualTo(key2.getBytes(StandardCharsets.UTF_8)); + assertThat(record.value()).isEqualTo("TESTING: Hello World2".getBytes(StandardCharsets.UTF_8)); + assertThat(record.getTopic()).isEqualTo("topic"); + assertThat(record.partition()).isEqualTo(2); + + assertThat(underTest).isExhausted(); } } @Test - void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { - - final String key = "topic-00001-abc123.txt"; + void testByteArrayProcessorSkipsProcessedRecords() throws Exception { - // Mock S3Object and InputStream - try (S3Object mockS3Object = mock(S3Object.class); - S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { - when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object); - when(mockS3Object.getObjectContent()).thenReturn(mockInputStream); + s3SourceConfig = new S3SourceConfig(standardConfigurationData()); + offsetManager = mock(OffsetManager.class); + when(offsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) + .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); - // With ByteArrayTransformer - mockTransformer = mock(ByteArrayTransformer.class); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) - .thenReturn(Stream.of(new Object())); + transformer = mock(ByteArrayTransformer.class); + s3Client = 
mock(AmazonS3.class); - final String outStr = "this is a test"; + final String key = "topic-00001-abc123.txt"; + final S3ObjectSummary summary = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, key); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(Collections.singletonList(summary), null); + // create S3Object with content. + try (S3Object s3Object = addInputStream(S3ObjectsUtils.createS3Object(summary), + "Hello World".getBytes(StandardCharsets.UTF_8))) { + when(s3Client.getObject(summary.getBucketName(), summary.getKey())).thenReturn(s3Object); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + sourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + + final SourceRecordIterator underTest = new SourceRecordIterator(s3SourceConfig, offsetManager, transformer, + sourceClient); + + assertThat(underTest).isExhausted(); + verify(transformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + } + } - when(mockTransformer.getValueBytes(any(), anyString(), any())) - .thenReturn(outStr.getBytes(StandardCharsets.UTF_8)); + @Test + void testMultipleRecordSkipRecords() throws Exception { - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + s3SourceConfig = new S3SourceConfig(standardConfigurationData()); + offsetManager = mock(OffsetManager.class); + when(offsetManager.recordsProcessedForObjectKey(anyMap(), anyString())).thenReturn(1L); - when(mockSourceApiClient.getListOfObjectKeys(any())) - .thenReturn(Collections.singletonList(key).listIterator()); - when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) - .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); + transformer = new TestingTransformer(); + s3Client = mock(AmazonS3.class); - SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - 
mockSourceApiClient); - assertThat(iterator.hasNext()).isTrue(); - iterator.next(); - verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + final String key = "topic-00001-abc123.txt"; + final S3ObjectSummary summary = S3ObjectsUtils.createObjectSummary(BUCKET_NAME, key); + final ListObjectsV2Result listObjectsV2Result = S3ObjectsUtils + .createListObjectsV2Result(Collections.singletonList(summary), null); + // create S3Object with content. + try (S3Object s3Object = addInputStream(S3ObjectsUtils.createS3Object(summary), + String.format("Hello World%nIt is nice to see you%nGoodbye cruel world") + .getBytes(StandardCharsets.UTF_8))) { + when(s3Client.getObject(summary.getBucketName(), summary.getKey())).thenReturn(s3Object); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result) + .thenReturn(new ListObjectsV2Result()); + sourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + + final SourceRecordIterator underTest = new SourceRecordIterator(s3SourceConfig, offsetManager, transformer, + sourceClient); + + assertThat(underTest).hasNext(); + S3SourceRecord record = underTest.next(); + assertThat(record.getObjectKey()).isEqualTo(key); + assertThat(record.key()).isEqualTo(key.getBytes(StandardCharsets.UTF_8)); + assertThat(record.value()).isEqualTo("TESTING: It is nice to see you".getBytes(StandardCharsets.UTF_8)); + assertThat(record.getTopic()).isEqualTo("topic"); + assertThat(record.partition()).isEqualTo(1); + + assertThat(underTest).hasNext(); + record = underTest.next(); + assertThat(record.getObjectKey()).isEqualTo(key); + assertThat(record.key()).isEqualTo(key.getBytes(StandardCharsets.UTF_8)); + assertThat(record.value()).isEqualTo("TESTING: Goodbye cruel world".getBytes(StandardCharsets.UTF_8)); + assertThat(record.getTopic()).isEqualTo("topic"); + assertThat(record.partition()).isEqualTo(1); + + assertThat(underTest).isExhausted(); + } + } - // 
With AvroTransformer - mockTransformer = mock(AvroTransformer.class); - when(mockSourceApiClient.getListOfObjectKeys(any())) - .thenReturn(Collections.singletonList(key).listIterator()); - when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) - .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); + @Test + void fileNameMatcherFilterTest() { + final SourceRecordIterator.FileMatcherFilter underTest = new SourceRecordIterator.FileMatcherFilter(); + assertThat(underTest.getPartitionId()).isEqualTo(-1); + assertThat(underTest.getTopicName()).isNull(); + + assertThat(underTest.test(createObjectSummary("topic-00001-abc123.txt"))).isTrue(); + assertThat(underTest.getPartitionId()).isEqualTo(1); + assertThat(underTest.getTopicName()).isEqualTo("topic"); + + assertThat(underTest.test(createObjectSummary("invalidFileName"))).isFalse(); + assertThat(underTest.getPartitionId()).isEqualTo(-1); + assertThat(underTest.getTopicName()).isNull(); + } - iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient); - assertThat(iterator.hasNext()).isTrue(); - iterator.next(); + @Test + void taskAssignmentFilterTest() { + final Map configMap = new HashMap<>(); + configMap.put("tasks.max", "4"); + configMap.put("task.id", "2"); + configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "bucket"); + final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); + final SourceRecordIterator.TaskAssignmentFilter underTest = new SourceRecordIterator.TaskAssignmentFilter( + s3SourceConfig); + + // key1 with tasks.max = 4 will return task 2. 
+ assertThat(underTest.test(createObjectSummary("key1"))).isTrue(); + assertThat(underTest.test(createObjectSummary("key2"))).isFalse(); + } - verify(mockTransformer, times(1)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); - } + private S3ObjectSummary createObjectSummary(final String objectKey) { + final S3ObjectSummary summary = new S3ObjectSummary(); + summary.setKey(objectKey); + return summary; } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 21aca87b9..a4451cb5e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -6,6 +6,7 @@ val avroConverterVersion by extra("7.2.2") val avroDataVersion by extra("7.2.2") val awaitilityVersion by extra("4.2.1") val commonsTextVersion by extra("1.11.0") +val commonsCollections4Version by extra("4.4") val hadoopVersion by extra("3.4.0") val hamcrestVersion by extra("2.2") val jacksonVersion by extra("2.15.3") @@ -30,6 +31,9 @@ dependencyResolutionManagement { create("apache") { library("avro", "org.apache.avro:avro:$avroVersion") library("commons-text", "org.apache.commons:commons-text:$commonsTextVersion") + library( + "commons-collection4", + "org.apache.commons:commons-collections4:$commonsCollections4Version") library("kafka-connect-api", "org.apache.kafka:connect-api:$kafkaVersion") library("kafka-connect-json", "org.apache.kafka:connect-json:$kafkaVersion") library("kafka-connect-runtime", "org.apache.kafka:connect-runtime:$kafkaVersion")