diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
index f55257f46..c8baa95b1 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
@@ -163,7 +163,7 @@ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRecord> sourceRecordIterator) {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
new file mode 100644
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/OffsetManager.java
+public class OffsetManager<E extends OffsetManager.OffsetManagerEntry<E>> {
+    /** The logger to write to */
+    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
+
+    /**
+     * The local manager data.
+     */
+    private final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets;
+
+    /**
+     * The context in which this is running.
+     */
+    private final SourceTaskContext context;
+
+    /**
+     * Constructor.
+     *
+     * @param context
+     *            the context for this instance to use.
+     */
+    public OffsetManager(final SourceTaskContext context) {
+        this(context, new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Visible for testing.
+     *
+     * @param context
+     *            the context for this instance to use.
+     * @param offsets
+     *            the offsets map to use.
+     */
+    protected OffsetManager(final SourceTaskContext context,
+            final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets) {
+        this.context = context;
+        this.offsets = offsets;
+    }
+
+    /**
+     * Gets an entry from the offset manager. Returns the local copy if one has been created; otherwise reads the
+     * data from Kafka. If there is neither a local copy nor one from Kafka, an empty Optional is returned.
+     *
+     * @param key
+     *            the key for the entry.
+     * @param creator
+     *            a function to create the connector-defined offset entry from a Map of String to Object.
+     * @return the entry.
+     */
+    public Optional<E> getEntry(final OffsetManagerKey key, final Function<Map<String, Object>, E> creator) {
+        LOGGER.debug("getEntry: {}", key.getPartitionMap());
+        final Map<String, Object> data = offsets.compute(key.getPartitionMap(), (k, v) -> {
+            if (v == null) {
+                final Map<String, Object> kafkaData = context.offsetStorageReader().offset(key.getPartitionMap());
+                LOGGER.debug("Context stored offset map {}", kafkaData);
+                return kafkaData == null || kafkaData.isEmpty() ? null : kafkaData;
+            } else {
+                LOGGER.debug("Previously stored offset map {}", v);
+                return v;
+            }
+        });
+        return data == null ? Optional.empty() : Optional.of(creator.apply(data));
+    }
+
+    /**
+     * Copies the entry into the offset manager data.
+     *
+     * @param entry
+     *            the entry to update.
+     */
+    public void updateCurrentOffsets(final E entry) {
+        LOGGER.debug("Updating current offsets: {}", entry.getManagerKey().getPartitionMap());
+        offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> {
+            if (v == null) {
+                return new HashMap<>(entry.getProperties());
+            } else {
+                v.putAll(entry.getProperties());
+                return v;
+            }
+        });
+    }
+
+    /**
+     * Removes the specified entry from the in memory table. Does not impact the records stored in the
+     * {@link SourceTaskContext}.
+     *
+     * @param key
+     *            the key for the entry to remove.
+     */
+    public void removeEntry(final OffsetManagerKey key) {
+        LOGGER.debug("Removing: {}", key.getPartitionMap());
+        offsets.remove(key.getPartitionMap());
+    }
+
+    /**
+     * Removes the specified entry from the in memory table. Does not impact the records stored in the
+     * {@link SourceTaskContext}.
+     *
+     * @param sourceRecord
+     *            the SourceRecord that contains the key to be removed.
+     */
+    public void removeEntry(final SourceRecord sourceRecord) {
+        LOGGER.debug("Removing: {}", sourceRecord.sourcePartition());
+        offsets.remove(sourceRecord.sourcePartition());
+    }
+
+    /**
+     * The definition of an entry in the OffsetManager.
+     */
+    public interface OffsetManagerEntry<T extends OffsetManagerEntry<T>> extends Comparable<T> {
+
+        /**
+         * Creates a new OffsetManagerEntry by wrapping the properties with the current implementation. This method
+         * may throw a RuntimeException if required properties are not defined in the map.
+         *
+         * @param properties
+         *            the properties to wrap. May be {@code null}.
+         * @return an OffsetManagerProperty
+         */
+        T fromProperties(Map<String, Object> properties);
+
+        /**
+         * Extracts the data from the entry in the correct format to return to Kafka.
+         *
+         * @return the properties in a format to return to Kafka.
+         */
+        Map<String, Object> getProperties();
+
+        /**
+         * Gets the value of the named property. The value returned from a {@code null} key is implementation
+         * dependent.
+         *
+         * @param key
+         *            the property to retrieve.
+         * @return the value associated with the property or {@code null} if not set.
+         * @throws NullPointerException
+         *             if a {@code null} key is not supported.
+         */
+        Object getProperty(String key);
+
+        /**
+         * Sets a key/value pair. Will overwrite any existing value. Implementations of OffsetManagerEntry may
+         * declare specific keys as restricted. These are generally keys that are managed internally by the
+         * OffsetManagerEntry and may not be set except through provided setter methods or the constructor.
+         *
+         * @param key
+         *            the key to set.
+         * @param value
+         *            the value to set.
+         * @throws IllegalArgumentException
+         *             if the key is restricted.
+         */
+        void setProperty(String key, Object value);
+
+        /**
+         * Gets the value of the named property as an {@code int}.
+         *
+         * @param key
+         *            the property to retrieve.
+         * @return the {@code int} value associated with the property.
+         * @throws NullPointerException
+         *             if a {@code null} key is not supported or the property is not set.
+         */
+        default int getInt(final String key) {
+            return ((Number) getProperty(key)).intValue();
+        }
+
+        /**
+         * Gets the value of the named property as a {@code long}.
+         *
+         * @param key
+         *            the property to retrieve.
+         * @return the {@code long} value associated with the property.
+         * @throws NullPointerException
+         *             if a {@code null} key is not supported or the property is not set.
+         */
+        default long getLong(final String key) {
+            return ((Number) getProperty(key)).longValue();
+        }
+
+        /**
+         * Gets the value of the named property as a String.
+         *
+         * @param key
+         *            the property to retrieve.
+         * @return the String value associated with the property.
+         * @throws NullPointerException
+         *             if a {@code null} key is not supported or the property is not set.
+         */
+        default String getString(final String key) {
+            return getProperty(key).toString();
+        }
+
+        /**
+         * Gets the OffsetManagerKey for this entry.
+         *
+         * @return The offset manager key for this entry.
+         */
+        OffsetManagerKey getManagerKey();
+
+        /**
+         * Gets the Kafka topic for this entry.
+         *
+         * @return The Kafka topic for this entry.
+         */
+        String getTopic();
+
+        /**
+         * Gets the Kafka partition for this entry.
+         *
+         * @return The Kafka partition for this entry.
+         */
+        int getPartition();
+
+        /**
+         * Increments the record count.
+         */
+        void incrementRecordCount();
+    }
+
+    /**
+     * The OffsetManager key. Implementations must override {@code hashCode()} and {@code equals()}.
+     */
+    @FunctionalInterface
+    public interface OffsetManagerKey {
+        /**
+         * Gets the partition map used by Kafka to identify this offset entry.
+         *
+         * @return The partition map used by Kafka to identify this offset entry.
+         */
+        Map<String, Object> getPartitionMap();
+    }
+}
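A short sketch of how a task is expected to drive this new API (annotation, not part of the patch; `MyOffsetEntry` is a hypothetical `OffsetManagerEntry` implementation and `context` is the task's `SourceTaskContext`):

```java
// Hypothetical entry type: MyOffsetEntry implements OffsetManager.OffsetManagerEntry<MyOffsetEntry>.
final OffsetManager<MyOffsetEntry> manager = new OffsetManager<>(context);
final MyOffsetEntry blank = new MyOffsetEntry("topic", 0);

// The local copy wins; otherwise the map Kafka committed is wrapped via the creator function.
final Optional<MyOffsetEntry> entry = manager.getEntry(blank.getManagerKey(), blank::fromProperties);

// Progress is merged back into the local table, keyed by getManagerKey().getPartitionMap().
entry.ifPresent(e -> {
    e.incrementRecordCount();
    manager.updateCurrentOffsets(e);
});
```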
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
index 546c0c4c4..ff4eebdf9 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
@@ -17,6 +17,7 @@ package io.aiven.kafka.connect.common.source.input.utils;
 
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -66,14 +67,15 @@ public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
         return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
     }
 
-    public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
-        return matchPattern(filePattern, sourceName).flatMap(matcher -> {
+    public static OptionalInt getPartitionId(final Pattern filePattern, final String sourceName) {
+        Optional<Integer> result = matchPattern(filePattern, sourceName).flatMap(matcher -> {
             try {
                 return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
             } catch (NumberFormatException e) {
                 return Optional.empty();
             }
         });
+        return result.isPresent() ? OptionalInt.of(result.get()) : OptionalInt.empty();
     }
 
     private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
index 25f22dfc0..e8951111a 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
@@ -17,6 +17,7 @@ package io.aiven.kafka.connect.common.source.task;
 
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.regex.Pattern;
 
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
@@ -52,13 +53,13 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated) {
             LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
             return false;
         }
-        final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
+        final OptionalInt optionalPartitionId = FilePatternUtils.getPartitionId(filePattern,
                 sourceNameToBeEvaluated);
 
         if (optionalPartitionId.isPresent()) {
-            return optionalPartitionId.get() < maxTasks
-                    ? taskMatchesPartition(taskId, optionalPartitionId.get())
-                    : taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
+            return optionalPartitionId.getAsInt() < maxTasks
+                    ? taskMatchesPartition(taskId, optionalPartitionId.getAsInt())
+                    : taskMatchesPartition(taskId, optionalPartitionId.getAsInt() % maxTasks);
         }
         LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
         return false;
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
index 9b3a581eb..ce93f05fd 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
@@ -138,8 +138,8 @@ public AbstractSourceTask.AbortTrigger getAbortTrigger() {
             assertThat(backoff.estimatedDelay()).isEqualTo(expected);
             backoff.delay();
             expected *= 2;
+            assertThat(abortTrigger).isFalse();
         }
         assertThat(backoff.estimatedDelay()).isEqualTo(maxDelay);
-        assertThat(abortTrigger).isFalse();
     }
 }
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java
new file mode 100644
index 000000000..c05ac73e4
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/OffsetManagerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import com.google.common.base.Objects;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+final class OffsetManagerTest {
+
+    private OffsetStorageReader offsetStorageReader;
+
+    private OffsetManager<TestingOffsetManagerEntry> offsetManager;
+
+    @BeforeEach
+    void setup() {
+        offsetStorageReader = mock(OffsetStorageReader.class);
+        final SourceTaskContext sourceTaskContext = mock(SourceTaskContext.class);
+        when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader);
+        offsetManager = new OffsetManager<>(sourceTaskContext);
+    }
+
+    @Test
+    void testNewEntryWithDataFromContext() {
+        final Map<String, Object> partitionKey = new HashMap<>();
+        partitionKey.put("segment1", "topic1");
+        partitionKey.put("segment2", "a value");
+        partitionKey.put("segment3", "something else");
+        final Map<String, Object> offsetValue = new HashMap<>(partitionKey);
+        offsetValue.put("object_key_file", 5L);
+        when(offsetStorageReader.offset(partitionKey)).thenReturn(offsetValue);
+
+        final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(() -> partitionKey,
+                TestingOffsetManagerEntry::new);
+        assertThat(result).isPresent();
+        assertThat(result.get().data).isEqualTo(offsetValue);
+    }
+
+    @Test
+    void testNewEntryWithoutDataFromContext() {
+        final Map<String, Object> partitionKey = new HashMap<>();
+        partitionKey.put("segment1", "topic1");
+        partitionKey.put("segment2", "a value");
+        partitionKey.put("segment3", "something else");
+        when(offsetStorageReader.offset(partitionKey)).thenReturn(new HashMap<>());
+
+        final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(() -> partitionKey,
+                TestingOffsetManagerEntry::new);
+        assertThat(result).isNotPresent();
+    }
+
+    @SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails.
+    public static class TestingOffsetManagerEntry // NOPMD the above suppress warnings does not work.
+            implements
+                OffsetManager.OffsetManagerEntry<TestingOffsetManagerEntry> {
+        public Map<String, Object> data;
+
+        public int recordCount;
+
+        public TestingOffsetManagerEntry(final String one, final String two, final String three) {
+            this();
+            data.put("segment1", one);
+            data.put("segment2", two);
+            data.put("segment3", three);
+        }
+
+        public TestingOffsetManagerEntry() {
+            data = new HashMap<>();
+            data.put("segment1", "The First Segment");
+            data.put("segment2", "The Second Segment");
+            data.put("segment3", "The Third Segment");
+        }
+
+        public TestingOffsetManagerEntry(final Map<String, Object> properties) {
+            this();
+            data.putAll(properties);
+        }
+
+        @Override
+        public TestingOffsetManagerEntry fromProperties(final Map<String, Object> properties) {
+            return new TestingOffsetManagerEntry(properties);
+        }
+
+        @Override
+        public Map<String, Object> getProperties() {
+            return data;
+        }
+
+        @Override
+        public Object getProperty(final String key) {
+            return data.get(key);
+        }
+
+        @Override
+        public void setProperty(final String key, final Object value) {
+            data.put(key, value);
+        }
+
+        @Override
+        public OffsetManager.OffsetManagerKey getManagerKey() {
+            return () -> Map.of("segment1", data.get("segment1"), "segment2", data.get("segment2"), "segment3",
+                    data.get("segment3"));
+        }
+
+        @Override
+        public String getTopic() {
+            return getProperty("topic").toString();
+        }
+
+        @Override
+        public int getPartition() {
+            final Object value = getProperty("partition");
+            return value instanceof Integer ? (Integer) value : 0;
+        }
+
+        @Override
+        public void incrementRecordCount() {
+            recordCount++;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (other instanceof TestingOffsetManagerEntry) {
+                return this.compareTo((TestingOffsetManagerEntry) other) == 0;
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(getProperty("segment1"), getProperty("segment2"), getProperty("segment3"));
+        }
+
+        @Override
+        public int compareTo(final TestingOffsetManagerEntry other) {
+            if (other == this) { // NOPMD
+                return 0;
+            }
+            int result = ((String) getProperty("segment1")).compareTo((String) other.getProperty("segment1"));
+            if (result == 0) {
+                result = ((String) getProperty("segment2")).compareTo((String) other.getProperty("segment2"));
+                if (result == 0) {
+                    result = ((String) getProperty("segment3")).compareTo((String) other.getProperty("segment3"));
+                }
+            }
+            return result;
+        }
+    }
+}
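The test helper above also illustrates the round-trip contract that `getEntry` depends on; a minimal sketch (hypothetical values, not part of the patch):

```java
// getProperties() must yield a map from which fromProperties() can rebuild an
// equivalent entry; this is how getEntry() rehydrates state committed to Kafka.
final TestingOffsetManagerEntry original = new TestingOffsetManagerEntry("a", "b", "c");
original.setProperty("object_key_file", 5L);

final TestingOffsetManagerEntry restored = original.fromProperties(original.getProperties());
assert original.compareTo(restored) == 0;
```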
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
index 5d95d6ebd..13c085f5e 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
@@ -23,9 +23,8 @@
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
+import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
-import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -33,6 +32,8 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -44,6 +45,7 @@
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
+import io.aiven.kafka.connect.common.source.OffsetManager;
 import io.aiven.kafka.connect.common.source.input.InputFormat;
 import io.aiven.kafka.connect.common.source.input.TransformerFactory;
 import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
@@ -51,12 +53,13 @@
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
 import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
-import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
+import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
 import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
 import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
 
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -74,16 +77,28 @@ class AwsIntegrationTest implements IntegrationBase {
 
     @Container
     public static final LocalStackContainer LOCALSTACK = IntegrationBase.createS3Container();
 
+    private static String s3Prefix;
+
     private S3Client s3Client;
     private String s3Endpoint;
 
     private BucketAccessor testBucketAccessor;
 
+    @Override
+    public String getS3Prefix() {
+        return s3Prefix;
+    }
+
     @Override
     public S3Client getS3Client() {
         return s3Client;
     }
 
+    @BeforeAll
+    static void setUpAll() {
+        s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";
+    }
+
     @BeforeEach
     void setupAWS() {
         s3Client = IntegrationBase.createS3Client(LOCALSTACK);
@@ -104,6 +119,7 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
         config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY);
         config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint);
         config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME);
+        config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix());
         config.put(TARGET_TOPIC_PARTITIONS, "0,1");
         config.put(TARGET_TOPICS, topics);
         config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
@@ -120,8 +136,8 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
      */
     @Test
     void sourceRecordIteratorBytesTest(final TestInfo testInfo) {
-        final var topicName = IntegrationBase.topicName(testInfo);
-        final Map<String, String> configData = getConfig(topicName, 1);
+        final var topic = IntegrationBase.getTopic(testInfo);
+        final Map<String, String> configData = getConfig(topic, 1);
 
         configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
 
@@ -131,14 +147,14 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {
         final List<String> offsetKeys = new ArrayList<>();
         final List<String> expectedKeys = new ArrayList<>();
         // write 2 objects to s3
-        expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0"));
-        expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0"));
-        expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1"));
-        expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1"));
"1")); + expectedKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00000")); + expectedKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00000")); + expectedKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00001")); + expectedKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00001")); // we don't expext the empty one. offsetKeys.addAll(expectedKeys); - offsetKeys.add(writeToS3(topicName, new byte[0], "3")); + offsetKeys.add(writeToS3(topic, new byte[0], "00003")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -148,18 +164,18 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { when(context.offsetStorageReader()).thenReturn(offsetStorageReader); when(offsetStorageReader.offsets(any())).thenReturn(new HashMap<>()); - final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig); + final OffsetManager offsetManager = new OffsetManager<>(context); final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1), - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); + "{{topic}}-{{partition}}-{{start_offset}}", 0); final HashSet seenKeys = new HashSet<>(); while (sourceRecordIterator.hasNext()) { final S3SourceRecord s3SourceRecord = sourceRecordIterator.next(); - final String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey(); + final String key = s3SourceRecord.getObjectKey(); assertThat(offsetKeys).contains(key); seenKeys.add(key); } @@ -168,11 +184,11 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { @Test void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { - final var topicName = IntegrationBase.topicName(testInfo); + final var topic = IntegrationBase.getTopic(testInfo); final int maxTasks = 1; final int taskId = 0; - final Map configData = getConfig(topicName, maxTasks); + final Map configData = getConfig(topic, maxTasks); configData.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); @@ -199,12 +215,12 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final Set offsetKeys = new HashSet<>(); - offsetKeys.add(writeToS3(topicName, outputStream1, "1")); - offsetKeys.add(writeToS3(topicName, outputStream2, "1")); + offsetKeys.add(writeToS3(topic, outputStream1, "00001")); + offsetKeys.add(writeToS3(topic, outputStream2, "00001")); - offsetKeys.add(writeToS3(topicName, outputStream3, "2")); - offsetKeys.add(writeToS3(topicName, outputStream4, "2")); - offsetKeys.add(writeToS3(topicName, outputStream5, "2")); + offsetKeys.add(writeToS3(topic, outputStream3, "00002")); + offsetKeys.add(writeToS3(topic, outputStream4, "00002")); + offsetKeys.add(writeToS3(topic, outputStream5, "00002")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -214,23 +230,23 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { when(context.offsetStorageReader()).thenReturn(offsetStorageReader); when(offsetStorageReader.offsets(any())).thenReturn(new HashMap<>()); - final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig); + final OffsetManager offsetManager = new OffsetManager(context); final AWSV2SourceClient sourceClient = new 
 
         final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
                 TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient, new HashDistributionStrategy(maxTasks),
-                FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId);
+                "{{topic}}-{{partition}}-{{start_offset}}", taskId);
 
         final HashSet<String> seenKeys = new HashSet<>();
         final Map<String, List<Long>> seenRecords = new HashMap<>();
         while (sourceRecordIterator.hasNext()) {
             final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
-            final String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey();
+            final String key = s3SourceRecord.getObjectKey();
             seenRecords.compute(key, (k, v) -> {
                 final List<Long> lst = v == null ? new ArrayList<>() : v; // NOPMD new object inside loop
-                lst.add(s3SourceRecord.getRecordNumber());
+                lst.add(s3SourceRecord.getOffsetManagerEntry().getRecordCount());
                 return lst;
             });
             assertThat(offsetKeys).contains(key);
@@ -251,8 +267,8 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
 
     @Test
     void verifyIteratorRehydration(final TestInfo testInfo) {
         // create 2 files.
-        final var topicName = IntegrationBase.topicName(testInfo);
-        final Map<String, String> configData = getConfig(topicName, 1);
+        final var topic = IntegrationBase.getTopic(testInfo);
+        final Map<String, String> configData = getConfig(topic, 1);
 
         configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
 
@@ -265,10 +281,8 @@ void verifyIteratorRehydration(final TestInfo testInfo) {
         final List<String> actualKeys = new ArrayList<>();
         // write 2 objects to s3
-        expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0")
-                .substring((OBJECT_KEY + SEPARATOR).length()));
-        expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0")
-                .substring((OBJECT_KEY + SEPARATOR).length()));
+        expectedKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00000"));
+        expectedKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00000"));
 
         assertThat(testBucketAccessor.listObjects()).hasSize(2);
 
@@ -286,8 +300,7 @@
         assertThat(actualKeys).containsAll(expectedKeys);
 
         // write 3rd object to s3
-        expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "0")
-                .substring((OBJECT_KEY + SEPARATOR).length()));
+        expectedKeys.add(writeToS3(topic, testData3.getBytes(StandardCharsets.UTF_8), "00000"));
 
         assertThat(testBucketAccessor.listObjects()).hasSize(3);
         assertThat(iter).hasNext();
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
index fa4f60b76..f204a7f90 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
@@ -16,8 +16,8 @@
 package io.aiven.kafka.connect.s3.source;
 
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
-import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
+import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.OBJECT_KEY;
+import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.RECORD_COUNT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
@@ -51,6 +51,8 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
 
+import io.aiven.kafka.connect.common.source.OffsetManager;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -102,6 +104,8 @@ static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final
 
     S3Client getS3Client();
 
+    String getS3Prefix();
+
     /**
      * Write file to s3 with the specified key and data.
      *
@@ -120,21 +124,23 @@ default void writeToS3WithKey(final String objectKey, final byte[] testDataBytes) {
     }
 
     /**
-     * Writes to S3 using a key of the form {@code [prefix]topicName-partitionId-systemTime.txt}.
+     * Writes to S3 using a key of the form {@code [prefix]topic-partitionId-systemTime.txt}.
      *
-     * @param topicName
+     * @param topic
      *            the topic name to use
      * @param testDataBytes
     *            the data.
     * @param partitionId
     *            the partition id.
-     * @return the key prefixed by {@link S3SourceTask#OBJECT_KEY} and
-     *         {@link io.aiven.kafka.connect.s3.source.utils.OffsetManager#SEPARATOR}
+     * @return the object key the data was written to, including any S3 prefix.
      */
-    default String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
-        final String objectKey = topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt";
+    default String writeToS3(final String topic, final byte[] testDataBytes, final String partitionId) {
+        final String objectKey = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topic + "-"
+                + partitionId + "-" + System.currentTimeMillis() + ".txt";
         writeToS3WithKey(objectKey, testDataBytes);
-        return OBJECT_KEY + SEPARATOR + objectKey;
+        return objectKey;
     }
 
     default AdminClient newAdminClient(final String bootstrapServers) {
@@ -157,13 +163,13 @@ static Path getPluginDir() throws IOException {
         return Files.createDirectories(testDir.resolve(PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA));
     }
 
-    static String topicName(final TestInfo testInfo) {
+    static String getTopic(final TestInfo testInfo) {
         return testInfo.getTestMethod().get().getName();
     }
 
-    static void createTopics(final AdminClient adminClient, final List<String> topicNames)
+    static void createTopics(final AdminClient adminClient, final List<String> topics)
             throws ExecutionException, InterruptedException {
-        final var newTopics = topicNames.stream().map(s -> new NewTopic(s, 4, (short) 1)).collect(Collectors.toList());
+        final var newTopics = topics.stream().map(s -> new NewTopic(s, 4, (short) 1)).collect(Collectors.toList());
         adminClient.createTopics(newTopics).all().get();
     }
 
@@ -257,9 +263,13 @@ static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
         final Map<String, Object> messages = new HashMap<>();
         final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
         for (final ConsumerRecord<byte[], byte[]> record : records) {
-            Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
+            final Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
+            });
+            final List<Object> key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD
             });
-            messages.putAll(offsetRec);
+            // key.get(0) will return the name of the connector the commit is from.
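+            // key.get(1) is the source partition map of the commit; for this connector it holds the
+            // bucket and objectKey (see S3OffsetManagerEntry.getManagerKey()).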
+            final Map<String, Object> keyDetails = (Map<String, Object>) key.get(1);
+            messages.put((String) keyDetails.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT));
         }
         return messages;
     }
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
index ad31acc88..d78694de2 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
@@ -30,8 +30,6 @@
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
-import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
 import static java.util.Map.entry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
@@ -41,6 +39,8 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -87,6 +87,7 @@ final class IntegrationTest implements IntegrationBase {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTest.class);
     private static final String CONNECTOR_NAME = "aiven-s3-source-connector";
+    private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-test-";
     private static final int OFFSET_FLUSH_INTERVAL_MS = 500;
 
     private static String s3Endpoint;
@@ -108,9 +109,15 @@ public S3Client getS3Client() {
         return s3Client;
     }
 
-    public
+    @Override
+    public String getS3Prefix() {
+        return s3Prefix;
+    }
+
+    @BeforeAll
+    public static void setUpAll() throws IOException, InterruptedException {
+        s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";
 
-    @BeforeAll static void setUpAll() throws IOException, InterruptedException {
         s3Client = IntegrationBase.createS3Client(LOCALSTACK);
         s3Endpoint = LOCALSTACK.getEndpoint().toString();
         testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME);
@@ -131,8 +138,8 @@ void setUp(final TestInfo testInfo) throws Exception {
         connectRunner.startConnectCluster(CONNECTOR_NAME, localListenerPort, containerListenerPort);
         adminClient = newAdminClient(connectRunner.getBootstrapServers());
 
-        final String topicName = IntegrationBase.topicName(testInfo);
-        final var topics = List.of(topicName);
+        final String topic = IntegrationBase.getTopic(testInfo);
+        final var topics = List.of(topic);
         IntegrationBase.createTopics(adminClient, topics);
 
         // This should be done after the process listening the port is already started by host but
@@ -155,21 +162,24 @@ void tearDown() {
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     void bytesTest(final boolean addPrefix) {
-        final var topicName = IntegrationBase.topicName(testInfo);
+        final var topic = IntegrationBase.getTopic(testInfo);
         final ObjectDistributionStrategy objectDistributionStrategy;
         final int partitionId = 0;
         final String prefixPattern = "topics/{{topic}}/partition={{partition}}/";
-        String s3Prefix = "";
+        objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
         if (addPrefix) {
-            objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
-            s3Prefix = "topics/" + topicName + "/partition=" + partitionId + "/";
-        } else {
-            objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
+            s3Prefix = "topics/" + topic + "/partition=" + partitionId + "/";
         }
 
         final String fileNamePatternSeparator = "_";
 
-        final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, objectDistributionStrategy,
+        final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topic, 1, objectDistributionStrategy,
                 addPrefix, s3Prefix, prefixPattern, fileNamePatternSeparator);
 
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
@@ -181,20 +191,16 @@
         final List<String> offsetKeys = new ArrayList<>();
         // write 2 objects to s3
-        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", s3Prefix,
-                fileNamePatternSeparator));
-        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", s3Prefix,
-                fileNamePatternSeparator));
-        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", s3Prefix,
-                fileNamePatternSeparator));
-        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", s3Prefix,
-                fileNamePatternSeparator));
-        offsetKeys.add(writeToS3(topicName, new byte[0], "3", s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00000"));
+        offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00000"));
+        offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00001"));
+        offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00001"));
+        offsetKeys.add(writeToS3(topic, new byte[0], "00003"));
 
         assertThat(testBucketAccessor.listObjects()).hasSize(5);
 
         // Poll messages from the Kafka topic and verify the consumed data
-        final List<String> records = IntegrationBase.consumeByteMessages(topicName, 4,
+        final List<String> records = IntegrationBase.consumeByteMessages(topic, 4,
                 connectRunner.getBootstrapServers());
 
         // Verify that the correct data is read from the S3 bucket and pushed to Kafka
@@ -209,9 +215,11 @@
 
     @Test
     void avroTest(final TestInfo testInfo) throws IOException {
-        final var topicName = IntegrationBase.topicName(testInfo);
+        final var topic = IntegrationBase.getTopic(testInfo);
         final boolean addPrefix = false;
-        final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "", "",
+        s3Prefix = "";
+        final Map<String, String> connectorConfig = getAvroConfig(topic, InputFormat.AVRO, addPrefix, "", "",
                 ObjectDistributionStrategy.OBJECT_HASH);
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
@@ -237,20 +245,18 @@
 
         final Set<String> offsetKeys = new HashSet<>();
 
-        final String s3Prefix = "";
+        offsetKeys.add(writeToS3(topic, outputStream1, "00001"));
+        offsetKeys.add(writeToS3(topic, outputStream2, "00001"));
 
-        offsetKeys.add(writeToS3(topicName, outputStream1, "1", s3Prefix, "-"));
-        offsetKeys.add(writeToS3(topicName, outputStream2, "1", s3Prefix, "-"));
-
-        offsetKeys.add(writeToS3(topicName, outputStream3, "2",
s3Prefix, "-")); - offsetKeys.add(writeToS3(topicName, outputStream4, "2", s3Prefix, "-")); - offsetKeys.add(writeToS3(topicName, outputStream5, "2", s3Prefix, "-")); + offsetKeys.add(writeToS3(topic, outputStream3, "00002")); + offsetKeys.add(writeToS3(topic, outputStream4, "00002")); + offsetKeys.add(writeToS3(topic, outputStream5, "00002")); assertThat(testBucketAccessor.listObjects()).hasSize(5); // Poll Avro messages from the Kafka topic and deserialize them // Waiting for 25k kafka records in this test so a longer Duration is added. - final List records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5, + final List records = IntegrationBase.consumeAvroMessages(topic, numOfRecsFactor * 5, Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure this method deserializes Avro @@ -271,22 +277,19 @@ void avroTest(final TestInfo testInfo) throws IOException { @ParameterizedTest @ValueSource(booleans = { true, false }) void parquetTest(final boolean addPrefix) throws IOException { - final var topicName = IntegrationBase.topicName(testInfo); - - final String partition = "0"; + final var topic = IntegrationBase.getTopic(testInfo); final ObjectDistributionStrategy objectDistributionStrategy; final String prefixPattern = "bucket/topics/{{topic}}/partition/{{partition}}/"; - String s3Prefix = ""; + final String partition = "00000"; + s3Prefix = addPrefix ? "bucket/topics/" + topic + "/partition/" + partition + "/" : ""; objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; - if (addPrefix) { - s3Prefix = "bucket/topics/" + topicName + "/partition/" + partition + "/"; - } - final String fileName = (StringUtils.isNotBlank(s3Prefix) ? s3Prefix : "") + topicName + "-" + partition + "-" - + System.currentTimeMillis() + ".txt"; + final String fileName = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topic + "-" + + partition + "-" + System.currentTimeMillis() + ".txt"; final String name = "testuser"; - final Map connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET, addPrefix, s3Prefix, + //final Map connectorConfig = getAvroConfig(topic, InputFormat.PARQUET); + final Map connectorConfig = getAvroConfig(topic, InputFormat.PARQUET, addPrefix, s3Prefix, prefixPattern, objectDistributionStrategy); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); final Path path = ContentUtils.getTmpFilePath(name); @@ -300,7 +303,7 @@ void parquetTest(final boolean addPrefix) throws IOException { } // Waiting for a small number of messages so using a smaller Duration of a minute - final List records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60), + final List records = IntegrationBase.consumeAvroMessages(topic, 100, Duration.ofSeconds(60), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); final List expectedRecordNames = IntStream.range(0, 100) .mapToObj(i -> name + i) @@ -309,10 +312,11 @@ void parquetTest(final boolean addPrefix) throws IOException { .containsExactlyInAnyOrderElementsOf(expectedRecordNames); } - private Map getAvroConfig(final String topicName, final InputFormat inputFormat, + private Map getAvroConfig(final String topic, final InputFormat inputFormat, final boolean addPrefix, final String s3Prefix, final String prefixPattern, final ObjectDistributionStrategy objectDistributionStrategy) { - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4, objectDistributionStrategy, + // final Map 
connectorConfig = getConfig(CONNECTOR_NAME, topic, 4); + final Map connectorConfig = getConfig(CONNECTOR_NAME, topic, 4, objectDistributionStrategy, addPrefix, s3Prefix, prefixPattern, "-"); connectorConfig.put(INPUT_FORMAT_KEY, inputFormat.getValue()); connectorConfig.put(SCHEMA_REGISTRY_URL, schemaRegistry.getSchemaRegistryUrl()); @@ -324,8 +328,8 @@ private Map getAvroConfig(final String topicName, final InputFor @Test void jsonTest(final TestInfo testInfo) { - final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, + final var topic = IntegrationBase.getTopic(testInfo); + final Map connectorConfig = getConfig(CONNECTOR_NAME, topic, 1, ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "", "", "-"); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue()); connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter"); @@ -339,10 +343,10 @@ void jsonTest(final TestInfo testInfo) { } final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8); - final String offsetKey = writeToS3(topicName, jsonBytes, "1", "", "-"); + final String offsetKey = writeToS3(topic, jsonBytes, "00001"); // Poll Json messages from the Kafka topic and deserialize them - final List records = IntegrationBase.consumeJsonMessages(topicName, 500, + final List records = IntegrationBase.consumeJsonMessages(topic, 500, connectRunner.getBootstrapServers()); assertThat(records).map(jsonNode -> jsonNode.get("payload")).anySatisfy(jsonNode -> { @@ -383,7 +387,6 @@ private static Map basicS3ConnectorConfig(final boolean addPrefi config.put(AWS_S3_PREFIX_CONFIG, s3Prefix); } config.put(TARGET_TOPIC_PARTITIONS, "0,1"); - return config; } @@ -400,12 +403,4 @@ static void verifyOffsetPositions(final Map expectedRecords, fin }); } } - - String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId, - final String s3Prefix, final String separator) { - final String objectKey = (StringUtils.isNotBlank(s3Prefix) ? 
s3Prefix : "") + topicName + separator - + partitionId + separator + System.currentTimeMillis() + ".txt"; - writeToS3WithKey(objectKey, testDataBytes); - return OBJECT_KEY + SEPARATOR + objectKey; - } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 3ed3fdafd..54d90ef29 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -16,15 +16,18 @@ package io.aiven.kafka.connect.s3.source; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.regex.Pattern; import org.apache.kafka.connect.source.SourceRecord; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.source.AbstractSourceTask; +import io.aiven.kafka.connect.common.source.OffsetManager; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; import io.aiven.kafka.connect.common.source.task.DistributionStrategy; @@ -33,8 +36,8 @@ import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; -import io.aiven.kafka.connect.s3.source.utils.OffsetManager; import io.aiven.kafka.connect.s3.source.utils.RecordProcessor; +import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import io.aiven.kafka.connect.s3.source.utils.Version; @@ -52,12 +55,6 @@ public class S3SourceTask extends AbstractSourceTask { /** The logger to write to */ private static final Logger LOGGER = LoggerFactory.getLogger(S3SourceTask.class); - public static final String BUCKET = "bucket"; - public static final String TOPIC = "topic"; - - public static final String OBJECT_KEY = "object_key"; - public static final String PARTITION = "topicPartition"; - /** An iterator or S3SourceRecords */ private Iterator s3SourceRecordIterator; /** @@ -68,11 +65,11 @@ public class S3SourceTask extends AbstractSourceTask { private AWSV2SourceClient awsv2SourceClient; /** The offset manager this task uses */ - private OffsetManager offsetManager; + private OffsetManager offsetManager; private S3SourceConfig s3SourceConfig; private int taskId; - private Pattern filePattern; + private String filePattern; public S3SourceTask() { super(LOGGER); @@ -120,10 +117,8 @@ public boolean hasNext() { @Override public SourceRecord next() { final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next(); - offsetManager.updateAndReturnCurrentOffsets(s3SourceRecord.getPartitionMap(), - s3SourceRecord.getObjectKey(), s3SourceRecord.getRecordNumber()); - return RecordProcessor.createSourceRecord(s3SourceRecord, s3SourceConfig, awsv2SourceClient, - offsetManager); + final S3OffsetManagerEntry entry = s3SourceRecord.getOffsetManagerEntry(); + return RecordProcessor.createSourceRecord(s3SourceRecord, s3SourceConfig, awsv2SourceClient, entry); } }; return IteratorUtils.filteredIterator(inner, Objects::nonNull); @@ -134,7 +129,7 @@ protected SourceCommonConfig configure(final Map props) { LOGGER.info("S3 Source task started."); 
         this.s3SourceConfig = new S3SourceConfig(props);
         this.transformer = s3SourceConfig.getTransformer();
-        offsetManager = new OffsetManager(context, s3SourceConfig);
+        offsetManager = new OffsetManager<>(context);
         awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig);
         setS3SourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
                 awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId));
@@ -151,6 +146,7 @@ public void commitRecord(final SourceRecord record) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Committed individual record {} committed", (Map) record.sourceOffset());
         }
+        offsetManager.removeEntry(record);
     }
 
     /**
@@ -186,12 +182,10 @@ private DistributionStrategy initializeObjectDistributionStrategy() {
         DistributionStrategy distributionStrategy;
 
         if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILENAME) {
-            this.filePattern = FilePatternUtils
-                    .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
+            this.filePattern = s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString();
             distributionStrategy = new PartitionDistributionStrategy(maxTasks);
         } else {
-            this.filePattern = FilePatternUtils
-                    .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
+            this.filePattern = s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString();
             distributionStrategy = new HashDistributionStrategy(maxTasks);
         }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
index d9dbc0d45..ba836c051 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
@@ -32,6 +32,7 @@
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 /**
@@ -76,7 +77,7 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig) {
      *            the beginning key, or {@code null} to start at the beginning.
      * @return a Stream of S3Objects for the current state of the S3 storage.
     */
-    private Stream<S3Object> getS3ObjectStream(final String startToken) {
+    Stream<S3Object> getS3ObjectStream(final String startToken) {
         final ListObjectsV2Request request = ListObjectsV2Request.builder()
                 .bucket(bucketName)
                 .maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
@@ -84,6 +85,7 @@ private Stream<S3Object> getS3ObjectStream(final String startToken) {
                 .startAfter(StringUtils.defaultIfBlank(startToken, null))
                 .build();
 
+        return Stream.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
             // This is called every time next() is called on the iterator.
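+            // isTruncated() signals that S3 has more pages; Objects::nonNull above ends the
+            // stream once no further page is fetched.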
             if (response.isTruncated()) {
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/ConnectUtils.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/ConnectUtils.java
deleted file mode 100644
index 6c60bb8ed..000000000
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/ConnectUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2024 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.connect.s3.source.utils;
-
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.BUCKET;
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.PARTITION;
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.TOPIC;
-
-import java.util.HashMap;
-import java.util.Map;
-
-final public class ConnectUtils {
-
-    private ConnectUtils() {
-        // hidden
-    }
-    public static Map<String, Object> getPartitionMap(final String topicName, final Integer defaultPartitionId,
-            final String bucketName) {
-        final Map<String, Object> partitionMap = new HashMap<>();
-        partitionMap.put(BUCKET, bucketName);
-        partitionMap.put(TOPIC, topicName);
-        partitionMap.put(PARTITION, defaultPartitionId);
-        return partitionMap;
-    }
-}
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java
deleted file mode 100644
index 95bc4053d..000000000
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright 2024 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.connect.s3.source.utils;
-
-import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
-import static java.util.stream.Collectors.toMap;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.kafka.connect.source.SourceTaskContext;
-
-import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OffsetManager {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
-    public static final String SEPARATOR = "_";
-    private final Map<Map<String, Object>, Map<String, Object>> offsets;
-
-    public OffsetManager(final SourceTaskContext context, final S3SourceConfig s3SourceConfig) {
-        final String s3Bucket = s3SourceConfig.getAwsS3BucketName();
-        final Set<Integer> partitions = parsePartitions(s3SourceConfig);
-        final Set<String> topics = parseTopics(s3SourceConfig);
-
-        // Build the partition keys and fetch offsets from offset storage
-        final List<Map<String, Object>> partitionKeys = buildPartitionKeys(s3Bucket, partitions, topics);
-        final Map<Map<String, Object>, Map<String, Object>> offsetMap = context.offsetStorageReader()
-                .offsets(partitionKeys);
-
-        LOGGER.info(" ********** offsetMap ***** {}", offsetMap);
-        this.offsets = offsetMap.entrySet()
-                .stream()
-                .filter(e -> e.getValue() != null)
-                .collect(toMap(entry -> new HashMap<>(entry.getKey()), entry -> new HashMap<>(entry.getValue())));
-        LOGGER.info(" ********** offsets ***** {}", offsets);
-    }
-
-    public Map<Map<String, Object>, Map<String, Object>> getOffsets() {
-        return Collections.unmodifiableMap(offsets);
-    }
-
-    public long incrementAndUpdateOffsetMap(final Map<String, Object> partitionMap, final String currentObjectKey,
-            final long startOffset) {
-        if (offsets.containsKey(partitionMap)) {
-            final Map<String, Object> offsetValue = new HashMap<>(offsets.get(partitionMap));
-            if (offsetValue.containsKey(getObjectMapKey(currentObjectKey))) {
-                final long newOffsetVal = (long) offsetValue.get(getObjectMapKey(currentObjectKey)) + 1L;
-                offsetValue.put(getObjectMapKey(currentObjectKey), newOffsetVal);
-                offsets.put(partitionMap, offsetValue);
-                return newOffsetVal;
-            } else {
-                offsetValue.put(getObjectMapKey(currentObjectKey), startOffset);
-                offsets.put(partitionMap, offsetValue);
-                return startOffset;
-            }
-        }
-        return startOffset;
-    }
-
-    public Map<String, Object> updateAndReturnCurrentOffsets(final Map<String, Object> partitionMap,
-            final String currentObjectKey, final long offset) {
-        final Map<String, Object> offsetMap = offsets.compute(partitionMap, (k, v) -> {
-            final Map<String, Object> map = v == null ? new Hashtable<>() : v;
-            map.put(getObjectMapKey(currentObjectKey), offset);
-            return map;
-        });
-        return new HashMap<>(offsetMap);
-    }
-
-    public static String getObjectMapKey(final String currentObjectKey) {
-        return OBJECT_KEY + SEPARATOR + currentObjectKey;
-    }
-
-    public long recordsProcessedForObjectKey(final Map<String, Object> partitionMap, final String currentObjectKey) {
-        if (offsets.containsKey(partitionMap)) {
-            return (long) offsets.get(partitionMap).getOrDefault(getObjectMapKey(currentObjectKey), 0L);
-        }
-        return 0L;
-    }
-
-    private static Set<Integer> parsePartitions(final S3SourceConfig s3SourceConfig) {
-        final String partitionString = s3SourceConfig.getTargetTopicPartitions();
-        return Arrays.stream(partitionString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
-    }
-
-    private static Set<String> parseTopics(final S3SourceConfig s3SourceConfig) {
-        final String topicString = s3SourceConfig.getTargetTopics();
-        return Arrays.stream(topicString.split(",")).collect(Collectors.toSet());
-    }
-
-    private static List<Map<String, Object>> buildPartitionKeys(final String bucket, final Set<Integer> partitions,
-            final Set<String> topics) {
-        final List<Map<String, Object>> partitionKeys = new ArrayList<>();
-        partitions.forEach(partition -> topics.forEach(topic -> {
-            partitionKeys.add(ConnectUtils.getPartitionMap(topic, partition, bucket));
-        }));
-        return partitionKeys;
-    }
-}
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
index cab511693..e6b6ac543 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
@@ -35,9 +35,9 @@ private RecordProcessor() {
 
     public static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecord,
             final S3SourceConfig s3SourceConfig, final AWSV2SourceClient sourceClient,
-            final OffsetManager offsetManager) {
+            final S3OffsetManagerEntry s3OffsetManagerEntry) {
         try {
-            return s3SourceRecord.getSourceRecord(offsetManager);
+            return s3SourceRecord.getSourceRecord(s3OffsetManagerEntry);
         } catch (DataException e) {
             if (ErrorsTolerance.NONE.equals(s3SourceConfig.getErrorsTolerance())) {
                 throw new ConnectException("Data Exception caught during S3 record to source record transformation", e);
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java
new file mode 100644
index 000000000..a9868acfc
--- /dev/null
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.aiven.kafka.connect.s3.source.utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.aiven.kafka.connect.common.source.OffsetManager; + +import com.google.common.base.Objects; + +public final class S3OffsetManagerEntry implements OffsetManager.OffsetManagerEntry { + + // package private statics for testing. + // TODO make this package private after values in S3SourceTask are no longer needed + public static final String BUCKET = "bucket"; + public static final String OBJECT_KEY = "objectKey"; + public static final String TOPIC = "topic"; + public static final String PARTITION = "partition"; + public static final String RECORD_COUNT = "recordCount"; + + /** + * The list of keys that may not be set via {@link #setProperty(String, Object)}. + */ + static final List RESTRICTED_KEYS = List.of(RECORD_COUNT); + /** The data map that stores all the values */ + private final Map data; + /** The record count for the data map. Extracted here because it is used/updated frequently during processing */ + private long recordCount; + + private final String bucket; + private final String objectKey; + private final String topic; + private final Integer partition; + + /** + * Construct the S3OffsetManagerEntry. + * + * @param bucket + * the bucket we are using. + * @param s3ObjectKey + * the S3Object key. + * @param topic + * The topic we are using. + * @param partition + * the partition we are using. + */ + public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final String topic, + final Integer partition) { + this.bucket = bucket; + this.objectKey = s3ObjectKey; + this.topic = topic; + this.partition = partition; + data = new HashMap<>(); + } + + /** + * Constructs an OffsetManagerEntry from an existing map. Used to reconstitute previously serialized + * S3OffsetManagerEntries. Used by {@link #fromProperties(Map)}. + * + * @param properties + * the property map. + */ + private S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final String topic, + final Integer partition, final Map properties) { + this(bucket, s3ObjectKey, topic, partition); + data.putAll(properties); + final Object recordCountProperty = data.computeIfAbsent(RECORD_COUNT, s -> 0L); + if (recordCountProperty instanceof Number) { + recordCount = ((Number) recordCountProperty).longValue(); + } + } + + /** + * Creates an S3OffsetManagerEntry. Will return {@code null} if properties is {@code null}. + * + * @param properties + * the properties to wrap. May be {@code null}. + * @return an S3OffsetManagerEntry. + * @throws IllegalArgumentException + * if one of the {@link #RESTRICTED_KEYS} is missing. + */ + @Override + public S3OffsetManagerEntry fromProperties(final Map properties) { + if (properties == null) { + return null; + } + return new S3OffsetManagerEntry(bucket, objectKey, topic, partition, properties); + } + + @Override + public Object getProperty(final String key) { + if (RECORD_COUNT.equals(key)) { + return recordCount; + } + return data.get(key); + } + + @Override + public void setProperty(final String property, final Object value) { + if (RESTRICTED_KEYS.contains(property)) { + throw new IllegalArgumentException( + String.format("'%s' is a restricted key and may not be set using setProperty()", property)); + } + data.put(property, value); + } + + @Override + public void incrementRecordCount() { + recordCount++; + } + + /** + * Gets the number of records extracted from data returned from S3.
+ * + * @return the number of records extracted from data returned from S3. + */ + public long getRecordCount() { + return recordCount; + } + + /** + * Gets the S3Object key for the current object. + * + * @return the S3ObjectKey. + */ + public String getKey() { + return objectKey; + } + + @Override + public int getPartition() { + return partition; + } + + @Override + public String getTopic() { + return topic; + } + + /** + * Gets the S3 bucket for the current object. + * + * @return the S3 Bucket for the current object. + */ + public String getBucket() { + return bucket; + } + /** + * Creates a new offset map. The returned map is already a copy, so no further defensive copy is necessary. + * + * @return a new map of properties and values. + */ + @Override + public Map getProperties() { + final Map result = new HashMap<>(data); + result.put(RECORD_COUNT, recordCount); + return result; + } + /** + * Returns the OffsetManagerKey for this Entry. + * + * @return the OffsetManagerKey for this Entry. + */ + @Override + public OffsetManager.OffsetManagerKey getManagerKey() { + return () -> Map.of(BUCKET, bucket, OBJECT_KEY, objectKey); + } + + @Override + public boolean equals(final Object other) { + if (other instanceof S3OffsetManagerEntry) { + return compareTo((S3OffsetManagerEntry) other) == 0; + } + return false; + } + + @Override + public int hashCode() { + // keep consistent with equals(): entries that are equal always share bucket and object key + return Objects.hashCode(getBucket(), getKey()); + } + + @Override + public int compareTo(final S3OffsetManagerEntry other) { + if (this == other) { // NOPMD comparing instance + return 0; + } + int result = ((String) getProperty(BUCKET)).compareTo((String) other.getProperty(BUCKET)); + if (result == 0) { + result = getKey().compareTo(other.getKey()); + if (result == 0) { + result = Long.compare(getRecordCount(), other.getRecordCount()); + } + } + return result; + } +}
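A minimal usage sketch of the entry lifecycle (the literal values are illustrative; the same round trip is exercised by testFromProperties later in this change):

final S3OffsetManagerEntry entry = new S3OffsetManagerEntry("test-bucket", "topic-00001-abc123.txt", "topic", 1);
entry.setProperty("random_entry", 5L);                          // free-form property
entry.incrementRecordCount();                                   // RECORD_COUNT is restricted; it changes only through this call
final Map<String, Object> offsetMap = entry.getProperties();    // the map Kafka persists as the source offset
final S3OffsetManagerEntry restored = entry.fromProperties(offsetMap); // bucket/key/topic/partition come from the invoking entry
// restored.getRecordCount() == 1L and restored.getProperty("random_entry") == 5L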
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 05ca02ba4..e273c1faa 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -16,54 +16,71 @@ package io.aiven.kafka.connect.s3.source.utils; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.source.SourceRecord; +import software.amazon.awssdk.services.s3.model.S3Object; public class S3SourceRecord { - private final Map partitionMap; - private final long recordNumber; - private final String topic; - private final Integer topicPartition; - private final SchemaAndValue keyData; - - private final SchemaAndValue valueData; - - private final String objectKey; - - public S3SourceRecord(final Map partitionMap, final long recordNumber, final String topic, - final Integer topicPartition, final String objectKey, final SchemaAndValue keyData, - final SchemaAndValue valueData) { - this.partitionMap = new HashMap<>(partitionMap); - this.recordNumber = recordNumber; - this.topic = topic; - this.topicPartition = topicPartition; + + private SchemaAndValue keyData; + private SchemaAndValue valueData; + /** The S3OffsetManagerEntry for this source record */ + private S3OffsetManagerEntry offsetManagerEntry; + + private final S3Object s3Object; + + public S3SourceRecord(S3Object s3Object) { + this.s3Object = s3Object; + } + + /** Returns a copy whose offset entry is itself a copy, so offset state is not shared between instances. */ + public S3SourceRecord clone() { + S3SourceRecord result = new S3SourceRecord(s3Object); + result.setOffsetManagerEntry(offsetManagerEntry.fromProperties(offsetManagerEntry.getProperties())); + result.keyData = keyData; + result.valueData = valueData; + return result; + } + + public void setOffsetManagerEntry(S3OffsetManagerEntry offsetManagerEntry) { + this.offsetManagerEntry = offsetManagerEntry; + } + + public long getRecordCount() { + return offsetManagerEntry == null ? 0 : offsetManagerEntry.getRecordCount(); + } + + public void setKeyData(SchemaAndValue keyData) { this.keyData = keyData; - this.valueData = valueData; - this.objectKey = objectKey; } - public Map getPartitionMap() { - return Collections.unmodifiableMap(partitionMap); + public void incrementRecordCount() { + this.offsetManagerEntry.incrementRecordCount(); } - public long getRecordNumber() { - return recordNumber; + public void setValueData(SchemaAndValue valueData) { + this.valueData = valueData; } public String getTopic() { - return topic; + return offsetManagerEntry.getTopic(); } - public Integer partition() { - return topicPartition; + public int getPartition() { + return offsetManagerEntry.getPartition(); } public String getObjectKey() { - return objectKey; + return s3Object.key(); } public SchemaAndValue getKey() { @@ -74,10 +91,13 @@ public SchemaAndValue getValue() { return new SchemaAndValue(valueData.schema(), valueData.value()); } - public SourceRecord getSourceRecord(final OffsetManager offsetManager) { - final Map offsetMap = offsetManager.updateAndReturnCurrentOffsets(getPartitionMap(), - getObjectKey(), getRecordNumber()); - return new SourceRecord(getPartitionMap(), offsetMap, topic, partition(), keyData.schema(), keyData.value(), - valueData.schema(), valueData.value()); + public S3OffsetManagerEntry getOffsetManagerEntry() { + return offsetManagerEntry.fromProperties(offsetManagerEntry.getProperties()); // return a defensive copy + } + + // NOTE: the offsetManager parameter is currently unused; the record's own entry supplies the offset data. + public SourceRecord getSourceRecord(final S3OffsetManagerEntry offsetManager) { + return new SourceRecord(offsetManagerEntry.getManagerKey().getPartitionMap(), + offsetManagerEntry.getProperties(), offsetManagerEntry.getTopic(), offsetManagerEntry.getPartition(), + keyData.schema(), keyData.value(), valueData.schema(), valueData.value()); } }
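A sketch of assembling a record with the new mutable API, as the updated S3SourceTaskTest below does (the key/value bytes are illustrative):

final S3SourceRecord record = new S3SourceRecord(S3Object.builder().key("topic-00001-abc123.txt").size(11L).build());
record.setOffsetManagerEntry(new S3OffsetManagerEntry("test-bucket", "topic-00001-abc123.txt", "topic", 1));
record.setKeyData(new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, "Hello".getBytes(StandardCharsets.UTF_8)));
record.setValueData(new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, "Hello World".getBytes(StandardCharsets.UTF_8)));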
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 820be20aa..6437a12b2 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -18,14 +18,17 @@ import java.util.Collections; import java.util.Iterator; -import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.function.Function; +import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Stream; +import org.apache.commons.collections4.IteratorUtils; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.source.OffsetManager; import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; @@ -41,65 +44,55 @@ public final class SourceRecordIterator implements Iterator { public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L; - private final OffsetManager offsetManager; + /** The OffsetManager that we are using */ + private final OffsetManager offsetManager; + /** The configuration for this S3 source */ private final S3SourceConfig s3SourceConfig; - private final String bucketName; - + /** The transformer for the data conversions */ private final Transformer transformer; - // Once we decouple the S3Object from the Source Iterator we can change this to be the SourceApiClient - // At which point it will work for al our integrations. + /** The AWS client that provides the S3Objects */ private final AWSV2SourceClient sourceClient; - - private String topic; - private int partitionId; - + /** The S3 bucket we are processing */ + private final String bucket; + /** The distribution strategy we will use */ private final DistributionStrategy distributionStrategy; + /** The task ID associated with this iterator */ private final int taskId; - private final Iterator inner; - + /** The inner iterator that provides S3Objects which have been filtered and potentially had data extracted */ + private final Iterator inner; + /** The outer iterator that provides S3SourceRecords */ private Iterator outer; - private final Pattern filePattern; + + final FileMatching fileMatching; + + final Predicate> taskAssignment; public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, final Transformer transformer, final AWSV2SourceClient sourceClient, - final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) { + final DistributionStrategy distributionStrategy, final String filePattern, final int taskId) { super(); this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; - - this.bucketName = s3SourceConfig.getAwsS3BucketName(); + this.bucket = s3SourceConfig.getAwsS3BucketName(); this.transformer = transformer; this.sourceClient = sourceClient; - this.filePattern = filePattern; this.distributionStrategy = distributionStrategy; this.taskId = taskId; - // Initialize predicates - sourceClient.addPredicate(this::isFileMatchingPattern); - sourceClient.addPredicate(this::isFileAssignedToTask); - - // call filters out bad file names and extracts topic/partition - inner = sourceClient.getS3ObjectIterator(null); - outer = Collections.emptyIterator(); - } + fileMatching = new FileMatching(filePattern); + taskAssignment = new TaskAssignment(distributionStrategy, fileMatching.pattern); - public boolean isFileMatchingPattern(final S3Object s3Object) { - final Optional optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key()); - final Optional optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key()); + // FileMatching yields Optional.empty() for keys that do not match the pattern; TaskAssignment + // keeps only present records owned by this task, so the trailing Optional::get is safe. + Stream s3SourceRecordStream = sourceClient.getS3ObjectStream(null) + .map(fileMatching) + .filter(taskAssignment) + .map(Optional::get); - if (optionalTopic.isPresent() && optionalPartitionId.isPresent()) { - topic = optionalTopic.get(); - partitionId = optionalPartitionId.get(); - return true; - } - return false; + inner = s3SourceRecordStream.iterator(); + outer = Collections.emptyIterator(); }
- public boolean isFileAssignedToTask(final S3Object s3Object) { - return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern); - } @Override public boolean hasNext() { @@ -122,60 +115,88 @@ public void remove() { /** * Converts the S3Object into a stream of S3SourceRecords. * - * @param s3Object - * the S3Object to read data from. + * @param s3SourceRecord + * the SourceRecord that drives the creation of source records with values. * @return a stream of S3SourceRecords created from the input stream of the S3Object. */ - private Stream convert(final S3Object s3Object) { - - final Map partitionMap = ConnectUtils.getPartitionMap(topic, partitionId, bucketName); - final long recordCount = offsetManager.recordsProcessedForObjectKey(partitionMap, s3Object.key()); - - // Optimizing without reading stream again. - if (transformer instanceof ByteArrayTransformer && recordCount > 0) { - return Stream.empty(); - } + private Stream convert(final S3SourceRecord s3SourceRecord) { - final SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), topic, s3SourceConfig); + s3SourceRecord.setKeyData(transformer.getKeyData(s3SourceRecord.getObjectKey(), s3SourceRecord.getTopic(), + s3SourceConfig)); return transformer - .getRecords(sourceClient.getObject(s3Object.key()), topic, partitionId, s3SourceConfig, recordCount) - .map(new Mapper(partitionMap, recordCount, keyData, s3Object.key())); + .getRecords(sourceClient.getObject(s3SourceRecord.getObjectKey()), s3SourceRecord.getTopic(), + s3SourceRecord.getPartition(), s3SourceConfig, s3SourceRecord.getRecordCount()) + .map(new Mapper(s3SourceRecord)); } /** * Maps the data from the {@link Transformer} stream to an S3SourceRecord given all the additional data required. */ - class Mapper implements Function { - /** - * The partition map - */ - private final Map partitionMap; - /** - * The record number for the record being created. - */ - private long recordCount; + static class Mapper implements Function { /** - * The schema and value for the key + * The S3SourceRecord that produces the values. */ - private final SchemaAndValue keyData; - /** - * The object key from S3 - */ - private final String objectKey; - - public Mapper(final Map partitionMap, final long recordCount, final SchemaAndValue keyData, - final String objectKey) { - this.partitionMap = partitionMap; - this.recordCount = recordCount; - this.keyData = keyData; - this.objectKey = objectKey; + private final S3SourceRecord sourceRecord; + + public Mapper(final S3SourceRecord sourceRecord) { + // TODO this is the point where the global S3OffsetManagerEntry becomes local and we can do a lookahead type + // operation within the Transformer + // to see if there are more records.
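+ // Cloning here gives this Mapper a private copy of the record and its offset entry, so the + // record count incremented in apply() below is isolated from the caller's instance.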
+ this.sourceRecord = sourceRecord.clone(); } @Override public S3SourceRecord apply(final SchemaAndValue valueData) { - recordCount++; - return new S3SourceRecord(partitionMap, recordCount, topic, partitionId, objectKey, keyData, valueData); + sourceRecord.incrementRecordCount(); + S3SourceRecord result = sourceRecord.clone(); + result.setValueData(valueData); + return result; } } + + class TaskAssignment implements Predicate> { + final DistributionStrategy distributionStrategy; + final Pattern pattern; + + TaskAssignment(DistributionStrategy distributionStrategy, Pattern pattern) { + this.distributionStrategy = distributionStrategy; + this.pattern = pattern; + } + + + @Override + public boolean test(Optional s3SourceRecord) { + if (s3SourceRecord.isPresent()) { + S3SourceRecord record = s3SourceRecord.get(); + if (distributionStrategy.isPartOfTask(taskId, record.getObjectKey(), pattern)) { + return true; + } + } + return false; + } + } + + class FileMatching implements Function> { + + Pattern pattern; + FileMatching(String filePattern) { + pattern = FilePatternUtils.configurePattern(filePattern); + } + + @Override + public Optional apply(S3Object s3Object) { + Optional topic = FilePatternUtils.getTopic(pattern, s3Object.key()); + OptionalInt partition = FilePatternUtils.getPartitionId(pattern, s3Object.key()); + if (topic.isPresent() && partition.isPresent()) { + S3SourceRecord s3SourceRecord = new S3SourceRecord(s3Object); + S3OffsetManagerEntry offsetManagerEntry = new S3OffsetManagerEntry(bucket, s3Object.key(), topic.get(), partition.getAsInt()); + offsetManagerEntry = offsetManager.getEntry(offsetManagerEntry.getManagerKey(), offsetManagerEntry::fromProperties).orElse(offsetManagerEntry); + s3SourceRecord.setOffsetManagerEntry(offsetManagerEntry); + return Optional.of(s3SourceRecord); + } + return Optional.empty(); + } + } + } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index c915376c9..a58455633 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -19,6 +19,9 @@ import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.BUCKET; +import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.OBJECT_KEY; +import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.RECORD_COUNT; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -48,8 +51,7 @@ import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import io.aiven.kafka.connect.s3.source.utils.ConnectUtils; -import io.aiven.kafka.connect.s3.source.utils.OffsetManager; +import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.findify.s3mock.S3Mock; @@ -63,6 +65,7 @@ import software.amazon.awssdk.core.retry.RetryMode; import 
software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.S3Object; final class S3SourceTaskTest { @@ -80,7 +83,7 @@ final class S3SourceTaskTest { private static final int PARTITION = 1; - private static final String OBJECT_KEY = "object_key"; + private static final String TEST_OBJECT_KEY = "object_key"; // TODO S3Mock has not been maintained in 4 years // Adobe have an alternative we can move to. @@ -151,11 +154,14 @@ void testStop() { assertThat(s3SourceTask.isRunning()).isFalse(); } - private static S3SourceRecord createS3SourceRecord(final String topicName, final Integer defaultPartitionId, - final String bucketName, final String objectKey, final byte[] key, final byte[] value) { - return new S3SourceRecord(ConnectUtils.getPartitionMap(topicName, defaultPartitionId, bucketName), 0L, - topicName, defaultPartitionId, objectKey, new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, key), - new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value)); + private static S3SourceRecord createS3SourceRecord(final String topic, final Integer defaultPartitionId, + final String bucket, final String objectKey, final byte[] key, final byte[] value) { + + S3SourceRecord result = new S3SourceRecord(S3Object.builder().key(objectKey).size((long)value.length).build()); + result.setOffsetManagerEntry(new S3OffsetManagerEntry(bucket, objectKey, topic, defaultPartitionId)); + result.setKeyData(new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, key)); + result.setValueData(new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value)); + return result; } private void startSourceTask(final S3SourceTask s3SourceTask) { @@ -199,11 +205,14 @@ void testPollWithNoDataReturned() { private void assertEquals(final S3SourceRecord s3Record, final SourceRecord sourceRecord) { assertThat(sourceRecord).isNotNull(); - assertThat(sourceRecord.sourcePartition()).isEqualTo(s3Record.getPartitionMap()); + + assertThat(sourceRecord.sourcePartition()).hasSize(2); + assertThat(sourceRecord.sourcePartition().get(BUCKET)).isEqualTo(s3Record.getOffsetManagerEntry().getBucket()); + assertThat(sourceRecord.sourcePartition().get(OBJECT_KEY)).isEqualTo(s3Record.getOffsetManagerEntry().getKey()); + final Map map = (Map) sourceRecord.sourceOffset(); - assertThat(map.get(OffsetManager.getObjectMapKey(s3Record.getObjectKey()))) - .isEqualTo(s3Record.getRecordNumber()); + assertThat(map.get(RECORD_COUNT)).isEqualTo(s3Record.getOffsetManagerEntry().getRecordCount()); assertThat(sourceRecord.key()).isEqualTo(s3Record.getKey().value()); assertThat(sourceRecord.value()).isEqualTo(s3Record.getValue().value()); } @@ -229,10 +238,11 @@ void testPollsWithRecords() { private List createS3SourceRecords(final int count) { final List lst = new ArrayList<>(); if (count > 0) { - lst.add(createS3SourceRecord(TOPIC, PARTITION, TEST_BUCKET, OBJECT_KEY, + + lst.add(createS3SourceRecord(TOPIC, PARTITION, TEST_BUCKET, TEST_OBJECT_KEY, "Hello".getBytes(StandardCharsets.UTF_8), "Hello World".getBytes(StandardCharsets.UTF_8))); for (int i = 1; i < count; i++) { - lst.add(createS3SourceRecord(TOPIC, PARTITION, TEST_BUCKET, OBJECT_KEY + i, + lst.add(createS3SourceRecord(TOPIC, PARTITION, TEST_BUCKET, TEST_OBJECT_KEY + i, "Goodbye".getBytes(StandardCharsets.UTF_8), String.format("Goodbye cruel World (%s)", i).getBytes(StandardCharsets.UTF_8))); } @@ -290,7 +300,7 @@ void testPollWithSlowProducer() { final List lst = createS3SourceRecords(3); final Iterator sourceRecordIterator = new 
Iterator<>() { - final Iterator inner = lst.iterator(); + Iterator inner = lst.iterator(); @Override public boolean hasNext() { return inner.hasNext(); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/OffsetManagerTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/OffsetManagerTest.java deleted file mode 100644 index 1367d71f0..000000000 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/OffsetManagerTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.source.utils; - -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; -import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; -import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.kafka.connect.source.SourceTaskContext; -import org.apache.kafka.connect.storage.OffsetStorageReader; - -import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; - -final class OffsetManagerTest { - - private Map properties; - private static final String TEST_BUCKET = "test-bucket"; - - @Mock - private SourceTaskContext sourceTaskContext; - - private S3SourceConfig s3SourceConfig; - - private OffsetManager offsetManager; - - @BeforeEach - public void setUp() { - properties = new HashMap<>(); - setBasicProperties(); - s3SourceConfig = new S3SourceConfig(properties); - } - - @Test - void testWithOffsets() { - sourceTaskContext = mock(SourceTaskContext.class); - final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class); - when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); - - final Map partitionKey = new HashMap<>(); - partitionKey.put("topic", "topic1"); - partitionKey.put("partition", 0); - partitionKey.put("bucket", TEST_BUCKET); - - final Map offsetValue = new HashMap<>(); - offsetValue.put("object_key_file", 5L); - final Map, Map> offsets = new HashMap<>(); - offsets.put(partitionKey, offsetValue); - - when(offsetStorageReader.offsets(any())).thenReturn(offsets); - - offsetManager = new OffsetManager(sourceTaskContext, s3SourceConfig); - - final Map, Map> retrievedOffsets = offsetManager.getOffsets(); - assertThat(retrievedOffsets.size()).isEqualTo(1); - assertThat(retrievedOffsets.values().iterator().next().get("object_key_file")).isEqualTo(5L); - } - - @Test - void testIncrementAndUpdateOffsetMapExistingOffset() { - sourceTaskContext = 
mock(SourceTaskContext.class); - final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class); - when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); - - // Mock partition and offset values - final String objectKey = "testObject"; - final String offsetObjectKey = OBJECT_KEY + "_" + objectKey; - - final Map partitionKey = new HashMap<>(); - partitionKey.put("topic", "topic1"); - partitionKey.put("partition", 0); - partitionKey.put("bucket", "bucket"); - - final Map offsetValue = new HashMap<>(); - offsetValue.put(offsetObjectKey, 1L); // Existing offset value - final Map, Map> offsets = new HashMap<>(); - offsets.put(partitionKey, offsetValue); - - when(offsetStorageReader.offsets(any())).thenReturn(offsets); // Mock offset retrieval - - // Initialize offset manager - offsetManager = new OffsetManager(sourceTaskContext, s3SourceConfig); - - // Invoke method and assert new offset value - final long newOffset = offsetManager.incrementAndUpdateOffsetMap(partitionKey, objectKey, 2L); - - assertThat(newOffset).isEqualTo(2L); // Expect incremented offset - assertThat(offsetManager.getOffsets().get(partitionKey).get(offsetObjectKey)).isEqualTo(2L); // Verify updated - // offset in map - } - - @Test - void testIncrementAndUpdateOffsetMapNonExistingOffset() { - sourceTaskContext = mock(SourceTaskContext.class); - final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class); - when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); - - // Mock partition without any existing offset - final Map partitionKey = new HashMap<>(); - partitionKey.put("topic", "topic1"); - partitionKey.put("partition", 0); - - when(offsetStorageReader.offsets(any())).thenReturn(Collections.emptyMap()); // No existing offset - - // Initialize offset manager - offsetManager = new OffsetManager(sourceTaskContext, s3SourceConfig); - - // Invoke method and assert new offset value - final long startOffset = 5L; - final long newOffset = offsetManager.incrementAndUpdateOffsetMap(partitionKey, "", startOffset); - - // Expect the startOffset to be returned when no existing offset is found - assertThat(newOffset).isEqualTo(startOffset); - } - - private void setBasicProperties() { - properties.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET); - properties.put(TARGET_TOPIC_PARTITIONS, "0,1"); - properties.put(TARGET_TOPICS, "topic1,topic2"); - } -} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java index cc9db65cd..8d9800577 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java @@ -53,7 +53,7 @@ class RecordProcessorTest { @Mock private Converter keyConverter; @Mock - private OffsetManager offsetManager; + private S3OffsetManagerEntry s3OffsetManagerEntry; @Mock private AWSV2SourceClient sourceClient; @@ -66,10 +66,10 @@ void testCreateSourceRecord() { final SourceRecord mockSourceRecord = mock(SourceRecord.class); final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(mockRecord.getSourceRecord(any(OffsetManager.class))).thenReturn(mockSourceRecord); + when(mockRecord.getSourceRecord(any(S3OffsetManagerEntry.class))).thenReturn(mockSourceRecord); final SourceRecord result = 
RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, - offsetManager); + s3OffsetManagerEntry); verify(mockRecord, times(1)).getSourceRecord(any()); assertThat(result).isEqualTo(mockSourceRecord); @@ -80,27 +80,28 @@ void testCreateSourceRecord() { void testCreateSourceRecordWithDataError() { final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(mockRecord.getSourceRecord(any(OffsetManager.class))).thenThrow(new DataException("Testing exception")); + when(mockRecord.getSourceRecord(any(S3OffsetManagerEntry.class))) + .thenThrow(new DataException("Testing exception")); when(s3SourceConfig.getErrorsTolerance()).thenReturn(ErrorsTolerance.NONE); assertThatExceptionOfType(ConnectException.class).as("Errors tolerance: NONE") .isThrownBy(() -> RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, - offsetManager)); + s3OffsetManagerEntry)); when(s3SourceConfig.getErrorsTolerance()).thenReturn(ErrorsTolerance.ALL); final SourceRecord result = RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, - offsetManager); + s3OffsetManagerEntry); assertThat(result).isNull(); } @Test void testCreateSourceRecords() { final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(mockRecord.getSourceRecord(any(OffsetManager.class))).thenReturn(mock(SourceRecord.class)); + when(mockRecord.getSourceRecord(any(S3OffsetManagerEntry.class))).thenReturn(mock(SourceRecord.class)); final SourceRecord sourceRecords = RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, - offsetManager); + s3OffsetManagerEntry); assertThat(sourceRecords).isNotNull(); } @@ -108,13 +109,12 @@ void testCreateSourceRecords() { @Test void errorToleranceOnNONE() { final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(mockRecord.getSourceRecord(any(OffsetManager.class))).thenThrow(new DataException("generic issue")); + when(mockRecord.getSourceRecord(any(S3OffsetManagerEntry.class))).thenThrow(new DataException("generic issue")); when(s3SourceConfig.getErrorsTolerance()).thenReturn(ErrorsTolerance.NONE); - assertThatThrownBy( - () -> RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, offsetManager)) - .isInstanceOf(ConnectException.class) + assertThatThrownBy(() -> RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, + s3OffsetManagerEntry)).isInstanceOf(ConnectException.class) .hasMessage("Data Exception caught during S3 record to source record transformation"); } @@ -122,11 +122,11 @@ void errorToleranceOnNONE() { @Test void errorToleranceOnALL() { final S3SourceRecord mockRecord = mock(S3SourceRecord.class); - when(mockRecord.getSourceRecord(any(OffsetManager.class))).thenThrow(new DataException("generic issue")); + when(mockRecord.getSourceRecord(any(S3OffsetManagerEntry.class))).thenThrow(new DataException("generic issue")); when(s3SourceConfig.getErrorsTolerance()).thenReturn(ErrorsTolerance.ALL); - assertThat(RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, offsetManager)) + assertThat(RecordProcessor.createSourceRecord(mockRecord, s3SourceConfig, sourceClient, s3OffsetManagerEntry)) .isNull(); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java new file mode 100644 index 000000000..acfdc5cc6 --- /dev/null +++ 
b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2025 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3.source.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +import io.aiven.kafka.connect.common.source.OffsetManager; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +final class S3OffsetManagerEntryTest { + + static final String TEST_BUCKET = "test-bucket"; + + static final String TOPIC = "TOPIC1"; + + static final int PARTITION = 1; + + static final String OBJECT_KEY = "object_key"; + + private SourceTaskContext sourceTaskContext; + + private OffsetManager offsetManager; + + private OffsetStorageReader offsetStorageReader; + + @BeforeEach + public void setUp() { + offsetStorageReader = mock(OffsetStorageReader.class); + sourceTaskContext = mock(SourceTaskContext.class); + when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); + offsetManager = new OffsetManager<>(sourceTaskContext); + } + + private Map createPartitionMap() { + final Map partitionKey = new HashMap<>(); + partitionKey.put(S3OffsetManagerEntry.TOPIC, TOPIC); + partitionKey.put(S3OffsetManagerEntry.PARTITION, PARTITION); + partitionKey.put(S3OffsetManagerEntry.BUCKET, TEST_BUCKET); + partitionKey.put(S3OffsetManagerEntry.OBJECT_KEY, OBJECT_KEY); + return partitionKey; + } + + public static S3OffsetManagerEntry newEntry() { + return new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + } + + @Test + void testGetEntry() { + final Map storedData = new HashMap<>(); + storedData.putAll(createPartitionMap()); + storedData.put("random_entry", 5L); + + when(offsetStorageReader.offset(any())).thenReturn(storedData); + + final S3OffsetManagerEntry keyEntry = newEntry(); + final Optional entry = offsetManager.getEntry(keyEntry.getManagerKey(), + keyEntry::fromProperties); + assertThat(entry).isPresent(); + assertThat(entry.get().getPartition()).isEqualTo(PARTITION); + assertThat(entry.get().getRecordCount()).isEqualTo(0); + assertThat(entry.get().getTopic()).isEqualTo(TOPIC); + assertThat(entry.get().getBucket()).isEqualTo(TEST_BUCKET); + assertThat(entry.get().getProperty("random_entry")).isEqualTo(5L); + verify(sourceTaskContext, times(1)).offsetStorageReader(); + + // verify second read reads from local data + + final Optional entry2 = offsetManager.getEntry(entry.get().getManagerKey(), + entry.get()::fromProperties); + assertThat(entry2).isPresent(); + 
assertThat(entry2.get().getPartition()).isEqualTo(PARTITION); + assertThat(entry2.get().getRecordCount()).isEqualTo(0); + assertThat(entry2.get().getTopic()).isEqualTo(TOPIC); + assertThat(entry2.get().getBucket()).isEqualTo(TEST_BUCKET); + assertThat(entry2.get().getProperty("random_entry")).isEqualTo(5L); + verify(sourceTaskContext, times(1)).offsetStorageReader(); + } + + @Test + void testFromProperties() { + final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + assertThat(entry.getRecordCount()).isEqualTo(0L); + assertThat(entry.getProperty("random_entry")).isNull(); + + entry.setProperty("random_entry", 5L); + entry.incrementRecordCount(); + assertThat(entry.getRecordCount()).isEqualTo(1L); + assertThat(entry.getProperty("random_entry")).isEqualTo(5L); + + final S3OffsetManagerEntry other = entry.fromProperties(entry.getProperties()); + assertThat(other.getRecordCount()).isEqualTo(1L); + assertThat(other.getProperty("random_entry")).isEqualTo(5L); + } +}
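A sketch of the read-through lookup this test exercises; offsetManager is the OffsetManager built in setUp() and newEntry() is the helper defined above:

final S3OffsetManagerEntry keyEntry = newEntry();
// The first lookup consults context.offsetStorageReader(); a repeat lookup for the same
// partition map is answered from the manager's in-memory copy, as the times(1) verification shows.
final Optional<S3OffsetManagerEntry> entry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties);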
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index f7559ddfd..a6572f0f5 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -16,11 +16,10 @@ package io.aiven.kafka.connect.s3.source.utils; -import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; -import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY; +import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; +import static io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry.RECORD_COUNT; import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; @@ -34,22 +33,31 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.function.Consumer; import java.util.function.Predicate; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Stream; +import io.aiven.kafka.connect.common.source.task.DistributionStrategy; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; -import io.aiven.kafka.connect.common.source.input.AvroTransformer; +import io.aiven.kafka.connect.common.source.OffsetManager; import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; -import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; @@ -57,169 +65,195 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; final class SourceRecordIteratorTest { - private S3SourceConfig mockConfig; - private OffsetManager mockOffsetManager; - private Transformer mockTransformer; - private AWSV2SourceClient mockSourceApiClient; - @BeforeEach - public void setUp() { - mockConfig = mock(S3SourceConfig.class); - mockOffsetManager = mock(OffsetManager.class); - mockTransformer = mock(Transformer.class); - mockSourceApiClient = mock(AWSV2SourceClient.class); - } - - @Test - void testIteratorProcessesS3Objects() throws Exception { - - final String key = "topic-00001-abc123.txt"; - - // Mock InputStream - try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) { - when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); - - mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + private OffsetStorageReader offsetStorageReader; - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); - final Pattern filePattern = mock(Pattern.class); + private OffsetManager offsetManager; - when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Collections.emptyIterator()); - Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient, new HashDistributionStrategy(1), - FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); + private S3SourceConfig mockS3SourceConfig; - assertThat(iterator.hasNext()).isFalse(); - mockPatternMatcher(filePattern); - final S3Object obj = S3Object.builder().key(key).build(); - - final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); - when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); - when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); - iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient, - new HashDistributionStrategy(1), filePattern, 0); - - assertThat(iterator.hasNext()).isTrue(); - assertThat(iterator.next()).isNotNull(); - } + @BeforeEach + void setup() { + final SourceTaskContext mockTaskContext = mock(SourceTaskContext.class); + offsetStorageReader = mock(OffsetStorageReader.class); + when(mockTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader); + offsetManager = new OffsetManager(mockTaskContext); + mockSourceApiClient = mock(AWSV2SourceClient.class); + mockS3SourceConfig =
mock(S3SourceConfig.class); + when(mockS3SourceConfig.getAwsS3BucketName()).thenReturn("bucket-name"); } +// private S3SourceConfig getConfig(Map data) { +// Map defaults = new HashMap<>(); +// defaults.put(AWS_S3_BUCKET_NAME_CONFIG, "bucket-name"); +// defaults.putAll(data); +// return new S3SourceConfig(defaults); +// } + @Test - void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { + void testIteratorProcessesS3Objects() throws Exception { + final String key = "topic-00001-abc123.txt"; - final S3Object s3Object = S3Object.builder().key(key).build(); - - // With ByteArrayTransformer - try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { - when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream); - final Pattern filePattern = mock(Pattern.class); - - when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); - - mockTransformer = mock(ByteArrayTransformer.class); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) - .thenReturn(Stream.of(SchemaAndValue.NULL)); - - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); - - when(mockSourceApiClient.getListOfObjectKeys(any())) - .thenReturn(Collections.singletonList(key).listIterator()); - when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) - .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); - mockPatternMatcher(filePattern); - - // should skip if any records were produced by source record iterator. - final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, - mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); - assertThat(iterator.hasNext()).isFalse(); - verify(mockSourceApiClient, never()).getObject(any()); - verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); - } - - // With AvroTransformer - try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { - when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream); - final Pattern filePattern = mock(Pattern.class); - when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); - mockTransformer = mock(AvroTransformer.class); - when(mockSourceApiClient.getListOfObjectKeys(any())) - .thenReturn(Collections.singletonList(key).listIterator()); - - when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) - .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); - mockPatternMatcher(filePattern); - - when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) - .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream()); - - final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, - mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); - assertThat(iterator.hasNext()).isFalse(); - - verify(mockTransformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); - } - } + final String filePattern = "{{topic}}-{{partition}}"; + final Transformer transformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(offsetStorageReader.offset(any(Map.class))).thenReturn(new HashMap<>()); + when(mockSourceApiClient.getS3ObjectStream(any())).thenReturn(Stream.empty()); - @ParameterizedTest - 
@CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, - final String objectKey) { + Iterator iterator = new SourceRecordIterator(mockS3SourceConfig, offsetManager, transformer, + mockSourceApiClient, new HashDistributionStrategy(1), + filePattern, 0); - mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); - final Pattern filePattern = mock(Pattern.class); + assertThat(iterator).isExhausted(); - mockPatternMatcher(filePattern); + S3Object s3Object = S3Object.builder().key(key).size(1L).build(); + when(mockSourceApiClient.getS3ObjectStream(any())).thenReturn(Collections.singletonList(s3Object).stream()); + when(mockSourceApiClient.getObject(key)).thenReturn(() -> new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))); - final S3Object obj = S3Object.builder().key(objectKey).build(); + iterator = new SourceRecordIterator(mockS3SourceConfig, offsetManager, transformer, mockSourceApiClient, + new HashDistributionStrategy(1), filePattern, 0); - final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); - when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); - when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); - final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId); - final Predicate s3ObjectPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object) - && iterator.isFileAssignedToTask(s3Object); - // Assert - assertThat(s3ObjectPredicate).accepts(obj); - } + assertThat(iterator).hasNext(); + assertThat(iterator.next()).isNotNull(); + assertThat(iterator).isExhausted(); - @ParameterizedTest - @CsvSource({ "4, 1, topic1-2-0", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", - "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, - final String objectKey) { - mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); - when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); - final Pattern filePattern = mock(Pattern.class); - - mockPatternMatcher(filePattern); - - final S3Object obj = S3Object.builder().key(objectKey).build(); - - final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); - when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); - when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); - final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId); - final Predicate stringPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object) - && iterator.isFileAssignedToTask(s3Object); - // Assert - assertThat(stringPredicate.test(obj)).as("Predicate should accept the objectKey: " + objectKey).isFalse(); } - private static void mockPatternMatcher(final Pattern filePattern) { - final Matcher fileMatcher = mock(Matcher.class); - 
when(filePattern.matcher(any())).thenReturn(fileMatcher); - when(fileMatcher.find()).thenReturn(true); - when(fileMatcher.group(PATTERN_TOPIC_KEY)).thenReturn("testtopic"); - when(fileMatcher.group(PATTERN_PARTITION_KEY)).thenReturn("0"); - } +// @Test +// void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { +// final String key = "topic-00001-abc123.txt"; +// final String filePattern = "{{topic}}-{{partition}}"; +// final S3ClientBuilder builder = new S3ClientBuilder(); +// final S3SourceConfig config = getConfig(Collections.emptyMap()); +// +// // With ByteArrayTransformer +// Transformer transformer = TransformerFactory.getTransformer(InputFormat.BYTES); +// +// builder.addObject(key, "Hello World"); +// +// sourceApiClient = new AWSV2SourceClient(builder.build(), config); +// +// when(offsetStorageReader.offset(any(Map.class))).thenReturn(Map.of(RECORD_COUNT, 1)); +// +// +// // should skip if any records were produced by source record iterator. +// Iterator iterator = new SourceRecordIterator(config, offsetManager, +// transformer, sourceApiClient, new HashDistributionStrategy(1), filePattern, 0); +// +// assertThat(iterator).isExhausted(); +// +// +// // With JsonTransformer +// StringBuilder jsonContent = new StringBuilder(); +// for (int i = 0; i < 5; i++) { +// jsonContent.append(String.format("{\"message\": \"Hello World\", \"id\":\"%s\"}%n", i)); +// } +// +// builder.reset().addObject(key, jsonContent.toString()); +// sourceApiClient = new AWSV2SourceClient(builder.build(), config); +// +// transformer = TransformerFactory.getTransformer(InputFormat.JSONL); +// +// iterator = new SourceRecordIterator(config, offsetManager, +// transformer, sourceApiClient, new HashDistributionStrategy(1), filePattern, 0); +// assertThat(iterator).hasNext(); +// final S3SourceRecord record = iterator.next(); +// assertThat((Map) record.getValue().value()).containsEntry("id", "1"); +// } +// +// @ParameterizedTest +// @ValueSource( ints={0, 1, 2, 3}) +// void testRetrieveTaskBasedData(int taskId) { +// Map expectedTask = Map.of(3, "key-1", 0, "key-2", 1, "key-3", 2, "key-4"); +// +// int maxTasks = 4; +// final String filePattern = "{{topic}}-{{partition}}"; +// final S3SourceConfig config = getConfig(Collections.emptyMap()); +// Transformer transformer = TransformerFactory.getTransformer(InputFormat.BYTES); +// +// final S3ClientBuilder builder = new S3ClientBuilder(); +// for (String key : expectedTask.values()) { +// builder.addObject(key, "Hello World - "+key); +// } +// sourceApiClient = new AWSV2SourceClient(builder.build(), config); +// +// final DistributionStrategy distributionStrategy = new HashDistributionStrategy(maxTasks) ; +// +// final SourceRecordIterator iterator = new SourceRecordIterator(config, offsetManager, transformer, +// sourceApiClient, distributionStrategy, filePattern, taskId); +// +// assertThat(iterator.hasNext()); +// S3SourceRecord record = iterator.next(); +// assertThat(record.getObjectKey()).isEqualTo(expectedTask.get(taskId)); +// assertThat(iterator).isExhausted(); +// } +// +// class S3ClientBuilder { +// Queue, Map>> blocks = new LinkedList<>(); +// List objects = new ArrayList<>(); +// Map data = new HashMap<>(); +// +// public S3ClientBuilder addObject(String key, byte[] data) { +// objects.add(S3Object.builder().key(key).size((long)data.length).build()); +// this.data.put(key, data); +// return this; +// } +// +// public S3ClientBuilder endOfBlock() { +// blocks.add(Pair.of(objects, data)); +// return reset(); +// } +// +// 
public S3ClientBuilder reset() { +// objects = new ArrayList<>(); +// data = new HashMap<>(); +// return this; +// } +// +// public S3ClientBuilder addObject(String key, String data) { +// return addObject(key, data.getBytes(StandardCharsets.UTF_8)); +// } +// +// private ResponseBytes getResponse(String key) { +// return ResponseBytes.fromByteArray(new byte[0],data.get(key)); +// } +// +// private ListObjectsV2Response dequeueData() { +// if (blocks.isEmpty()) { +// objects = Collections.emptyList(); +// data = Collections.emptyMap(); +// } else { +// Pair, Map> pair = blocks.remove(); +// objects = pair.getLeft(); +// data = pair.getRight(); +// } +// return ListObjectsV2Response.builder().contents(objects).isTruncated(false).build(); +// } +// +// public S3Client build() { +// if (!objects.isEmpty()) { +// endOfBlock(); +// } +// S3Client result = mock(S3Client.class); +// when(result.listObjectsV2(any(ListObjectsV2Request.class))).thenAnswer(env -> dequeueData()); +// when(result.listObjectsV2(any(Consumer.class))).thenAnswer(env -> dequeueData()); +// when(result.getObjectAsBytes(any(GetObjectRequest.class))).thenAnswer(env -> getResponse(env.getArgument(0, GetObjectRequest.class).key())); +// return result; +// } +// +// } }
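Taken together, a hedged sketch of how the pieces in this change wire up inside a source task; context, config, sourceClient, maxTasks and taskId are assumed to be supplied by the task's start(), and only constructors and methods shown in this change are used:

final OffsetManager<S3OffsetManagerEntry> offsetManager = new OffsetManager<>(context);
final Iterator<S3SourceRecord> records = new SourceRecordIterator(config, offsetManager,
        TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient,
        new HashDistributionStrategy(maxTasks), "{{topic}}-{{partition}}", taskId);
while (records.hasNext()) {
    final S3SourceRecord record = records.next(); // carries its own cloned S3OffsetManagerEntry
    final SourceRecord result = RecordProcessor.createSourceRecord(record, config, sourceClient,
            record.getOffsetManagerEntry()); // null when the record fails and errors.tolerance is ALL
}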