Tasks assignment strategy - commons integration - [KCON-63] #384

Open

muralibasani wants to merge 4 commits into s3-source-release from kcon63-tasks-strategy

Conversation

Contributor

@muralibasani muralibasani commented Jan 6, 2025

[KCON-63]

  • Integrate Task assignment strategies of common module into s3 release feature branch
  • Delete hard coding of file pattern from s3 iterator class
  • Update existing tests
  • Add new integration tests to verify other strategy use cases

@muralibasani muralibasani changed the title Kcon63 tasks strategy Kcon63 tasks strategy - [KCON-63] Jan 7, 2025
@aindriu-aiven aindriu-aiven mentioned this pull request Jan 7, 2025
@muralibasani muralibasani changed the title Kcon63 tasks strategy - [KCON-63] Tasks assignment strategy - commons integration - [KCON-63] Jan 7, 2025
@muralibasani muralibasani marked this pull request as ready for review January 7, 2025 08:35
@muralibasani muralibasani requested review from a team as code owners January 7, 2025 08:35
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;

import org.codehaus.plexus.util.StringUtils;
Contributor

Hi @muralibasani, I know you didn't add it here, but can you update this import to use
'import org.apache.commons.lang3.StringUtils;' instead of plexus, which we are pulling in from the kafka library (and later versions don't include)?
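
For reference, the swap would look like this (assuming commons-lang3 is already available as a direct dependency of the module):

    // before: pulled in transitively via the kafka library
    // import org.codehaus.plexus.util.StringUtils;

    // after
    import org.apache.commons.lang3.StringUtils;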

"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH,
GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
Contributor

Adding sourcePollingConfigCounter++ here means we should remove the // NOPMD from line 66. Also, can you add the comment // Unused Assignment as an explanation for this here, please? (removing it from line 67)

Contributor Author

It was an incorrect usage of counter. Updating it to use offsetStorageGroupCounter.

@@ -92,6 +105,10 @@ public String getErrorsTolerance() {
return cfg.getString(ERRORS_TOLERANCE);
}

public String getObjectDistributionStrategy() {
Contributor

Suggested change
public String getObjectDistributionStrategy() {
public ObjectDistributionStrategies getObjectDistributionStrategy() {
return ObjectDistributionStrategies.forName(sourceConfigFragment.getObjectDistributionStrategy());

The same is being done for ErrorsTolerance in another PR so we should keep this consistent.

Also, I am not sure whether it should be called ObjectDistributionStrategies or ObjectDistributionStrategy, as the enum will return one strategy?

Contributor Author

Thought so, but ObjectDistributionStrategy already exists as an interface in commons.

Contributor

Change the interface to be "DistributionStrategy", as each of the strategies is an implementation of a distribution strategy. Then you can use ObjectDistributionStrategy as the enum name.

Contributor

Can you update this to return ObjectDistributionStrategy instead of String?


@Override
public Pattern getFilePattern() {
return filePattern;
Contributor

As the HashObjectDistributionStrategy does not use the filePattern, I think getFilePattern here should throw a NotImplementedException(), as it is unexpected that it ever gets called here.
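
A minimal sketch of what that could look like, assuming commons-lang3's NotImplementedException (an UnsupportedOperationException would work just as well):

    import org.apache.commons.lang3.NotImplementedException;

    @Override
    public Pattern getFilePattern() {
        // the hash strategy derives the task from the object key hash alone,
        // so nothing should ever ask it for a file pattern
        throw new NotImplementedException("HashObjectDistributionStrategy does not use a file pattern");
    }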

Contributor Author

Even for the object hash strategy, the pattern is used in the iterator to extract topic and partition.

Contributor

The extraction of topic and partition should not depend on the distribution strategy going forward.

They are separate concerns and should be implemented as such.

Contributor Author

Good point. Pattern configuration is moved to the source task.


private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
this.maxTasks = maxTasks;
this.filePattern = configurePattern(expectedSourceNameFormat);
Contributor

This filePattern should not need to be set as it is unused here as well.

Contributor Author

This pattern is required in the hash object strategy too; please check the source iterator class.

* Based on the format of the file name or prefix, Pattern is created for each of the strategies.
*/
default Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
Contributor

Same for this configurePattern: it is really a strategy implementation detail and shouldn't be added here.

Contributor Author

The pattern is a must for all the strategies on source connectors. Based on it,

  • topic and partition are extracted in the source iterator
  • task assignments are done.

If the pattern is not created here, we would have to duplicate the whole piece of code again to create it.

Contributor

Since some task distributions require topic and partition, topic and partition extraction should be done first and should be made available to the task distribution. This way, if future S3 implementations use a different strategy for topic and/or partition identification these strategies will work fine.

Contributor Author

Agree, moved.

private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class);

private String prefix;
private String s3Prefix;
Contributor

This is commons code so we shouldn't have any reference specifically to S3 here.

@@ -128,19 +151,25 @@ public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void setFilterPredicate(final Predicate<S3Object> predicate) {
filterPredicate = predicate;
Contributor

We should just be passing in the strategy as a predicate here; then, in the future as we add features, predicates can be chained, making it really easy to extend this quickly.

Contributor Author

good one.

Contributor

If DistributionStrategy has a getTaskFor(String) method the predicate can be constructed as
s3Object -> task == distributionStrategy.getTaskFor(s3Object.key())

Contributor

I think we might want a new method in AWSV2SourceClient called addPredicate(Predicate<S3Object> newPredicate) that will do

this.filterPredicate = this.filterPredicate.and(newPredicate)
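
Putting the two suggestions together, a rough sketch (the method names follow the comments above and are not final):

    // in AWSV2SourceClient: chain new predicates onto the existing filter
    public void addPredicate(final Predicate<S3Object> newPredicate) {
        this.filterPredicate = this.filterPredicate.and(newPredicate);
    }

    // in S3SourceTask (or the iterator): register the distribution strategy as one link in the chain
    awsv2SourceClient.addPredicate(s3Object -> taskId == distributionStrategy.getTaskFor(s3Object.key()));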

@@ -97,13 +97,14 @@ public void start(final Map<String, String> props) {
this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
offsetManager = new OffsetManager(context, s3SourceConfig);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
awsv2SourceClient.initializeObjectDistributionStrategy();
Contributor

We should initialize the ObjectDistributionStrategy here in S3SourceTask and add it as a predicate.

Contributor Author

Initially started with it, but got stuck with some issue and couldn't get back to this. Thanks.

Contributor

@Claudenw Claudenw left a comment

Overall looks like a good start. Getting it settled into the streaming strategy will take a bit of work.

String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
String START_OFFSET_PATTERN = "{{start_offset}}";
String TIMESTAMP_PATTERN = "{{timestamp}}";
String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";
Contributor

The file-based distribution strategies will need to use these, as will any file-based topic and partition extraction. So how about an interface or static class that contains the file-based patterns? Call it something like FileExtractionPatterns.
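
A sketch of such a holder, reusing the constants shown above (the class shape and the PATTERN_TOPIC_KEY value are illustrative assumptions):

    public final class FileExtractionPatterns {

        public static final String PATTERN_TOPIC_KEY = "topicName"; // assumed group name
        public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN =
                "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
        public static final String START_OFFSET_PATTERN = "{{start_offset}}";
        public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
        public static final String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";

        private FileExtractionPatterns() {
            // constants holder, not instantiable
        }
    }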

/**
* @param s3SourceConfig
* configuration for Source connector
* @param failedObjectKeys
* all objectKeys which have already been tried but have been unable to process.
*/
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) {
Contributor

So the AWS client should be agnostic to how the task assignment works; its only real focus should be on talking to AWS, so we should not set the filePattern/taskId/distributionStrategy here.

We should call setFilterPredicate either from the S3SourceTask or the SourceRecordIterator.

final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
return taskAssignment == taskId;
public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
this.filterPredicate = basePredicate
Contributor

I think we do need this update to look something like the below; this way we just add additional predicates as we need them.

Suggested change
this.filterPredicate = basePredicate
public void setFilterPredicate(final Predicate<S3Object> predicate) {
this.filterPredicate = this.filterPredicate.and(predicate);

Contributor

This doesn't account for or-predicates, but we don't have a use case for that yet, so I think it should be OK for the moment.

public ObjectDistributionStrategy getObjectDistributionStrategy() {
return ObjectDistributionStrategy.forName(sourceConfigFragment.getObjectDistributionStrategy());
}

Contributor

We should also add in getTaskId and getMaxTasksId in CommonConfig.java

Contributor

Actually, it might be good in here, as the existing sink connectors don't currently require it; it can be moved up a level later if required.
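
A rough sketch of what those accessors could look like in the config class; the "task.id" lookup mirrors the snippet further down, and the assumption that both keys are present in originals() is just that, an assumption:

    public int getTaskId() {
        // "task.id" is injected per task by the connector (see the parsing further down)
        final Object taskId = originals().get("task.id");
        return taskId == null ? 0 : Integer.parseInt(taskId.toString());
    }

    public int getMaxTasks() {
        // "tasks.max" is the standard Kafka Connect setting; assumed to be passed through to the task config
        return Integer.parseInt(originals().get("tasks.max").toString());
    }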

this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
DistributionStrategy distributionStrategy;

switch (objectDistributionStrategy) {
Contributor

I think we actually need two separate regexes here:
one for the SourceRecordIterator and one for the Distribution Strategy.

The reason being that the fileNameFragment set in the partition-in-filepath case, for example, has an any-filename pattern at the end, but users will still want to be able to set the filename template separately to only include certain files; for example, they may want to put *.png and only send images, or they may want files with a certain pattern.

The other thing I can't 100% tell here is the difference between
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()
and
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()

Is one of them the default pattern we had previously? Ignore if not, but if it is we should always look to get the custom one that has been configured and fall back on the default if not configured (also throw an error if it's missing partition etc. when we configure PARTITION_IN_FILENAME).

Contributor Author

There is no difference between originalTemplate() and toString(); just checked. Added integration tests with the default value and a non-default one.

For prefix, introduced a new config for the pattern.

This should allow users to provide any patterns on file names or prefixes as long as topic and partition are available.

Contributor

Thanks for checking that for me

@@ -112,6 +115,16 @@ public void ensureValid(final String name, final Object value) {
// UnusedAssignment
Contributor

NIT: remove those now that we have the one below.

if (currentObjectKey != null) {
if (validateTaskDistributionStrategy(currentObjectKey)) {
Contributor

I think with the new changes this might not be needed anymore, as the predicate checks inside of the AWSV2SourceClient as it builds the stream?

Contributor Author

I think your earlier point was correct to keep task distribution in the source task/iterator, as it is not relevant in the AWS client. Keeping as is.
We should also remove the failed-objects handling from the AWS client at some point, keeping only object listing in the AWS client.

Contributor

Yes, agree; as long as the predicate is created outside of the client and we then pass it in with the set predicate, we should be good.

*/

package io.aiven.kafka.connect.common.source.input.utils;
public class FileExtractionPatterns {
Contributor

This should be a final class

Contributor Author

Deleted this class

Contributor

I think that DistributionStrategy should be an abstract class that has one instance variable, maxTasks.
It should have 3 methods:

  • getMaxTasks()
  • getTaskFor(String)
  • configure(int maxTasks)

If we don't put the current task into the class then we can share the class across all task instances.
We can construct the DistributionStrategy instance in the AivenKafkaConnectS3SourceConnector.

Classes like PartitionInFilenameDistribution should have the pattern specified in the constructor.

In the end, by not binding the instance to a specific task, we will end up with a more flexible implementation that is not tightly coupled with a single task.

It might make sense to pass a CommonConfig into the DistributionStrategy constructor so that any implementation has access to the known set of configuration properties.

It may also make sense for configure to accept the CommonConfig rather than the int maxTasks.
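
A minimal sketch of that shape, using only what is described above (the names and the constructor are illustrative):

    public abstract class DistributionStrategy {

        private int maxTasks;

        protected DistributionStrategy(final int maxTasks) {
            this.maxTasks = maxTasks;
        }

        public int getMaxTasks() {
            return maxTasks;
        }

        // re-configure after construction, e.g. when tasks.max changes
        public void configure(final int maxTasks) {
            this.maxTasks = maxTasks;
        }

        // map an object key to the task that should process it; the current task id is
        // deliberately not stored here so one instance can be shared across all tasks
        public abstract int getTaskFor(String objectKey);
    }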

Contributor Author

I shall refactor these in the next PR.

Contributor

I think this needs to be done correctly now; this is not some minor change to an internal method, this is the decision about how DistributionStrategy should be architected.

Your current design has opened the door for DistributionStrategies that require multiple parameters to require those parameters to be known up and down the stack.

@muralibasani muralibasani force-pushed the kcon63-tasks-strategy branch from 3c96864 to 6c437ae Compare January 10, 2025 07:01
@muralibasani muralibasani requested a review from Claudenw January 10, 2025 07:09
Comment on lines +51 to +61
public void reconfigureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}

public void setMaxTasks(final int maxTasks) {
this.maxTasks = maxTasks;
}

private void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}
Contributor

These 2 methods do exactly the same thing. This is an indication that there should be one method to configure the distribution strategy after construction, and that it should take a maxTasks argument. Change reconfigureDistributionStrategy to configureDistributionStrategy in the base class and just call that.

Contributor

Partition in FileName and Partition in Path do exactly the same thing but extract the partition from different places. What we need is a class that will take the file name, extract all the components mentioned in the pattern from it, and set them. Then we have one place to do the extraction for all processes that depend upon the values, and you only need one PartitionDistributionStrategy. The PartitionDistributionStrategy should not care where the partition comes from; it should just use it.

Contributor

See notes elsewhere on partition strategy for background.

This class should have a method to extract all the patterns from a string and make them available so that we can call it once to get all the values set early in the process.

It should probably have methods to return the values (e.g. getTopic()) that return Optional types, returning Optional.empty() when the type name is not found in the pattern.
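
A rough sketch of such an extractor; the "topic" and "partition" group names and the class name are assumptions:

    import java.util.Optional;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public final class FileMatchingValues {

        private final String topic;
        private final Integer partition;

        private FileMatchingValues(final String topic, final Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }

        // run the configured pattern once against the object key and keep whatever named groups it captured
        public static FileMatchingValues from(final Pattern filePattern, final String objectKey) {
            final Matcher matcher = filePattern.matcher(objectKey);
            if (!matcher.find()) {
                return new FileMatchingValues(null, null);
            }
            final String topic = namedGroupOrNull(matcher, "topic");
            final String partition = namedGroupOrNull(matcher, "partition");
            return new FileMatchingValues(topic, partition == null ? null : Integer.valueOf(partition));
        }

        public Optional<String> getTopic() {
            return Optional.ofNullable(topic);
        }

        public Optional<Integer> getPartition() {
            return Optional.ofNullable(partition);
        }

        private static String namedGroupOrNull(final Matcher matcher, final String name) {
            try {
                return matcher.group(name); // throws IllegalArgumentException if the pattern has no such group
            } catch (final IllegalArgumentException e) {
                return null;
            }
        }
    }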

final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
return taskAssignment == taskId;
public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
this.filterPredicate = basePredicate.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key()));
Contributor

This is incorrect. I think this will add the same predicates multiple times.

This should literally read:

    public void addFilterPredicate(final Predicate<S3Object> otherPredicate) {
        this.filterPredicate = this.filterPredicate.and(otherPredicate);
    }


// call filters out bad file names and extracts topic/partition
inner = IteratorUtils.filteredIterator(sourceClient.getS3ObjectIterator(null),
s3Object -> this.fileNamePredicate.test(s3Object));
s3Object -> this.fileNamePredicate.test(s3Object.key()));
Contributor

The file name predicate should be an S3Object predicate (as it was before) and should be moved into the S3ObjectIterator by using AWSV2SourceClient.addPredicate().
A properly configured chain of predicates will allow us to extract information before the S3Objects are returned, and the IteratorUtils.filteredIterator call that I put here can be removed.
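
In other words, roughly (a sketch using the names from the snippet above):

    // with the file-name predicate registered on the client, the S3 object stream is already
    // filtered, so the filteredIterator wrapper can be dropped
    sourceClient.addPredicate(fileNamePredicate); // fileNamePredicate is a Predicate<S3Object> again
    inner = sourceClient.getS3ObjectIterator(null);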

// return false;
// }

public Predicate<String> predicateForFileAndTaskAssignment() {
Contributor

Don't mix predicates. If we keep them simple then we can put new ones into the chain. If we need to do something between approving the file as part of what we are interested in and assigning a task, we will not be able to do it if the two tasks are combined.

Comment on lines +120 to +133
if (!distributionStrategy.isPartOfTask(taskId, objectKey, filePattern)) {
return false;
}

final Matcher fileMatcher = filePattern.matcher(objectKey);

if (fileMatcher.find()) {
// TODO: Decouple topic and partition extraction from S3 specifics
topic = fileMatcher.group(PATTERN_TOPIC_KEY);
partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY));
return true;
}
return false;
};
Contributor

If the order of these checks were reversed, then we could exclude any object that does not match the file filter while extracting the information the file filter provides, and then use that information in the distribution strategy check.
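
Rearranged per that suggestion, the block could read roughly like this (same names as the snippet above):

    final Matcher fileMatcher = filePattern.matcher(objectKey);
    if (!fileMatcher.find()) {
        // not a file we are interested in, so it never reaches the distribution strategy
        return false;
    }

    // extract the pattern-provided information first...
    topic = fileMatcher.group(PATTERN_TOPIC_KEY);
    partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY));

    // ...then decide whether this task owns the object
    return distributionStrategy.isPartOfTask(taskId, objectKey, filePattern);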
