diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index b6a7393ee..9a5db94e7 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -13,10 +13,14 @@ name: "CodeQL"
on:
push:
- branches: [main]
+ branches:
+ - main
+ - s3-source-release
pull_request:
# The branches below must be a subset of the branches above
- branches: [main]
+ branches:
+ - main
+ - s3-source-release
schedule:
- cron: "42 20 * * 6"
diff --git a/.github/workflows/main_push_workflow.yml b/.github/workflows/main_push_workflow.yml
index 7db41ce21..393534842 100644
--- a/.github/workflows/main_push_workflow.yml
+++ b/.github/workflows/main_push_workflow.yml
@@ -2,9 +2,13 @@
name: Main and pull request checks
on:
push:
- branches: [ main ]
+ branches:
+ - main
+ - s3-source-release
pull_request:
- branches: [ main ]
+ branches:
+ - main
+ - s3-source-release
jobs:
build:
strategy:
@@ -30,4 +34,4 @@ jobs:
run: ./gradlew build test
- name: Build in Linux
if: runner.os == 'Linux'
- run: ./gradlew build check test integrationTest
+ run: ./gradlew build check test integrationTest -i
diff --git a/README.md b/README.md
index b8bd950e8..b8f0ff2e2 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,7 @@
- [Aiven GCS Sink Connector](./gcs-sink-connector/README.md)
- [Aiven S3 Sink Connector](./s3-sink-connector/README.md)
- [Aiven Azure Blob Sink Connector](./azure-sink-connector/README.md)
+- [Aiven S3 Source Connector](./s3-source-connector/README.md)
# Development
diff --git a/commons/build.gradle.kts b/commons/build.gradle.kts
index 9bdc06b78..101ef8db9 100644
--- a/commons/build.gradle.kts
+++ b/commons/build.gradle.kts
@@ -27,7 +27,7 @@ dependencies {
implementation(confluent.kafka.connect.avro.data) {
exclude(group = "org.apache.kafka", module = "kafka-clients")
}
-
+ implementation("commons-io:commons-io:2.18.0")
implementation(tools.spotbugs.annotations)
implementation(compressionlibs.snappy)
implementation(compressionlibs.zstd.jni)
@@ -41,6 +41,7 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-api")
exclude(group = "org.apache.avro", module = "avro")
}
+
implementation(apache.hadoop.common) {
exclude(group = "org.apache.hadoop.thirdparty", module = "hadoop-shaded-protobuf_3_7")
exclude(group = "com.google.guava", module = "guava")
@@ -86,11 +87,12 @@ dependencies {
testImplementation(jackson.databind)
testImplementation(testinglibs.mockito.core)
testImplementation(testinglibs.assertj.core)
+ testImplementation(testinglibs.awaitility)
testImplementation(testFixtures(project(":commons")))
-
testImplementation(testinglibs.woodstox.stax2.api)
testImplementation(apache.hadoop.mapreduce.client.core)
testImplementation(confluent.kafka.connect.avro.converter)
+ testImplementation("org.mockito:mockito-junit-jupiter:5.14.2")
testRuntimeOnly(testinglibs.junit.jupiter.engine)
testRuntimeOnly(logginglibs.logback.classic)
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java
index 8c4683a34..0242d40b7 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java
@@ -27,6 +27,8 @@
public class CommonConfig extends AbstractConfig {
protected static final String GROUP_COMPRESSION = "File Compression";
protected static final String GROUP_FORMAT = "Format";
+ public static final String TASK_ID = "task.id";
+ public static final String MAX_TASKS = "tasks.max";
/**
* @deprecated No longer needed.
@@ -58,4 +60,25 @@ public Long getKafkaRetryBackoffMs() {
return new BackoffPolicyConfig(this).getKafkaRetryBackoffMs();
}
+ /**
+ *
+ * Get the maximum number of tasks that should be run by this connector configuration. Max tasks is set within the
+ * Kafka Connect framework and so is retrieved slightly differently in ConnectorConfig.java.
+ *
+ * @return The maximum number of tasks that should be run by this connector configuration
+ */
+ public int getMaxTasks() {
+ // TODO when Connect framework is upgraded it will be possible to retrieve this information from the configDef
+ // as tasksMax
+ return Integer.parseInt(this.originalsStrings().get(MAX_TASKS));
+ }
+ /**
+ * Get the task id for this configuration
+ *
+ * @return The task id for this configuration
+ */
+ public int getTaskId() {
+ return Integer.parseInt(this.originalsStrings().get(TASK_ID));
+ }
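+
+ // Illustrative sketch only (objectKey and config are hypothetical names): a task typically
+ // combines getTaskId() and getMaxTasks() to claim its share of the work, e.g.
+ //   boolean ownedByThisTask =
+ //       Math.floorMod(objectKey.hashCode(), config.getMaxTasks()) == config.getTaskId();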
+
}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java
index 8d3156e22..467ea2cb2 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java
@@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment {
static final String FILE_MAX_RECORDS = "file.max.records";
static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
- static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
+ public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";
+ public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
+ static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";
+
public FileNameFragment(final AbstractConfig cfg) {
super(cfg);
}
@@ -109,9 +112,18 @@ public void ensureValid(final String name, final Object value) {
configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
new TimestampSourceValidator(), ConfigDef.Importance.LOW,
"Specifies the the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD
- // UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);
+ configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE,
+ new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+ "The template for file prefix on S3. "
+ + "Supports `{{ variable }}` placeholders for substituting variables. "
+ + "Currently supported variables are `topic` and `partition` "
+ + "and are mandatory to have these in the directory structure."
+ + "Example prefix : topics/{{topic}}/partition/{{partition}}/",
+ GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
+ ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);
+
return configDef;
}
@@ -185,4 +197,8 @@ public int getMaxRecordsPerFile() {
return cfg.getInt(FILE_MAX_RECORDS);
}
+ public String getFilePathPrefixTemplateConfig() {
+ return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
+ }
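+
+ // Worked example (illustrative values only): with the default template
+ // "topics/{{topic}}/partition={{partition}}/", topic "logs" and partition 3 expand to
+ // the prefix "topics/logs/partition=3/".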
+
}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
new file mode 100644
index 000000000..8ea7b7f95
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.config;
+
+import java.util.Locale;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import io.aiven.kafka.connect.common.source.input.InputFormat;
+
+public final class SchemaRegistryFragment extends ConfigFragment {
+ private static final String SCHEMAREGISTRY_GROUP = "Schema registry group";
+ public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
+ public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
+ public static final String AVRO_VALUE_SERIALIZER = "value.serializer";
+ public static final String INPUT_FORMAT_KEY = "input.format";
+ public static final String SCHEMAS_ENABLE = "schemas.enable";
+
+ /**
+ * Construct the ConfigFragment.
+ *
+ * @param cfg
+ * the configuration that this fragment is associated with.
+ */
+ public SchemaRegistryFragment(final AbstractConfig cfg) {
+ super(cfg);
+ }
+
+ public static ConfigDef update(final ConfigDef configDef) {
+ int srCounter = 0;
+ configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++,
+ ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL);
+ configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null,
+ new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL",
+ SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
+ configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
+ new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+ "Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
+ ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
+
+ configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
+ "Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
+ // UnusedAssignment
+ ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER);
+ return configDef;
+ }
+
+ public InputFormat getInputFormat() {
+ return InputFormat.valueOf(cfg.getString(INPUT_FORMAT_KEY).toUpperCase(Locale.ROOT));
+ }
+
+ public String getSchemaRegistryUrl() {
+ return cfg.getString(SCHEMA_REGISTRY_URL);
+ }
+
+ public Class<?> getAvroValueSerializer() {
+ return cfg.getClass(AVRO_VALUE_SERIALIZER);
+ }
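+
+ // Illustrative connector properties using the keys defined above (values are examples only):
+ //   input.format=avro
+ //   schema.registry.url=http://localhost:8081
+ //   value.converter.schema.registry.url=http://localhost:8081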
+
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
index e363d7c9a..68036bd68 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
@@ -20,8 +20,66 @@
import org.apache.kafka.common.config.ConfigDef;
+import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
+import io.aiven.kafka.connect.common.source.input.InputFormat;
+import io.aiven.kafka.connect.common.source.input.Transformer;
+import io.aiven.kafka.connect.common.source.input.TransformerFactory;
+import io.aiven.kafka.connect.common.source.task.DistributionType;
+
public class SourceCommonConfig extends CommonConfig {
+
+ private final SchemaRegistryFragment schemaRegistryFragment;
+ private final SourceConfigFragment sourceConfigFragment;
+ private final FileNameFragment fileNameFragment;
+ private final OutputFormatFragment outputFormatFragment;
+
public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
super(definition, originals);
+ // Construct Fragments
+ schemaRegistryFragment = new SchemaRegistryFragment(this);
+ sourceConfigFragment = new SourceConfigFragment(this);
+ fileNameFragment = new FileNameFragment(this);
+ outputFormatFragment = new OutputFormatFragment(this);
+
+ validate(); // NOPMD ConstructorCallsOverridableMethod
+ }
+
+ private void validate() {
+ schemaRegistryFragment.validate();
+ sourceConfigFragment.validate();
+ fileNameFragment.validate();
+ outputFormatFragment.validate();
+ }
+
+ public InputFormat getInputFormat() {
+ return schemaRegistryFragment.getInputFormat();
+ }
+
+ public String getSchemaRegistryUrl() {
+ return schemaRegistryFragment.getSchemaRegistryUrl();
+ }
+
+ public String getTargetTopics() {
+ return sourceConfigFragment.getTargetTopics();
+ }
+ public String getTargetTopicPartitions() {
+ return sourceConfigFragment.getTargetTopicPartitions();
+ }
+
+ public ErrorsTolerance getErrorsTolerance() {
+ return sourceConfigFragment.getErrorsTolerance();
+ }
+
+ public DistributionType getDistributionType() {
+ return sourceConfigFragment.getDistributionType();
}
+
+ public int getMaxPollRecords() {
+ return sourceConfigFragment.getMaxPollRecords();
+ }
+
+ public Transformer getTransformer() {
+ return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
+ }
+
}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
new file mode 100644
index 000000000..7f5d6276f
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.config;
+
+import static io.aiven.kafka.connect.common.source.task.DistributionType.OBJECT_HASH;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
+import io.aiven.kafka.connect.common.source.task.DistributionType;
+
+import org.apache.commons.lang3.StringUtils;
+
+public final class SourceConfigFragment extends ConfigFragment {
+ private static final String GROUP_OTHER = "OTHER_CFG";
+ public static final String MAX_POLL_RECORDS = "max.poll.records";
+ public static final String EXPECTED_MAX_MESSAGE_BYTES = "expected.max.message.bytes";
+ private static final String GROUP_OFFSET_TOPIC = "OFFSET_TOPIC";
+ public static final String TARGET_TOPIC_PARTITIONS = "topic.partitions";
+ public static final String TARGET_TOPICS = "topics";
+ public static final String ERRORS_TOLERANCE = "errors.tolerance";
+
+ public static final String DISTRIBUTION_TYPE = "distribution.type";
+
+ /**
+ * Construct the ConfigFragment.
+ *
+ * @param cfg
+ * the configuration that this fragment is associated with.
+ */
+ public SourceConfigFragment(final AbstractConfig cfg) {
+ super(cfg);
+ }
+
+ public static ConfigDef update(final ConfigDef configDef) {
+ int sourcePollingConfigCounter = 0;
+
+ configDef.define(MAX_POLL_RECORDS, ConfigDef.Type.INT, 500, ConfigDef.Range.atLeast(1),
+ ConfigDef.Importance.MEDIUM, "Max poll records", GROUP_OTHER, sourcePollingConfigCounter++,
+ ConfigDef.Width.NONE, MAX_POLL_RECORDS);
+ // KIP-298 Error Handling in Connect
+ configDef.define(ERRORS_TOLERANCE, ConfigDef.Type.STRING, ErrorsTolerance.NONE.name(),
+ new ErrorsToleranceValidator(), ConfigDef.Importance.MEDIUM,
+ "Indicates to the connector what level of exceptions are allowed before the connector stops, supported values : none,all",
+ GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, ERRORS_TOLERANCE);
+
+ configDef.define(EXPECTED_MAX_MESSAGE_BYTES, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
+ "The largest record batch size allowed by Kafka config max.message.bytes", GROUP_OTHER,
+ sourcePollingConfigCounter++, // NOPMD
+ // UnusedAssignment
+ ConfigDef.Width.NONE, EXPECTED_MAX_MESSAGE_BYTES);
+
+ // Offset Storage config group includes target topics
+ int offsetStorageGroupCounter = 0;
+ configDef.define(TARGET_TOPIC_PARTITIONS, ConfigDef.Type.STRING, "0", new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.MEDIUM, "eg : 0,1", GROUP_OFFSET_TOPIC, offsetStorageGroupCounter++,
+ ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS);
+ configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
+ offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
+ configDef.define(DISTRIBUTION_TYPE, ConfigDef.Type.STRING, OBJECT_HASH.name(),
+ new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM,
+ "Based on tasks.max config and the type of strategy selected, objects are processed in distributed"
+ + " way by Kafka connect workers, supported values : "
+ + Arrays.stream(DistributionType.values())
+ .map(DistributionType::value)
+ .collect(Collectors.joining(", ")),
+ GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, DISTRIBUTION_TYPE); // NOPMD
+ // UnusedAssignment
+
+ return configDef;
+ }
+
+ public String getTargetTopics() {
+ return cfg.getString(TARGET_TOPICS);
+ }
+
+ public String getTargetTopicPartitions() {
+ return cfg.getString(TARGET_TOPIC_PARTITIONS);
+ }
+
+ public int getMaxPollRecords() {
+ return cfg.getInt(MAX_POLL_RECORDS);
+ }
+
+ public int getExpectedMaxMessageBytes() {
+ return cfg.getInt(EXPECTED_MAX_MESSAGE_BYTES);
+ }
+
+ public ErrorsTolerance getErrorsTolerance() {
+ return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
+ }
+
+ public DistributionType getDistributionType() {
+ return DistributionType.forName(cfg.getString(DISTRIBUTION_TYPE));
+ }
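+
+ // Illustrative configuration fragment (example values; the distribution.type string is assumed
+ // to be the lower-case form of the enum constant):
+ //   topics=my-topic
+ //   topic.partitions=0,1
+ //   errors.tolerance=none
+ //   max.poll.records=500
+ //   distribution.type=object_hash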
+
+ private static class ErrorsToleranceValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String errorsTolerance = (String) value;
+ if (StringUtils.isNotBlank(errorsTolerance)) {
+ // This will throw an Exception if not a valid value.
+ ErrorsTolerance.forName(errorsTolerance);
+ }
+ }
+ }
+
+ private static class ObjectDistributionStrategyValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String objectDistributionStrategy = (String) value;
+ if (StringUtils.isNotBlank(objectDistributionStrategy)) {
+ // This will throw an Exception if not a valid value.
+ DistributionType.forName(objectDistributionStrategy);
+ }
+ }
+ }
+
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/enums/ErrorsTolerance.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/enums/ErrorsTolerance.java
new file mode 100644
index 000000000..9c42c46d9
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/enums/ErrorsTolerance.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.config.enums;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+import org.apache.kafka.common.config.ConfigException;
+
+public enum ErrorsTolerance {
+
+ NONE("none"), ALL("all");
+
+ private final String name;
+
+ ErrorsTolerance(final String name) {
+ this.name = name;
+ }
+
+ public static ErrorsTolerance forName(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
+ for (final ErrorsTolerance errorsTolerance : ErrorsTolerance.values()) {
+ if (errorsTolerance.name.equalsIgnoreCase(name)) {
+ return errorsTolerance;
+ }
+ }
+ throw new ConfigException(String.format("Unknown errors.tolerance type: %s, allowed values %s ", name,
+ Arrays.toString(ErrorsTolerance.values())));
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
new file mode 100644
index 000000000..f55257f46
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
@@ -0,0 +1,511 @@
+/*
+ * Copyright 2024-2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
+import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles extracting records from an iterator and returning them to Kafka. It uses an exponential backoff
+ * with jitter to reduce the number of calls to the backend when there is no data. This solution:
+ * <ul>
+ * <li>When polled this implementation moves available records from the SourceRecord iterator to the return array.</li>
+ * <li>If there are no records:
+ * <ul>
+ * <li>{@link #poll()} will return null.</li>
+ * <li>The poll will delay no more than approx 5 seconds.</li>
+ * </ul>
+ * </li>
+ * <li>Up to {@link #maxPollRecords} will be sent in a single poll request.</li>
+ * <li>When the connector is stopped any collected records are returned to Kafka before stopping.</li>
+ * </ul>
+ */
+public abstract class AbstractSourceTask extends SourceTask {
+
+ public static final List<SourceRecord> NULL_RESULT = null;
+
+ /**
+ * The maximum time to spend polling. This is set to 5 seconds as that is the time that is allotted to a system for
+ * shutdown.
+ */
+ public static final Duration MAX_POLL_TIME = Duration.ofSeconds(5);
+ /**
+ * The boolean that indicates the connector is stopped.
+ */
+ private final AtomicBoolean connectorStopped;
+
+ /**
+ * The logger to use. Set from the class implementing AbstractSourceTask.
+ */
+ private final Logger logger;
+
+ /**
+ * The maximum number of records to put in a poll. Specified in the configuration.
+ */
+ private int maxPollRecords;
+
+ /**
+ * The Backoff implementation that executes the delay in the poll loop.
+ */
+ private final Backoff backoff;
+
+ private final Timer timer;
+
+ /**
+ * The configuration
+ */
+ private SourceCommonConfig config;
+
+ private Iterator<SourceRecord> sourceRecordIterator;
+
+ /**
+ * Constructor.
+ *
+ * @param logger
+ * the logger to use.
+ */
+ protected AbstractSourceTask(final Logger logger) {
+ super();
+ this.logger = logger;
+ connectorStopped = new AtomicBoolean();
+ timer = new Timer(MAX_POLL_TIME);
+ backoff = new Backoff(timer.getBackoffConfig());
+ }
+
+ /**
+ * Gets the iterator of SourceRecords, i.e. the iterator from which SourceRecords are extracted during a poll event. When
+ * this iterator runs out of records it should attempt to reset and read more records from the backend on the next
+ * {@code hasNext()} call. In this way it should detect when new data has been added to the backend and continue
+ * processing.
+ *
+ * This method should handle any backend exception that can be retried. Any runtime exceptions that are thrown when
+ * this iterator executes may cause the task to abort.
+ *
+ *
+ * @param config
+ * the configuration for the Backoff.
+ * @return The iterator of SourceRecords.
+ */
+ abstract protected Iterator<SourceRecord> getIterator(BackoffConfig config);
+
+ /**
+ * Called by {@link #start} to allow the concrete implementation to configure itself based on properties.
+ *
+ * @param props
+ * the properties to use for configuration.
+ */
+ abstract protected SourceCommonConfig configure(Map<String, String> props);
+
+ @Override
+ public final void start(final Map<String, String> props) {
+ logger.debug("Starting");
+ config = configure(props);
+ maxPollRecords = config.getMaxPollRecords();
+ sourceRecordIterator = getIterator(timer.getBackoffConfig());
+ }
+
+ /**
+ * Try to add a SourceRecord to the results.
+ *
+ * @param results
+ * the result to add the record to.
+ * @param sourceRecordIterator
+ * the source record iterator.
+ * @return true if successful, false if the iterator is empty.
+ */
+ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRecord> sourceRecordIterator) {
+ if (sourceRecordIterator.hasNext()) {
+ backoff.reset();
+ final SourceRecord sourceRecord = sourceRecordIterator.next();
+ if (logger.isDebugEnabled()) {
+ logger.debug("tryAdd() : read record {}", sourceRecord.sourceOffset());
+ }
+ results.add(sourceRecord);
+ return true;
+ }
+ logger.info("No records found in tryAdd call");
+ return false;
+ }
+
+ /**
+ * Returns {@code true} if the connector is not stopped and the timer has not expired.
+ *
+ * @return {@code true} if the connector is not stopped and the timer has not expired.
+ */
+ protected boolean stillPolling() {
+ final boolean result = !connectorStopped.get() && !timer.isExpired();
+ logger.debug("Still polling: {}", result);
+ return result;
+ }
+
+ @Override
+ public final List<SourceRecord> poll() {
+ logger.debug("Polling");
+ if (connectorStopped.get()) {
+ logger.info("Stopping");
+ closeResources();
+ return NULL_RESULT;
+ } else {
+ timer.start();
+ try {
+ final List<SourceRecord> result = populateList();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Poll() returning {} SourceRecords.", result == null ? null : result.size());
+ }
+ return result;
+ } finally {
+ timer.stop();
+ timer.reset();
+ }
+ }
+ }
+
+ /**
+ * Attempts to populate the return list. Will read as many records into the list as it can until the timer expires
+ * or the task is shut down.
+ *
+ * @return A list of SourceRecords or {@code null} if the system hit a runtime exception.
+ */
+ private List<SourceRecord> populateList() {
+ final List<SourceRecord> results = new ArrayList<>();
+ try {
+ while (stillPolling() && results.size() < maxPollRecords) {
+ if (!tryAdd(results, sourceRecordIterator)) {
+ if (!results.isEmpty()) {
+ logger.debug("tryAdd() did not add to the list, returning current results.");
+ // if we could not get a record and the results are not empty return them
+ break;
+ }
+ logger.debug("Attempting {}", backoff);
+ backoff.cleanDelay();
+ }
+ }
+
+ } catch (RuntimeException e) { // NOPMD must catch runtime here.
+ logger.error("Error during poll(): {}", e.getMessage(), e);
+ if (config.getErrorsTolerance() == ErrorsTolerance.NONE) {
+ logger.error("Stopping Task");
+ throw e;
+ }
+ }
+ return results.isEmpty() ? NULL_RESULT : results;
+ }
+
+ @Override
+ public final void stop() {
+ logger.debug("Stopping");
+ connectorStopped.set(true);
+ }
+
+ /**
+ * Returns the running state of the task.
+ *
+ * @return {@code true} if the connector is running, {@code false} otherwise.
+ */
+ public final boolean isRunning() {
+ return !connectorStopped.get();
+ }
+
+ /**
+ * Close any resources the source has open. Called by the IteratorRunnable when it is stopping.
+ */
+ abstract protected void closeResources();
+
+ /**
+ * Calculates elapsed time and flags when expired.
+ */
+ protected static class Timer extends StopWatch {
+ /**
+ * The length of time that the timer should run.
+ */
+ private final long duration;
+
+ /**
+ * The flag that indicates the timer has been aborted.
+ */
+ private boolean hasAborted;
+
+ /**
+ * Constructor.
+ *
+ * @param duration
+ * the length of time the timer should run.
+ */
+ Timer(final Duration duration) {
+ super();
+ this.duration = duration.toMillis();
+ }
+
+ /**
+ * Gets the time remaining on this timer.
+ *
+ * @return the number of milliseconds remaining before the timer expires.
+ */
+ public long millisecondsRemaining() {
+ return super.isStarted() ? duration - super.getTime() : duration;
+ }
+
+ /**
+ * Returns {@code true} if the timer has expired.
+ *
+ * @return {@code true} if the timer has expired.
+ */
+ public boolean isExpired() {
+ return hasAborted || super.getTime() >= duration;
+ }
+
+ /**
+ * Aborts the timer. Timer will report that it has expired until reset is called.
+ */
+ public void abort() {
+ hasAborted = true;
+ }
+
+ @Override
+ public void start() {
+ try {
+ hasAborted = false;
+ super.start();
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("Timer: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ super.stop();
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("Timer: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void reset() {
+ try {
+ hasAborted = false;
+ super.reset();
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("Timer: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Gets a Backoff Config for this timer.
+ *
+ * @return a backoff Configuration.
+ */
+ public BackoffConfig getBackoffConfig() {
+ return new BackoffConfig() {
+
+ @Override
+ public SupplierOfLong getSupplierOfTimeRemaining() {
+ return Timer.this::millisecondsRemaining;
+ }
+
+ @Override
+ public AbortTrigger getAbortTrigger() {
+ return Timer.this::abort;
+ }
+ };
+ }
+ }
+
+ /**
+ * Performs a delay based on the number of successive {@link #delay()} or {@link #cleanDelay()} calls without a
+ * {@link #reset()}. Delay increases exponentially but never exceeds the time remaining by more than 0.512 seconds.
+ */
+ public static class Backoff {
+ /** The logger to write to */
+ private static final Logger LOGGER = LoggerFactory.getLogger(Backoff.class);
+ /**
+ * The maximum jitter random number. Should be a power of 2 for speed.
+ */
+ public static final int MAX_JITTER = 1024;
+
+ public static final int JITTER_SUBTRAHEND = MAX_JITTER / 2;
+ /**
+ * A supplier of the time remaining (in milliseconds) on the overriding timer.
+ */
+ private final SupplierOfLong timeRemaining;
+
+ /**
+ * A function to call to abort the timer.
+ */
+ private final AbortTrigger abortTrigger;
+
+ /**
+ * The maximum number of times {@link #delay()} will be called before maxWait is reached.
+ */
+ private int maxCount;
+ /**
+ * The number of times {@link #delay()} has been called.
+ */
+ private int waitCount;
+
+ /**
+ * A random number generator to construct jitter.
+ */
+ Random random = new Random();
+
+ /**
+ * Constructor.
+ *
+ * @param config
+ * The configuration for the backoff.
+ */
+ public Backoff(final BackoffConfig config) {
+ this.timeRemaining = config.getSupplierOfTimeRemaining();
+ this.abortTrigger = config.getAbortTrigger();
+ reset();
+ }
+
+ /**
+ * Reset the backoff time so that delay is again at the minimum.
+ */
+ public final void reset() {
+ // if the remaining time is 0 or negative the maxCount will be infinity
+ // so make sure that it is 0 in that case.
+ final long remainingTime = timeRemaining.get();
+ maxCount = remainingTime < 1L ? 0 : (int) (Math.log10(remainingTime) / Math.log10(2));
+ waitCount = 0;
+ LOGGER.debug("Reset {}", this);
+ }
+
+ /**
+ * Handle adjustment when maxCount could not be set.
+ *
+ * @return the corrected maxCount
+ */
+ private int getMaxCount() {
+ if (maxCount == 0) {
+ reset();
+ }
+ return maxCount;
+ }
+
+ /**
+ * Calculates the delay without jitter.
+ *
+ * @return the number of milliseconds the delay will be.
+ */
+ public long estimatedDelay() {
+ long sleepTime = timeRemaining.get();
+ if (sleepTime > 0 && waitCount < maxCount) {
+ sleepTime = (long) Math.min(sleepTime, Math.pow(2, waitCount + 1));
+ }
+ return sleepTime < 0 ? 0 : sleepTime;
+ }
+
+ /**
+ * Calculates the range of jitter in milliseconds.
+ *
+ * @return the maximum jitter in milliseconds. jitter is +/- maximum jitter.
+ */
+ public int getMaxJitter() {
+ return MAX_JITTER - JITTER_SUBTRAHEND;
+ }
+
+ private long timeWithJitter() {
+ // generate approx +/- 0.512 seconds of jitter
+ final int jitter = random.nextInt(MAX_JITTER) - JITTER_SUBTRAHEND;
+ return (long) Math.pow(2, waitCount) + jitter;
+ }
+
+ /**
+ * Delay execution based on the number of times this method has been called.
+ *
+ * @throws InterruptedException
+ * If any thread interrupts this thread.
+ */
+ public void delay() throws InterruptedException {
+ final long sleepTime = timeRemaining.get();
+ if (sleepTime > 0 && waitCount < (maxCount == 0 ? getMaxCount() : maxCount)) {
+ waitCount++;
+ final long nextSleep = timeWithJitter();
+ // don't sleep negative time. Jitter can introduce negative time.
+ if (nextSleep > 0) {
+ if (nextSleep >= sleepTime) {
+ LOGGER.debug("Backoff aborting timer");
+ abortTrigger.apply();
+ } else {
+ LOGGER.debug("Backoff sleepiing {}", nextSleep);
+ Thread.sleep(nextSleep);
+ }
+ }
+ }
+ }
+
+ /**
+ * Like {@link #delay} but swallows the {@link InterruptedException}.
+ */
+ public void cleanDelay() {
+ try {
+ delay();
+ } catch (InterruptedException exception) {
+ // do nothing return results below
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Backoff %s/%s, %s milliseconds remaining.", waitCount, maxCount, timeRemaining.get());
+ }
+ }
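+
+ // Illustrative usage of Timer and Backoff together (mirrors the poll loop above;
+ // nothingToRead() is a hypothetical check):
+ //   Timer timer = new Timer(Duration.ofSeconds(5));
+ //   Backoff backoff = new Backoff(timer.getBackoffConfig());
+ //   timer.start();
+ //   while (!timer.isExpired() && nothingToRead()) {
+ //       backoff.cleanDelay(); // exponential delay with roughly +/- 0.5 s of jitter
+ //   }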
+
+ /**
+ * A functional interface to return long values.
+ */
+ @FunctionalInterface
+ public interface SupplierOfLong {
+ long get();
+ }
+
+ /**
+ * A functional interface that will abort the timer. After being called timer will indicate that it is expired,
+ * until it is reset.
+ */
+ @FunctionalInterface
+ public interface AbortTrigger {
+ void apply();
+ }
+
+ /**
+ * An interface to define the Backoff configuration. Used for convenience with Timer.
+ */
+ public interface BackoffConfig {
+ SupplierOfLong getSupplierOfTimeRemaining();
+ AbortTrigger getAbortTrigger();
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
new file mode 100644
index 000000000..760d074d2
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+
+import io.confluent.connect.avro.AvroData;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.commons.io.function.IOSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroTransformer extends Transformer {
+
+ private final AvroData avroData;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroTransformer.class);
+
+ AvroTransformer(final AvroData avroData) {
+ super();
+ this.avroData = avroData;
+ }
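+
+ // Reads Avro container files: each record in the stream becomes one SchemaAndValue,
+ // converted to Connect data through the supplied AvroData instance.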
+
+ @Override
+ public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
+ config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
+ }
+
+ @Override
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
+ private DataFileStream<GenericRecord> dataFileStream;
+ private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+
+ @Override
+ protected InputStream inputOpened(final InputStream input) throws IOException {
+ dataFileStream = new DataFileStream<>(input, datumReader);
+ return input;
+ }
+
+ @Override
+ public void doClose() {
+ if (dataFileStream != null) {
+ try {
+ dataFileStream.close();
+ } catch (IOException e) {
+ LOGGER.error("Error closing reader: {}", e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
+ if (dataFileStream.hasNext()) {
+ final GenericRecord record = dataFileStream.next();
+ action.accept(avroData.toConnectData(record.getSchema(), record));
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
+ final AbstractConfig sourceConfig) {
+ return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA,
+ ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
new file mode 100644
index 000000000..232aaef24
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.data.SchemaAndValue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.function.IOSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ByteArrayTransformer extends Transformer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);
+
+ private static final int MAX_BUFFER_SIZE = 4096;
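+
+ // Each emitted record carries at most MAX_BUFFER_SIZE (4096) bytes, so larger objects are
+ // split into multiple consecutive byte-array records.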
+
+ @Override
+ public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
+ // For byte array transformations the default ByteArrayConverter is used, so no value converter configuration is needed.
+ }
+
+ @Override
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
+ @Override
+ protected InputStream inputOpened(final InputStream input) {
+ return input;
+ }
+
+ @Override
+ protected void doClose() {
+ // nothing to do.
+ }
+
+ @Override
+ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
+ final byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ try {
+ final int bytesRead = IOUtils.read(inputStream, buffer);
+ if (bytesRead == 0) {
+ return false;
+ }
+ if (bytesRead < MAX_BUFFER_SIZE) {
+ action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead)));
+ } else {
+ action.accept(new SchemaAndValue(null, buffer));
+ }
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Error trying to advance inputStream: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override
+ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
+ final AbstractConfig sourceConfig) {
+ return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/InputFormat.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/InputFormat.java
new file mode 100644
index 000000000..8234e2c03
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/InputFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import java.util.Locale;
+
+public enum InputFormat {
+ AVRO("avro"), PARQUET("parquet"), JSONL("jsonl"), BYTES("bytes");
+
+ private final String format;
+
+ InputFormat(final String format) {
+ this.format = format;
+ }
+
+ public String getValue() {
+ return format.toLowerCase(Locale.ROOT);
+ }
+
+ @Override
+ public String toString() {
+ return format;
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
new file mode 100644
index 000000000..8069d08c1
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.function.IOSupplier;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JsonTransformer extends Transformer {
+
+ private final JsonConverter jsonConverter;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JsonTransformer.class);
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonTransformer(final JsonConverter jsonConverter) {
+ super();
+ this.jsonConverter = jsonConverter;
+ }
+
+ @Override
+ public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
+ }
+
+ @Override
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
+ BufferedReader reader;
+
+ @Override
+ protected InputStream inputOpened(final InputStream input) throws IOException {
+ reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
+ return input;
+ }
+
+ @Override
+ public void doClose() {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOGGER.error("Error closing reader: {}", e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
+ String line = null;
+ try {
+ // remove blank and empty lines.
+ while (StringUtils.isBlank(line)) {
+ line = reader.readLine();
+ if (line == null) {
+ // end of file
+ return false;
+ }
+ }
+ line = line.trim();
+ action.accept(jsonConverter.toConnectData(topic, line.getBytes(StandardCharsets.UTF_8)));
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Error reading input stream: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override
+ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
+ final AbstractConfig sourceConfig) {
+ return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
new file mode 100644
index 000000000..2c47d5103
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.data.SchemaAndValue;
+
+import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile;
+
+import io.confluent.connect.avro.AvroData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.io.function.IOSupplier;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetTransformer extends Transformer {
+
+ private final AvroData avroData;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParquetTransformer.class);
+
+ ParquetTransformer(final AvroData avroData) {
+ super();
+ this.avroData = avroData;
+ }
+
+ @Override
+ public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
+ config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
+ }
+
+ @Override
+ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
+ final AbstractConfig sourceConfig) {
+ return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
+
+ private ParquetReader<GenericRecord> reader;
+ private File parquetFile;
+
+ @Override
+ protected InputStream inputOpened(final InputStream input) throws IOException {
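+ // Parquet needs random access, so the stream is first copied to a local temporary file
+ // and then read back through AvroParquetReader via LocalInputFile.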
+ final String timestamp = String.valueOf(Instant.now().toEpochMilli());
+
+ try {
+ // Create a temporary file for the Parquet data
+ parquetFile = File.createTempFile(topic + "_" + topicPartition + "_" + timestamp, ".parquet");
+ } catch (IOException e) {
+ LOGGER.error("Error creating temp file for Parquet data: {}", e.getMessage(), e);
+ throw e;
+ }
+
+ try (OutputStream outputStream = Files.newOutputStream(parquetFile.toPath())) {
+ IOUtils.copy(input, outputStream); // Copy input stream to temporary file
+ }
+ reader = AvroParquetReader.<GenericRecord>builder(new LocalInputFile(parquetFile.toPath())).build();
+ return input;
+ }
+
+ @Override
+ protected void doClose() {
+ if (reader != null) {
+ try {
+ reader.close(); // Close reader at end of file
+ } catch (IOException e) {
+ logger.error("Error closing reader: {}", e.getMessage(), e);
+ }
+ }
+ if (parquetFile != null) {
+ deleteTmpFile(parquetFile.toPath());
+ }
+ }
+
+ @Override
+ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
+ try {
+ final GenericRecord record = reader.read();
+ if (record != null) {
+ action.accept(avroData.toConnectData(record.getSchema(), record)); // Pass record to the stream
+ return true;
+ }
+ } catch (IOException e) {
+ logger.error("Error reading record: {}", e.getMessage(), e);
+ }
+ return false;
+ }
+ };
+ }
+
+ static void deleteTmpFile(final Path parquetFile) {
+ if (Files.exists(parquetFile)) {
+ try {
+ Files.delete(parquetFile);
+ } catch (IOException e) {
+ LOGGER.error("Error in deleting tmp file {}", e.getMessage(), e);
+ }
+ }
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
new file mode 100644
index 000000000..09e8c0ca5
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.data.SchemaAndValue;
+
+import org.apache.commons.io.function.IOSupplier;
+import org.slf4j.Logger;
+
+public abstract class Transformer {
+
+ public abstract void configureValueConverter(Map<String, String> config, AbstractConfig sourceConfig);
+
+ public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
+ final String topic, final int topicPartition, final AbstractConfig sourceConfig, final long skipRecords) {
+
+ final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition,
+ sourceConfig);
+ return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords);
+ }
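+
+ // Illustrative call (transformer, inputStream, sourceConfig and recordConsumer are hypothetical names):
+ //   try (Stream<SchemaAndValue> records =
+ //           transformer.getRecords(() -> inputStream, "my-topic", 0, sourceConfig, 0L)) {
+ //       records.forEach(recordConsumer);
+ //   }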
+
+ /**
+ * Creates the stream spliterator for this transformer.
+ *
+ * @param inputStreamIOSupplier
+ * the input stream supplier.
+ * @param topic
+ * the topic.
+ * @param topicPartition
+ * the partition.
+ * @param sourceConfig
+ * the source configuration.
+ * @return a StreamSpliterator instance.
+ */
+ protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier, String topic,
+ int topicPartition, AbstractConfig sourceConfig);
+
+ public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, AbstractConfig sourceConfig);
+
+ /**
+ * A Spliterator that performs various checks on the opening/closing of the input stream.
+ */
+ protected abstract static class StreamSpliterator implements Spliterator<SchemaAndValue> {
+ /**
+ * The input stream supplier.
+ */
+ private final IOSupplier<InputStream> inputStreamIOSupplier;
+ /**
+ * The logger to be used by all instances of this class. This will be the Transformer logger.
+ */
+ protected final Logger logger;
+ /**
+ * The input stream. Will be null until {@link #inputOpened} has completed. May be used for reading but should
+ * not be closed or otherwise made unreadable.
+ */
+ protected InputStream inputStream;
+
+ /**
+ * A flag indicating that the input stream has been closed.
+ */
+ private boolean closed;
+
+ /**
+ * Constructor.
+ *
+ * @param logger
+ * The logger for this Spliterator to use.
+ * @param inputStreamIOSupplier
+ * the InputStream supplier
+ */
+ protected StreamSpliterator(final Logger logger, final IOSupplier<InputStream> inputStreamIOSupplier) {
+ this.logger = logger;
+ this.inputStreamIOSupplier = inputStreamIOSupplier;
+ }
+
+ /**
+ * Attempt to read the next record. If there is no record to read or an error occurs, return false. If a record
+ * was created, call {@code action.accept()} with the record.
+ *
+ * @param action
+ * the Consumer to call if record is created.
+ * @return {@code true} if a record was processed, {@code false} otherwise.
+ */
+ abstract protected boolean doAdvance(Consumer<? super SchemaAndValue> action);
+
+ /**
+ * Method to close additional inputs if needed.
+ */
+ abstract protected void doClose();
+
+ public final void close() {
+ doClose();
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null; // NOPMD setting null to release resources
+ closed = true;
+ }
+ } catch (IOException e) {
+ logger.error("Error trying to close inputStream: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Allows modification of the input stream. Called immediately after the input stream is opened. Implementations may
+ * modify the type of input stream by wrapping it with a specific implementation, or may create Readers from the
+ * input stream. The modified input stream must be returned. If a Reader or similar class is created from the
+ * input stream, the original input stream must still be returned.
+ *
+ * @param input
+ * the input stream that was just opened.
+ * @return the input stream or modified input stream.
+ * @throws IOException
+ * on IO error.
+ */
+ abstract protected InputStream inputOpened(InputStream input) throws IOException;
+
+ @Override
+ public final boolean tryAdvance(final Consumer<? super SchemaAndValue> action) {
+ if (closed) {
+ return false;
+ }
+ boolean result = false;
+ try {
+ if (inputStream == null) {
+ try {
+ inputStream = inputStreamIOSupplier.get();
+ inputOpened(inputStream);
+ } catch (IOException e) {
+ logger.error("Error trying to open inputStream: {}", e.getMessage(), e);
+ close();
+ return false;
+ }
+ }
+ result = doAdvance(action);
+ } catch (RuntimeException e) { // NOPMD must catch runtime exception here.
+ logger.error("Error trying to advance data: {}", e.getMessage(), e);
+ }
+ if (!result) {
+ close();
+ }
+ return result;
+ }
+
+ @Override
+ public final Spliterator<SchemaAndValue> trySplit() { // NOPMD returning null is required by API
+ return null;
+ }
+
+ @Override
+ public long estimateSize() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public int characteristics() {
+ return Spliterator.ORDERED | Spliterator.NONNULL;
+ }
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java
new file mode 100644
index 000000000..574604306
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.json.JsonConverter;
+
+import io.confluent.connect.avro.AvroData;
+
+/**
+ * A factory to create Transformers.
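+ * <p>
+ * A minimal usage sketch (the surrounding connector configuration handling is assumed and not shown):
+ * </p>
+ *
+ * <pre>{@code
+ * Transformer transformer = TransformerFactory.getTransformer(InputFormat.JSONL);
+ * }</pre>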
+ */
+public final class TransformerFactory {
+ /** The cache size for systems that read Avro data */
+ public static final int CACHE_SIZE = 100;
+
+ private TransformerFactory() {
+ // hidden
+ }
+
+ /**
+ * Gets a configured Transformer.
+ *
+ * @param inputFormat
+ * The input format for the transformer.
+ * @return the Transformer for the specified input format.
+ */
+ public static Transformer getTransformer(final InputFormat inputFormat) {
+ switch (inputFormat) {
+ case AVRO :
+ return new AvroTransformer(new AvroData(CACHE_SIZE));
+ case PARQUET :
+ return new ParquetTransformer(new AvroData(CACHE_SIZE));
+ case JSONL :
+ final JsonConverter jsonConverter = new JsonConverter();
+ jsonConverter.configure(Map.of(SCHEMAS_ENABLE, "false"), false);
+ return new JsonTransformer(jsonConverter);
+ case BYTES :
+ return new ByteArrayTransformer();
+ default :
+ throw new IllegalArgumentException("Unknown input format in configuration: " + inputFormat);
+ }
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/parquet/LocalInputFile.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/parquet/LocalInputFile.java
new file mode 100644
index 000000000..bb1081ab2
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/parquet/LocalInputFile.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input.parquet;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * {@code LocalInputFile} is an implementation needed by Parquet to read from local data files using
+ * {@link SeekableInputStream} instances.
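+ *
+ * <p>
+ * A usage sketch, assuming the standard Parquet reader API ({@code ParquetFileReader}); {@code path} is supplied
+ * by the caller:
+ * </p>
+ *
+ * <pre>{@code
+ * try (ParquetFileReader reader = ParquetFileReader.open(new LocalInputFile(path))) {
+ *     long rowCount = reader.getRecordCount();
+ * }
+ * }</pre>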
+ */
+public class LocalInputFile implements InputFile {
+
+ private final Path path;
+ private long length = -1;
+
+ public LocalInputFile(final Path file) {
+ path = file;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ if (length == -1) {
+ try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
+ length = file.length();
+ }
+ }
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+
+ return new SeekableInputStream() {
+
+ private final RandomAccessFile randomAccessFile = new RandomAccessFile(path.toFile(), "r");
+
+ @Override
+ public int read() throws IOException {
+ return randomAccessFile.read();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return randomAccessFile.getFilePointer();
+ }
+
+ @Override
+ public void seek(final long newPos) throws IOException {
+ randomAccessFile.seek(newPos);
+ }
+
+ @Override
+ public void readFully(final byte[] bytes) throws IOException {
+ randomAccessFile.readFully(bytes);
+ }
+
+ @Override
+ public void readFully(final byte[] bytes, final int start, final int len) throws IOException {
+ randomAccessFile.readFully(bytes, start, len);
+ }
+
+ @Override
+ public int read(final ByteBuffer buf) throws IOException {
+ final byte[] buffer = new byte[buf.remaining()];
+ final int code = read(buffer);
+ buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining());
+ return code;
+ }
+
+ @Override
+ public void readFully(final ByteBuffer buf) throws IOException {
+ final byte[] buffer = new byte[buf.remaining()];
+ readFully(buffer);
+ buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining());
+ }
+
+ @Override
+ public void close() throws IOException {
+ randomAccessFile.close();
+ }
+ };
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
new file mode 100644
index 000000000..3f78431ea
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input.utils;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import io.aiven.kafka.connect.common.source.task.Context;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FilePatternUtils allows the construction of a regex pattern to extract the
+ * {@link io.aiven.kafka.connect.common.source.task.Context Context} from an Object Key.
+ *
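+ * <p>
+ * A minimal sketch of the intended use (the pattern and key below are illustrative):
+ * </p>
+ *
+ * <pre>{@code
+ * FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}");
+ * Optional<Context<String>> context = utils.process("logs-3-000000000123");
+ * // if present, the Context holds topic "logs", partition 3 and start offset 123
+ * }</pre>
+ *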
+ */
+public final class FilePatternUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FilePatternUtils.class);
+ public static final String PATTERN_PARTITION_KEY = "partition";
+ public static final String PATTERN_TOPIC_KEY = "topic";
+ public static final String PATTERN_START_OFFSET_KEY = "startOffset"; // no underscore allowed as it breaks the regex.
+ public static final String START_OFFSET_PATTERN = "{{start_offset}}";
+ public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
+ public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
+ public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";
+
+ // Use a named group to return the partition in a complex string to always get the correct information for the
+ // partition number.
+ public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
+ public static final String START_OFFSET_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_START_OFFSET_KEY + ">\\d+)";
+ public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
+ public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
+ public static final String START_OFFSET = "Start offset";
+
+ final Pattern pattern;
+ private final boolean startOffsetConfigured;
+ private final boolean partitionConfigured;
+ private final boolean topicConfigured;
+
+ /**
+ * Creates an instance of FilePatternUtils. This constructor configures the Pattern that is used to extract the
+ * Context from an Object 'K'.
+ *
+ * @param pattern
+ * the expected source name format, which may contain the {{topic}}, {{partition}}, {{start_offset}} and
+ * {{timestamp}} markers used to build the extraction regex
+ */
+ public FilePatternUtils(final String pattern) {
+ this.pattern = configurePattern(pattern);
+ startOffsetConfigured = pattern.contains(START_OFFSET_PATTERN);
+ partitionConfigured = pattern.contains(PARTITION_PATTERN);
+ topicConfigured = pattern.contains(TOPIC_PATTERN);
+ }
+
+ /**
+ * Sets a Regex Pattern based on initial configuration that allows group regex to be used to extract information
+ * from the toString() of Object K which is passed in for Context extraction.
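+ * <p>
+ * For example, the format {@code {{topic}}-{{partition}}-{{start_offset}}} is translated into a regex of the form
+ * {@code (?<topic>[a-zA-Z0-9\-_.]+)-(?<partition>\d+)-(?<startOffset>\d+)}.
+ * </p>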
+ *
+ * @param expectedSourceNameFormat
+ * This is a string in the expected compatible format which will allow object name or keys to have unique
+ * information such as partition number, topic name, offset and timestamp information.
+ * @return A pattern which is configured to allow extraction of the key information from object names and keys.
+ */
+ private Pattern configurePattern(final String expectedSourceNameFormat) {
+ if (expectedSourceNameFormat == null) {
+ throw new ConfigException(
+ "Source name format is missing please configure the expected source to include the partition pattern.");
+ }
+
+ // Build REGEX Matcher
+ String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN,
+ START_OFFSET_NAMED_GROUP_REGEX_PATTERN);
+ regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
+ regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN);
+ regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
+ try {
+ return Pattern.compile(regexString);
+ } catch (IllegalArgumentException iae) {
+ throw new ConfigException(
+ String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
+ iae);
+ }
+ }
+
+ public <K extends Comparable<K>> Optional<Context<K>> process(final K sourceName) {
+ final Optional<Matcher> matcher = fileMatches(sourceName.toString());
+ if (matcher.isPresent()) {
+ final Context<K> ctx = new Context<>(sourceName);
+ getTopic(matcher.get(), sourceName.toString()).ifPresent(ctx::setTopic);
+ getPartitionId(matcher.get(), sourceName.toString()).ifPresent(ctx::setPartition);
+ getOffset(matcher.get(), sourceName.toString()).ifPresent(ctx::setOffset);
+ return Optional.of(ctx);
+ }
+ return Optional.empty();
+
+ }
+
+ private Optional<Matcher> fileMatches(final String sourceName) {
+ return matchPattern(sourceName);
+ }
+
+ private Optional<String> getTopic(final Matcher matcher, final String sourceName) {
+
+ try {
+ return Optional.of(matcher.group(PATTERN_TOPIC_KEY));
+ } catch (IllegalArgumentException ex) {
+ // It is possible that when checking for the group it does not match and returns an
+ // illegalArgumentException
+ if (topicConfigured) {
+ LOGGER.warn("Unable to extract Topic from {} and 'topics' not configured.", sourceName);
+ }
+ return Optional.empty();
+ }
+
+ }
+
+ private Optional<Integer> getPartitionId(final Matcher matcher, final String sourceName) {
+ try {
+ return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
+ } catch (IllegalArgumentException e) {
+ // It is possible that when checking for the group it does not exist and throws an
+ // IllegalArgumentException; NumberFormatException is also covered by this catch.
+ if (partitionConfigured) {
+ LOGGER.warn("Unable to extract Partition id from {}.", sourceName);
+ }
+ return Optional.empty();
+ }
+
+ }
+
+ private Optional<Integer> getOffset(final Matcher matcher, final String sourceName) {
+ try {
+ return Optional.of(Integer.parseInt(matcher.group(PATTERN_START_OFFSET_KEY)));
+ } catch (IllegalArgumentException e) {
+ // It is possible that when checking for the group it does not exist and throws an
+ // IllegalArgumentException; NumberFormatException is also covered by this catch.
+ if (startOffsetConfigured) {
+ LOGGER.warn("Unable to extract start offset from {}.", sourceName);
+ }
+ return Optional.empty();
+ }
+
+ }
+
+ private Optional<Matcher> matchPattern(final String sourceName) {
+ if (sourceName == null) {
+ throw new IllegalArgumentException("filePattern and sourceName must not be null");
+ }
+ final Matcher matcher = pattern.matcher(sourceName);
+ return matcher.find() ? Optional.of(matcher) : Optional.empty();
+ }
+
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
new file mode 100644
index 000000000..265ade6db
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.task;
+
+import java.util.Optional;
+
+/**
+ * A Context which captures all the details about the source that are required to successfully send a source
+ * record onto Kafka.
+ *
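+ * <p>
+ * A minimal usage sketch (the key and values below are illustrative):
+ * </p>
+ *
+ * <pre>{@code
+ * Context<String> context = new Context<>("logs-3-000000000123");
+ * context.setTopic("logs");
+ * context.setPartition(3);
+ * String topic = context.getTopic().orElse("unknown");
+ * }</pre>
+ *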
+ * @param <K>
+ * the type/class of the key unique to the object the context is being created about
+ */
+public class Context<K extends Comparable<K>> {
+
+ private String topic;
+ private Integer partition;
+ private Integer offset;
+ private K storageKey;
+
+ public Context(final K storageKey) {
+
+ this.storageKey = storageKey;
+ }
+
+ public Optional<String> getTopic() {
+ return Optional.ofNullable(topic);
+ }
+
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ public Optional<Integer> getPartition() {
+ return Optional.ofNullable(partition);
+ }
+
+ public void setPartition(final Integer partition) {
+ this.partition = partition;
+ }
+
+ public Optional<K> getStorageKey() {
+ return Optional.ofNullable(storageKey);
+ }
+
+ public void setStorageKey(final K storageKey) {
+ this.storageKey = storageKey;
+ }
+
+ public Optional<Integer> getOffset() {
+ return Optional.ofNullable(offset);
+ }
+
+ public void setOffset(final Integer offset) {
+ this.offset = offset;
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
new file mode 100644
index 000000000..8644889c0
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.task;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or
+ * files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
+ *
+ * The number of objects in cloud storage can be very high. Selecting a distribution strategy allows the connector
+ * to distribute the load across Connector tasks; an appropriate strategy can also maintain a level of ordering
+ * between messages.
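+ *
+ * <p>
+ * A minimal sketch of the intended use ({@code maxTasks} and {@code context} are assumed to be supplied by the
+ * caller):
+ * </p>
+ *
+ * <pre>{@code
+ * DistributionStrategy strategy = new DistributionStrategy(ctx -> ctx.getPartition().map(Integer::longValue),
+ *         maxTasks);
+ * int taskId = strategy.getTaskFor(context); // UNDEFINED (-1) when the context has no partition
+ * }</pre>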
+ */
+public final class DistributionStrategy {
+ private int maxTasks;
+ private final Function<Context<?>, Optional<Long>> mutation;
+ public static final int UNDEFINED = -1;
+
+ public DistributionStrategy(final Function<Context<?>, Optional<Long>> creator, final int maxTasks) {
+ assertPositiveInteger(maxTasks);
+ this.mutation = creator;
+ this.maxTasks = maxTasks;
+ }
+
+ private static void assertPositiveInteger(final int sourceInt) {
+ if (sourceInt <= 0) {
+ throw new IllegalArgumentException("tasks.max must be set to a positive number and at least 1.");
+ }
+ }
+
+ /**
+ * Retrieve the taskId that this object should be processed by. Any single object is assigned deterministically
+ * to a single taskId, and the same taskId is always returned when the same context is used.
+ *
+ * @param ctx
+ * This is the context which contains optional values for the partition, topic and storage key name
+ * @return the taskId which this particular task should be assigned to.
+ */
+ public int getTaskFor(final Context<?> ctx) {
+ return mutation.apply(ctx).map(aLong -> Math.floorMod(aLong, maxTasks)).orElse(UNDEFINED);
+ }
+
+ /**
+ * When a connector receives a reconfigure event this method should be called to ensure that the distribution
+ * strategy is updated correctly.
+ *
+ * @param maxTasks
+ * The maximum number of tasks created for the Connector
+ */
+ public void configureDistributionStrategy(final int maxTasks) {
+ assertPositiveInteger(maxTasks);
+ this.maxTasks = maxTasks;
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionType.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionType.java
new file mode 100644
index 000000000..9010e8b8d
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionType.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.task;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.kafka.common.config.ConfigException;
+
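+/**
+ * The supported strategies for distributing objects across connector tasks. A minimal usage sketch (the
+ * configuration wiring that supplies the name and the task count is assumed):
+ *
+ * <pre>{@code
+ * DistributionType type = DistributionType.forName("object_hash");
+ * DistributionStrategy strategy = type.getDistributionStrategy(maxTasks);
+ * }</pre>
+ */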
+public enum DistributionType {
+
+ /**
+ * OBJECT_HASH takes the context and uses the storage key implementation to get a hash value of the storage key,
+ * returning that hash modulo the number of maxTasks to decide which task should process a given object
+ */
+ OBJECT_HASH("object_hash",
+ context -> context.getStorageKey().isPresent()
+ ? Optional.of((long) context.getStorageKey().get().hashCode())
+ : Optional.empty()),
+ /**
+ * PARTITION takes the context and requires it to contain the partition id; the partition id modulo the
+ * configured max tasks determines the task, ensuring an even distribution across the configured max tasks
+ */
+ PARTITION("partition",
+ context -> context.getPartition().isPresent()
+ ? Optional.of((long) context.getPartition().get())
+ : Optional.empty());
+
+ private final String name;
+ private final Function<Context<?>, Optional<Long>> mutation;
+
+ public String value() {
+ return name;
+ }
+
+ /**
+ * Creates the DistributionType with the given name and the mutation used to extract the distribution value from
+ * the context.
+ *
+ * @param name
+ * the name of the distribution type
+ * @param mutation
+ * the mutation required to get the correct details from the context for distribution
+ */
+ DistributionType(final String name, final Function<Context<?>, Optional<Long>> mutation) {
+ this.name = name;
+ this.mutation = mutation;
+ }
+
+ public static DistributionType forName(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
+ for (final DistributionType distributionType : DistributionType.values()) {
+ if (distributionType.name.equalsIgnoreCase(name)) {
+ return distributionType;
+ }
+ }
+ throw new ConfigException(String.format("Unknown distribution.type : %s, allowed values %s ", name,
+ Arrays.toString(DistributionType.values())));
+ }
+
+ /**
+ * Returns a configured Distribution Strategy
+ *
+ * @param maxTasks
+ * the maximum number of configured tasks for this connector
+ *
+ * @return a configured Distribution Strategy with the correct mutation configured for proper distribution across
+ * tasks of objects being processed.
+ */
+ public DistributionStrategy getDistributionStrategy(final int maxTasks) {
+ return new DistributionStrategy(mutation, maxTasks);
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java
deleted file mode 100644
index c39676ad0..000000000
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2024 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.connect.common.source.task;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of
- * the object's filename, which is uniformly distributed and deterministic across workers.
- *
- * This is well-suited to use cases where the order of events between records from objects is not important, especially
- * when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
- */
-public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
- private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
- private int maxTasks;
- HashObjectDistributionStrategy(final int maxTasks) {
- this.maxTasks = maxTasks;
- }
-
- @Override
- public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated) {
- if (filenameToBeEvaluated == null) {
- LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
- return false;
- }
- final int taskAssignment = Math.floorMod(filenameToBeEvaluated.hashCode(), maxTasks);
- // floor mod returns the remainder of a division so will start at 0 and move up
- // tasks start at 0 so there should be no issue.
- return taskAssignment == taskId;
- }
-
- @Override
- public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) {
- setMaxTasks(maxTasks);
- }
-
- public void setMaxTasks(final int maxTasks) {
- this.maxTasks = maxTasks;
- }
-}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java
deleted file mode 100644
index 5925d880d..000000000
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2024 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.connect.common.source.task;
-
-/**
- * An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or
- * files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
- *
- * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
- * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
- * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
- * workers than tasks, and they will be assigned the remaining tasks as work completes.
- */
-public interface ObjectDistributionStrategy {
-
- /**
- * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
- * assigned deterministically to a single taskId.
- *
- * @param taskId
- * a task ID, usually for the currently running task
- * @param valueToBeEvaluated
- * The value to be evaluated to determine if it should be processed by the task.
- * @return true if the task should process the object, false if it should not.
- */
- boolean isPartOfTask(int taskId, String valueToBeEvaluated);
-
- /**
- * When a connector receives a reconfigure event this method should be called to ensure that the distribution
- * strategy is updated correctly.
- *
- * @param maxTasks
- * The maximum number of tasks created for the Connector
- * @param expectedFormat
- * The expected format, of files, path, table names or other ways to partition the tasks.
- */
- void reconfigureDistributionStrategy(int maxTasks, String expectedFormat);
-
- /**
- * Check if the task is responsible for this set of files by checking if the given task matches the partition id.
- *
- * @param taskId
- * the current running task
- * @param partitionId
- * The partitionId recovered from the file path.
- * @return true if this task is responsible for this partition. false if it is not responsible for this task.
- */
- default boolean taskMatchesPartition(final int taskId, final int partitionId) {
- // The partition id and task id are both expected to start at 0 but if the task id is changed to start at 1 this
- // will break.
- return taskId == partitionId;
- }
-
- /**
- * In the event of more partitions existing then tasks configured, the task will be required to take up additional
- * tasks that match.
- *
- * @param taskId
- * the current running task.
- * @param maxTasks
- * The maximum number of configured tasks allowed to run for this connector.
- * @param partitionId
- * The partitionId recovered from the file path.
- * @return true if the task supplied should handle the supplied partition
- */
- default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {
-
- return taskMatchesPartition(taskId, partitionId % maxTasks);
- }
-
- default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) {
- return partitionId < maxTasks
- ? taskMatchesPartition(taskId, partitionId)
- : taskMatchesModOfPartitionAndMaxTask(taskId, maxTasks, partitionId);
-
- }
-}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java
deleted file mode 100644
index f74e56826..000000000
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2024 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.connect.common.source.task;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kafka.common.config.ConfigException;
-
-import org.codehaus.plexus.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The {@link PartitionInFilenameDistributionStrategy} finds a partition in the object's filename by matching it to an
- * expected format, and assigns all partitions to the same task.
- *
- * This useful when a sink connector has created the object name in a format like
- * {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
- * within a single task.
- */
-public final class PartitionInFilenameDistributionStrategy implements ObjectDistributionStrategy {
- private final static Logger LOG = LoggerFactory.getLogger(PartitionInFilenameDistributionStrategy.class);
- private final static String NUMBER_REGEX_PATTERN = "(\\d)+";
- // Use a named group to return the partition in a complex string to always get the correct information for the
- // partition number.
- private final static String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<partition>\\d)+";
- private final static String PARTITION_PATTERN = "\\{\\{partition}}";
- private final static String START_OFFSET_PATTERN = "\\{\\{start_offset}}";
- private final static String TIMESTAMP_PATTERN = "\\{\\{timestamp}}";
- public static final String PARTITION = "partition";
- private Pattern partitionPattern;
-
- private int maxTasks;
-
- PartitionInFilenameDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
- configureDistributionStrategy(maxTasks, expectedSourceNameFormat);
- }
-
- /**
- *
- * @param sourceNameToBeEvaluated
- * is the filename/table name of the source for the connector.
- * @return Predicate to confirm if the given source name matches
- */
- @Override
- public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated) {
- if (sourceNameToBeEvaluated == null) {
- LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
- return false;
- }
- final Matcher match = partitionPattern.matcher(sourceNameToBeEvaluated);
- if (match.find()) {
- return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(match.group(PARTITION)));
- }
- LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
- return false;
- }
-
- /**
- * When a connector reconfiguration event is received this method should be called to ensure the correct strategy is
- * being implemented by the connector.
- *
- * @param maxTasks
- * maximum number of configured tasks for this connector
- * @param expectedSourceNameFormat
- * what the format of the source should appear like so to configure the task distribution.
- */
- @Override
- public void reconfigureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
- configureDistributionStrategy(maxTasks, expectedSourceNameFormat);
- }
-
- private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
- if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
- throw new ConfigException(String.format(
- "Source name format %s missing partition pattern {{partition}}, please configure the expected source to include the partition pattern.",
- expectedSourceNameFormat));
- }
- setMaxTasks(maxTasks);
- // Build REGEX Matcher
- String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
- regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
- regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
- try {
- partitionPattern = Pattern.compile(regexString);
- } catch (IllegalArgumentException iae) {
- throw new ConfigException(
- String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
- iae);
- }
- }
-
- private void setMaxTasks(final int maxTasks) {
- this.maxTasks = maxTasks;
- }
-
-}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java
deleted file mode 100644
index 85e1c3e75..000000000
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright 2024 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.connect.common.source.task;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.errors.ConnectException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The {@link PartitionInPathDistributionStrategy} finds a partition number in the path by matching a
- * {@code {{partition}} } marker in the path.
- *
- * This useful when a sink connector has created the object name in a path like
- * {@code /PREFIX/partition={{partition}}/YYYY/MM/DD/mm/}}, and we want all objects with the same partition to be
- * processed within a single task.
- *
- * Partitions are evenly distributed between tasks. For example, in Connect with 10 Partitions and 3 tasks:
- * task 0 gets partitions 0, 3, 6 and 9; task 1 gets partitions 1, 4 and 7; task 2 gets partitions 2, 5 and 8.
- *
- */
-public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy {
- public static final String PARTITION_ID_PATTERN = "\\{\\{partition}}";
- private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class);
-
- private String prefix;
- private int maxTasks;
-
- PartitionInPathDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
- configureDistributionStrategy(maxTasks, expectedPathFormat);
- }
-
- @Override
- public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) {
- if (pathToBeEvaluated == null || !pathToBeEvaluated.startsWith(prefix)) {
- LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup",
- pathToBeEvaluated, prefix);
- return false;
- }
- final String modifiedPath = StringUtils.substringAfter(pathToBeEvaluated, prefix);
- if (!modifiedPath.contains("/")) {
- LOG.warn("Ignoring path {}, does not contain any sub folders after partitionId prefix {}",
- pathToBeEvaluated, prefix);
- return false;
- }
- final String partitionId = StringUtils.substringBefore(modifiedPath, "/");
-
- try {
- return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(partitionId));
- } catch (NumberFormatException ex) {
- throw new ConnectException(String
- .format("Unexpected non integer value found parsing path for partitionId: %s", pathToBeEvaluated));
- }
- }
-
- /**
- *
- * @param maxTasks
- * The maximum number of configured tasks for this
- * @param expectedPathFormat
- * The format of the path and where to identify
- */
- @Override
- public void reconfigureDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
- configureDistributionStrategy(maxTasks, expectedPathFormat);
- }
-
- private void configureDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
- setMaxTasks(maxTasks);
-
- if (StringUtils.isEmpty(expectedPathFormat) || !expectedPathFormat.contains(PARTITION_ID_PATTERN)) {
- throw new ConfigException(String.format(
- "Expected path format %s is missing the identifier '%s' to correctly select the partition",
- expectedPathFormat, PARTITION_ID_PATTERN));
- }
- prefix = StringUtils.substringBefore(expectedPathFormat, PARTITION_ID_PATTERN);
- }
-
- private void setMaxTasks(final int maxTasks) {
- this.maxTasks = maxTasks;
- }
-
-}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
new file mode 100644
index 000000000..92fbddf46
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.junit.jupiter.api.Test;
+
+class AbstractSourceTaskTest {
+
+ /**
+ * The amount of extra time that we will allow for timing errors.
+ */
+ private static final long TIMING_DELTA_MS = 250;
+
+ @Test
+ void timerTest() {
+ final AbstractSourceTask.Timer timer = new AbstractSourceTask.Timer(Duration.ofSeconds(1));
+ assertThat(timer.millisecondsRemaining()).isEqualTo(Duration.ofSeconds(1).toMillis());
+ timer.start();
+ await().atMost(Duration.ofSeconds(2)).until(timer::isExpired);
+ assertThat(timer.millisecondsRemaining()).isLessThan(0);
+ timer.stop();
+ assertThat(timer.millisecondsRemaining()).isEqualTo(Duration.ofSeconds(1).toMillis());
+ }
+
+ @Test
+ void timerSequenceTest() {
+ final AbstractSourceTask.Timer timer = new AbstractSourceTask.Timer(Duration.ofSeconds(1));
+ // stopped state does not allow stop
+ assertThatExceptionOfType(IllegalStateException.class).as("stop while not running")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ timer.reset(); // verify that an exception is not thrown.
+
+ // started state does not allow start
+ timer.start();
+ assertThatExceptionOfType(IllegalStateException.class).as("start while running")
+ .isThrownBy(timer::start)
+ .withMessageStartingWith("Timer: ");
+ timer.reset();
+ timer.start(); // restart the timer.
+ timer.stop();
+
+ // stopped state does not allow stop or start
+ assertThatExceptionOfType(IllegalStateException.class).as("stop after stop")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ assertThatExceptionOfType(IllegalStateException.class).as("start after stop")
+ .isThrownBy(timer::start)
+ .withMessageStartingWith("Timer: ");
+ timer.reset();
+
+ // stopped + reset does not allow stop.
+ assertThatExceptionOfType(IllegalStateException.class).as("stop after reset (1)")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ timer.start();
+ timer.reset();
+
+ // started + reset does not allow stop;
+ assertThatExceptionOfType(IllegalStateException.class).as("stop after reset (2)")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ }
+
+ @Test
+ void backoffTest() throws InterruptedException {
+ final AbstractSourceTask.Timer timer = new AbstractSourceTask.Timer(Duration.ofSeconds(1));
+ final AbstractSourceTask.Backoff backoff = new AbstractSourceTask.Backoff(timer.getBackoffConfig());
+ final long estimatedDelay = backoff.estimatedDelay();
+ assertThat(estimatedDelay).isLessThan(500);
+
+ // execute delay without timer running.
+ final StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ backoff.delay();
+ stopWatch.stop();
+ assertThat(stopWatch.getTime()).as("Result without timer running")
+ .isBetween(estimatedDelay - backoff.getMaxJitter() - TIMING_DELTA_MS,
+ estimatedDelay + backoff.getMaxJitter() + TIMING_DELTA_MS);
+
+ timer.start();
+ for (int i = 0; i < 9; i++) {
+ stopWatch.reset();
+ timer.reset();
+ timer.start();
+ stopWatch.start();
+ await().atMost(Duration.ofSeconds(2)).until(() -> {
+ backoff.delay();
+ return backoff.estimatedDelay() == 0 || timer.isExpired();
+ });
+ stopWatch.stop();
+ timer.stop();
+ final int step = i;
+ if (!timer.isExpired()) {
+ assertThat(stopWatch.getTime()).as(() -> String.format("Result with timer running at step %s", step))
+ .isBetween(Duration.ofSeconds(1).toMillis() - backoff.getMaxJitter() - TIMING_DELTA_MS,
+ Duration.ofSeconds(1).toMillis() + backoff.getMaxJitter() + TIMING_DELTA_MS);
+ }
+ }
+ }
+
+ @Test
+ void backoffIncrementalTimeTest() throws InterruptedException {
+ final AtomicBoolean abortTrigger = new AtomicBoolean();
+ // delay increases in powers of 2.
+ final long maxDelay = 1000; // not a power of 2
+ final AbstractSourceTask.BackoffConfig config = new AbstractSourceTask.BackoffConfig() {
+ @Override
+ public AbstractSourceTask.SupplierOfLong getSupplierOfTimeRemaining() {
+ return () -> maxDelay;
+ }
+
+ @Override
+ public AbstractSourceTask.AbortTrigger getAbortTrigger() {
+ return () -> abortTrigger.set(true);
+ }
+ };
+
+ final AbstractSourceTask.Backoff backoff = new AbstractSourceTask.Backoff(config);
+ long expected = 2;
+ while (backoff.estimatedDelay() < maxDelay) {
+ assertThat(backoff.estimatedDelay()).isEqualTo(expected);
+ backoff.delay();
+ expected *= 2;
+ }
+ assertThat(backoff.estimatedDelay()).isEqualTo(maxDelay);
+ assertThat(abortTrigger).isFalse();
+ }
+}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
new file mode 100644
index 000000000..617dd290a
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source.input;
+
+import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
+
+import io.confluent.connect.avro.AvroData;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+final class AvroTransformerTest {
+
+ @Mock
+ private SourceCommonConfig sourceCommonConfig;
+
+ private AvroTransformer avroTransformer;
+ private Map<String, String> config;
+
+ @BeforeEach
+ void setUp() {
+ avroTransformer = new AvroTransformer(new AvroData(100));
+ config = new HashMap<>();
+ }
+
+ @Test
+ void testConfigureValueConverter() {
+ final String value = "http://localhost:8081";
+ when(sourceCommonConfig.getString(SCHEMA_REGISTRY_URL)).thenReturn(value);
+ avroTransformer.configureValueConverter(config, sourceCommonConfig);
+ assertThat(config.get(SCHEMA_REGISTRY_URL))
+ .describedAs("The schema registry URL should be correctly set in the config.")
+ .isEqualTo("http://localhost:8081");
+ }
+
+ @Test
+ void testReadAvroRecordsInvalidData() {
+ final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8));
+
+ final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
+ 0);
+
+ final List