Skip to content

Commit

Permalink
Integrate object dist strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 8, 2025
1 parent b4475c9 commit 6157773
Show file tree
Hide file tree
Showing 21 changed files with 529 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.input.InputFormat;

public class SourceCommonConfig extends CommonConfig {
Expand Down Expand Up @@ -67,6 +68,10 @@ public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
}

/**
 * Returns the object distribution strategy configured for this source connector.
 *
 * @return the {@link ObjectDistributionStrategy} resolved from the source config fragment
 */
public ObjectDistributionStrategy getObjectDistributionStrategy() {
    final String configuredName = sourceConfigFragment.getObjectDistributionStrategy();
    return ObjectDistributionStrategy.forName(configuredName);
}

/**
 * Returns the maximum number of records to hand back from a single poll.
 *
 * @return the configured max poll records value
 */
public int getMaxPollRecords() {
    final int maxPollRecords = sourceConfigFragment.getMaxPollRecords();
    return maxPollRecords;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package io.aiven.kafka.connect.common.config;

import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.OBJECT_HASH;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.PARTITION_IN_FILEPATH;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;

import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;

public final class SourceConfigFragment extends ConfigFragment {
private static final String GROUP_OTHER = "OTHER_CFG";
Expand All @@ -32,6 +37,8 @@ public final class SourceConfigFragment extends ConfigFragment {
public static final String TARGET_TOPICS = "topics";
public static final String ERRORS_TOLERANCE = "errors.tolerance";

public static final String OBJECT_DISTRIBUTION_STRATEGY = "object.distribution.strategy";

/**
* Construct the ConfigFragment..
*
Expand Down Expand Up @@ -67,7 +74,14 @@ public static ConfigDef update(final ConfigDef configDef) {
ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS);
configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); // NOPMD
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(),
new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM,
"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH,
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
// UnusedAssignment

return configDef;
}
Expand All @@ -92,6 +106,10 @@ public String getErrorsTolerance() {
return cfg.getString(ERRORS_TOLERANCE);
}

/**
 * Returns the raw string value of the {@code object.distribution.strategy} setting.
 *
 * @return the configured strategy name
 */
public String getObjectDistributionStrategy() {
    final String strategyName = cfg.getString(OBJECT_DISTRIBUTION_STRATEGY);
    return strategyName;
}

private static class ErrorsToleranceValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
Expand All @@ -103,4 +121,15 @@ public void ensureValid(final String name, final Object value) {
}
}

/**
 * Validates that a configured {@code object.distribution.strategy} value names a known
 * {@link ObjectDistributionStrategy}. Blank values are accepted and left to the default.
 */
private static class ObjectDistributionStrategyValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(final String name, final Object value) {
        final String candidate = (String) value;
        if (StringUtils.isBlank(candidate)) {
            return; // nothing configured; the declared default applies
        }
        // forName throws a ConfigException when the value is not a valid strategy.
        ObjectDistributionStrategy.forName(candidate);
    }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config.enums;

import java.util.Arrays;
import java.util.Objects;

import org.apache.kafka.common.config.ConfigException;

/**
 * Strategies for distributing source objects across Kafka Connect tasks.
 * <p>
 * Each constant carries the string used in connector configuration; lookup via
 * {@link #forName(String)} is case-insensitive, so both {@code object_hash} and
 * {@code OBJECT_HASH} resolve to {@link #OBJECT_HASH}.
 */
public enum ObjectDistributionStrategy {

    OBJECT_HASH("object_hash"), PARTITION_IN_FILENAME("partition_in_filename"), PARTITION_IN_FILEPATH(
            "partition_in_filepath");

    /** Configuration string associated with this strategy. */
    private final String value;

    ObjectDistributionStrategy(final String value) {
        this.value = value;
    }

    /**
     * Returns the configuration string for this strategy, e.g. {@code "object_hash"}.
     *
     * @return the configuration value
     */
    public String value() {
        return value;
    }

    /**
     * Resolves a configuration value to its strategy, ignoring case.
     *
     * @param name
     *            the configured strategy name; must not be {@code null}
     * @return the matching {@link ObjectDistributionStrategy}
     * @throws ConfigException
     *             if {@code name} does not match any known strategy
     */
    public static ObjectDistributionStrategy forName(final String name) {
        Objects.requireNonNull(name, "name cannot be null");
        for (final ObjectDistributionStrategy strategy : values()) {
            if (strategy.value.equalsIgnoreCase(name)) {
                return strategy;
            }
        }
        throw new ConfigException(String.format("Unknown object.distribution.strategy type: %s, allowed values %s",
                name, Arrays.toString(values())));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.input.utils;
/**
 * Shared filename/path template tokens and the regular-expression fragments used to extract
 * topic and partition information from cloud storage object keys.
 * <p>
 * Constant holder only; not instantiable.
 */
public final class FileExtractionPatterns {

    /** Regex named-group key that captures the partition number. */
    public static final String PATTERN_PARTITION_KEY = "partition";
    /** Regex named-group key that captures the topic name. */
    public static final String PATTERN_TOPIC_KEY = "topic";

    /** Template placeholder for a record start offset. */
    public static final String START_OFFSET_PATTERN = "{{start_offset}}";
    /** Template placeholder for a timestamp. */
    public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
    /** Template placeholder for the partition number. */
    public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
    /** Template placeholder for the topic name. */
    public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

    // Use a named group to return the partition in a complex string to always get the correct
    // information for the partition number.
    public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
    /** Non-capturing match for any run of digits (offsets, timestamps). */
    public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
    /** Named group matching characters legal in a Kafka topic name. */
    public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
    /** Default sink-style prefix layout: {@code topics/<topic>/partition=<partition>/}. */
    public static final String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{" + PATTERN_TOPIC_KEY + "}}/partition={{"
            + PATTERN_PARTITION_KEY + "}}/";
    /** Matches the remainder of any filename up to end of input. */
    public static final String ANY_FILENAME_PATTERN = ".*$";

    private FileExtractionPatterns() {
        // utility class: prevent instantiation
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.input.utils;

import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.NUMBER_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PARTITION_NAMED_GROUP_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PARTITION_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.START_OFFSET_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TIMESTAMP_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TOPIC_NAMED_GROUP_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TOPIC_PATTERN;

import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;

/**
 * Helpers for turning source-name templates into compiled regular expressions.
 */
public final class FilePatternUtils {

    private FilePatternUtils() {
        // hidden
    }

    /**
     * Builds a {@link Pattern} from a source-name template by substituting the known
     * placeholders ({@code {{start_offset}}}, {@code {{timestamp}}}, {@code {{topic}}},
     * {@code {{partition}}}) with their regex equivalents. Topic and partition are captured
     * with named groups so callers can extract them from a match.
     *
     * @param expectedSourceNameFormat
     *            template describing expected object names; must contain the
     *            {@code {{partition}}} placeholder
     * @return the compiled pattern
     * @throws ConfigException
     *             if the template is null, lacks the partition placeholder, or yields an
     *             invalid regular expression
     */
    public static Pattern configurePattern(final String expectedSourceNameFormat) {
        if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
            throw new ConfigException(String.format(
                    "Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
                    expectedSourceNameFormat));
        }
        // Each placeholder is a literal token, so stdlib String.replace (literal, non-regex
        // substitution) is sufficient; no third-party StringUtils needed.
        final String regexString = expectedSourceNameFormat.replace(START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN)
                .replace(TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN)
                .replace(TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN)
                .replace(PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
        try {
            return Pattern.compile(regexString);
        } catch (IllegalArgumentException iae) {
            // Pattern.compile reports bad syntax via PatternSyntaxException (an IAE subtype).
            throw new ConfigException(
                    String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
                    iae);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

/**
* An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or
* files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
* An {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
* into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
* <p>
* The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
* overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
* workers than tasks, and they will be assigned the remaining tasks as work completes.
*/
public interface ObjectDistributionStrategy {

public interface DistributionStrategy {
/**
* Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
* assigned deterministically to a single taskId.
Expand All @@ -37,18 +38,16 @@ public interface ObjectDistributionStrategy {
* The value to be evaluated to determine if it should be processed by the task.
* @return true if the task should process the object, false if it should not.
*/
boolean isPartOfTask(int taskId, String valueToBeEvaluated);
boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);

/**
* When a connector receives a reconfigure event this method should be called to ensure that the distribution
* strategy is updated correctly.
*
* @param maxTasks
* The maximum number of tasks created for the Connector
* @param expectedFormat
* The expected format, of files, path, table names or other ways to partition the tasks.
*/
void reconfigureDistributionStrategy(int maxTasks, String expectedFormat);
void reconfigureDistributionStrategy(int maxTasks);

/**
* Check if the task is responsible for this set of files by checking if the given task matches the partition id.
Expand Down Expand Up @@ -78,14 +77,12 @@ default boolean taskMatchesPartition(final int taskId, final int partitionId) {
* @return true if the task supplied should handle the supplied partition
*/
default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {

return taskMatchesPartition(taskId, partitionId % maxTasks);
}

default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) {
return partitionId < maxTasks
? taskMatchesPartition(taskId, partitionId)
: taskMatchesModOfPartitionAndMaxTask(taskId, maxTasks, partitionId);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,27 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of
* the object's filename, which is uniformly distributed and deterministic across workers.
* {@link HashDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of the
* object's filename, which is uniformly distributed and deterministic across workers.
* <p>
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
public final class HashDistributionStrategy implements DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
private int maxTasks;
HashObjectDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
public HashDistributionStrategy(final int maxTasks) {
configureDistributionStrategy(maxTasks);
}

@Override
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated) {
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
if (filenameToBeEvaluated == null) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
Expand All @@ -46,11 +48,15 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated
}

@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) {
setMaxTasks(maxTasks);
public void reconfigureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}

public void setMaxTasks(final int maxTasks) {
this.maxTasks = maxTasks;
}

private void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}
}
Loading

0 comments on commit 6157773

Please sign in to comment.