Tasks assignment strategy - commons integration - [KCON-63] #384
base: s3-source-release
import org.apache.kafka.common.config.AbstractConfig; | ||
import org.apache.kafka.common.config.ConfigDef; | ||
|
||
import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance; | ||
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies; | ||
|
||
import org.codehaus.plexus.util.StringUtils; |
Hi @muralibasani, I know you didn't add it here, but can you update this import to use
'import org.apache.commons.lang3.StringUtils;' instead of the plexus one, which we are pulling in from the Kafka library (and which later versions don't include)?
"Based on tasks.max config and this strategy, objects are processed in distributed" | ||
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", " | ||
+ PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH, | ||
GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD |
Adding sourcePollingConfigCounter++ here means we should remove the // NOPMD from line 66. Also, can you add the comment // Unused Assignment here as an explanation, please (removing it from line 67)?
It was an incorrect usage of counter. Updating it to use offsetStorageGroupCounter.
@@ -92,6 +105,10 @@ public String getErrorsTolerance() { | |||
return cfg.getString(ERRORS_TOLERANCE); | |||
} | |||
|
|||
public String getObjectDistributionStrategy() { |
public String getObjectDistributionStrategy() { | |
public ObjectDistributionStrategies getObjectDistributionStrategy() { | |
return ObjectDistributionStrategies.forName(sourceConfigFragment.getObjectDistributionStrategy()); |
The same is being done for ErrorsTolerance in another PR, so we should keep this consistent.
Also, I am not sure whether it should be called ObjectDistributionStrategies or ObjectDistributionStrategy, as the enum will return one strategy.
I thought so too, but ObjectDistributionStrategy already exists as an interface in commons.
Change the interface to be "DistributionStrategy", as each of the strategies is an implementation of a distribution strategy. Then you can use ObjectDistributionStrategy as the enum name.
Can you update this to return ObjectDistributionStrategy instead of String?
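A minimal sketch of the enum-returning getter requested in this thread. The config string values (such as object_hash) and the case-insensitive lookup are assumptions made for illustration; the actual constants in commons may differ.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical enum: constant names follow the thread, config values are assumed.
enum ObjectDistributionStrategy {
    OBJECT_HASH("object_hash"),
    PARTITION_IN_FILENAME("partition_in_filename"),
    PARTITION_IN_FILEPATH("partition_in_filepath");

    private final String configValue;

    ObjectDistributionStrategy(final String configValue) {
        this.configValue = configValue;
    }

    public String value() {
        return configValue;
    }

    // Resolves a configured string to a constant and fails fast on unknown values.
    public static ObjectDistributionStrategy forName(final String name) {
        if (name == null) {
            throw new IllegalArgumentException("Distribution strategy name must not be null");
        }
        final String normalized = name.trim().toLowerCase(Locale.ROOT);
        return Arrays.stream(values())
                .filter(strategy -> strategy.configValue.equals(normalized))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown distribution strategy: " + name));
    }
}
```

With a lookup like this, the config getter can return the enum directly, so callers can switch on it without comparing strings.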
|
||
@Override | ||
public Pattern getFilePattern() { | ||
return filePattern; |
As the HashObjectDistributionStrategy does not use the filePattern, I think getFilePattern here should throw a NotImplementedException(), as it is unexpected that it ever gets called here.
Even for the object hash strategy, the pattern is used in the iterator to extract topic and partition.
The extraction of topic and partition should not depend on the distribution strategy going forward.
They are separate concerns and should be implemented as such.
Good point. Pattern configuring is moved to source task.
|
||
private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) { | ||
this.maxTasks = maxTasks; | ||
this.filePattern = configurePattern(expectedSourceNameFormat); |
This filePattern should not need to be set as it is unused here as well.
This pattern is required in the hash object strategy too; please check the source iterator class.
* Based on the format of the file name or prefix, Pattern is created for each of the strategies. | ||
*/ | ||
default Pattern configurePattern(final String expectedSourceNameFormat) { | ||
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) { |
The same goes for this configurePattern: it is really a strategy implementation detail and shouldn't be added here.
A pattern is a must for all the strategies on source connectors. Based on it, the following are done:
- extracting topic and partition in the source iterator
- task assignments
If the pattern is not created here, we would have to duplicate the whole piece of code to create it.
Since some task distributions require topic and partition, topic and partition extraction should be done first and should be made available to the task distribution. This way, if future S3 implementations use a different strategy for topic and/or partition identification these strategies will work fine.
Agree, moved.
private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class); | ||
|
||
private String prefix; | ||
private String s3Prefix; |
This is commons code so we shouldn't have any reference specifically to S3 here.
...main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java
@@ -128,19 +151,25 @@ public void addFailedObjectKeys(final String objectKey) { | |||
this.failedObjectKeys.add(objectKey); | |||
} | |||
|
|||
public void setFilterPredicate(final Predicate<S3Object> predicate) { | |||
filterPredicate = predicate; |
We should just be passing in the strategy as a predicate here and accepting an object if it matches that predicate. In the future, as we add features, predicates can then be chained, making this really easy to extend quickly.
good one.
If DistributionStrategy has a getTaskFor(String) method, the predicate can be constructed as
s3Object -> task == distributionStrategy.getTaskFor(s3Object.key())
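A small sketch of the predicate construction suggested here. To stay self-contained it works on plain object keys rather than S3Object, and the taskFor argument stands in for distributionStrategy::getTaskFor, which is a proposed method and not an existing one. The class name TaskPredicates is hypothetical.

```java
import java.util.function.Predicate;
import java.util.function.ToIntFunction;

// Hypothetical helper; in the connector the predicate would test s3Object.key().
final class TaskPredicates {
    private TaskPredicates() {
    }

    // Accepts only the keys that the strategy assigns to the given task.
    static Predicate<String> forTask(final int taskId, final ToIntFunction<String> taskFor) {
        return objectKey -> taskId == taskFor.applyAsInt(objectKey);
    }
}
```

For a hash-based strategy, taskFor could be key -> Math.floorMod(key.hashCode(), maxTasks), which matches the assignment shown elsewhere in this diff. Each key is then accepted by exactly one task.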
I think we might want a new method in AWSV2SourceClient called addPredicate(Predicate<S3Object> newPredicate) that will do
this.filterPredicate = this.filterPredicate.and(newPredicate)
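The chaining idea can be sketched as below. FilterChain is a hypothetical stand-in for the filter handling inside the client; the type parameter T would be S3Object in the connector. Starting from an accept-all predicate is an assumption that keeps and() safe on the first call.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the client's filter state; not the actual AWSV2SourceClient.
final class FilterChain<T> {
    // Accept everything until a predicate is added, so and() always has a left-hand side.
    private Predicate<T> filterPredicate = item -> true;

    // Each call narrows the filter: an item must pass every predicate added so far.
    void addPredicate(final Predicate<T> newPredicate) {
        this.filterPredicate = this.filterPredicate.and(newPredicate);
    }

    boolean accepts(final T item) {
        return filterPredicate.test(item);
    }
}
```

Because Predicate.and short-circuits, predicates added earlier run first, so cheap checks are best added before expensive ones.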
@@ -97,13 +97,14 @@ public void start(final Map<String, String> props) { | |||
this.transformer = TransformerFactory.getTransformer(s3SourceConfig); | |||
offsetManager = new OffsetManager(context, s3SourceConfig); | |||
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); | |||
awsv2SourceClient.initializeObjectDistributionStrategy(); |
We should initialize the ObjectDistributionStrategy here in S3SourceTask and add it as a predicate.
I initially started with that, but got stuck on an issue and couldn't get back to it. Thanks.
Overall looks like a good start. Getting it settled into the streaming strategy will take a bit of work.
String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)"; | ||
String START_OFFSET_PATTERN = "{{start_offset}}"; | ||
String TIMESTAMP_PATTERN = "{{timestamp}}"; | ||
String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/"; |
The file-based distribution strategies will need to use these, as will any file-based topic and partition extraction. So how about an interface or static class that contains the file-based patterns? Call it something like FileExtractionPatterns.
da924e3 to f342fb9
commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
6e9e77b to 6157773
/** | ||
* @param s3SourceConfig | ||
* configuration for Source connector | ||
* @param failedObjectKeys | ||
* all objectKeys which have already been tried but have been unable to process. | ||
*/ | ||
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) { | ||
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys, | ||
final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) { |
The AWS client should be agnostic to how the task assignment works; its only real focus should be on talking to AWS. So we should not set the filePattern/taskId/distributionStrategy here.
We should call setFilterPredicate either from the S3SourceTask or the SourceRecordIterator.
final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); | ||
return taskAssignment == taskId; | ||
public void setFilterPredicate(final Predicate<S3Object> basePredicate) { | ||
this.filterPredicate = basePredicate |
I think we need this update to look something like the below; this way we just 'and' additional predicates as we need them.
this.filterPredicate = basePredicate | |
public void setFilterPredicate(final Predicate<S3Object> predicate) { | |
this.filterPredicate = this.filterPredicate.and(predicate); |
This doesn't account for 'or' predicates, but we don't have a use case for that yet, so I think it should be OK for the moment.
public ObjectDistributionStrategy getObjectDistributionStrategy() { | ||
return ObjectDistributionStrategy.forName(sourceConfigFragment.getObjectDistributionStrategy()); | ||
} | ||
|
We should also add in getTaskId and getMaxTasksId in CommonConfig.java
Actually, it might be good in here: the existing sink connectors don't currently require it, and it can be moved up a level later if required.
this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; | ||
DistributionStrategy distributionStrategy; | ||
|
||
switch (objectDistributionStrategy) { |
I think we actually need two separate regexes here:
one for the SourceRecordIterator and one for the distribution strategy.
The reason is that the fileNameFragment set for partition-in-filepath, for example, has an any-filename pattern at the end, but users will still want to be able to set the filename template separately to include only certain files. For example, they may want to put *.png and send only images, or they may want files with a certain pattern.
The other thing I can't 100% tell here is the difference between
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()
and
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()
Is one of them the default pattern we had previously? Ignore this if not, but if it is, we should always look to get the custom one that has been configured and fall back on the default if none is configured (and also throw an error if it's missing partition etc. when we configure PARTITION_IN_FILENAME).
There is no difference between originalTemplate() and toString(); I just checked. Added integration tests with the default value and a non-default one.
For the prefix, I introduced a new config for the pattern.
This should allow users to provide any patterns on file names or prefixes, as long as topic and partition are available.
Thanks for checking that for me
e23306c to bb8c4bd
@@ -112,6 +115,16 @@ public void ensureValid(final String name, final Object value) { | |||
// UnusedAssignment |
Nit: those can be removed now that we have the one below.
if (currentObjectKey != null) { | ||
if (validateTaskDistributionStrategy(currentObjectKey)) { |
I think with the new changes this might not be needed anymore as the predicate checks inside of the AWSV2Client as it builds the stream?
I think your earlier point was correct: keep task distribution in the source task/iterator, as it is not relevant in the AWS client. Keeping as is.
We should also remove the failed objects from the AWS client at some point, keeping only object listing there.
Yes, agreed. As long as the predicate is created outside of the client and then passed in with the set predicate, we should be good.
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
*/ | ||
|
||
package io.aiven.kafka.connect.common.source.input.utils; | ||
public class FileExtractionPatterns { |
This should be a final class
Deleted this class
I think that DistributionStrategy should be an abstract class that has one instance variable, maxTasks.
It should have 3 methods:
getMaxTasks()
getTaskFor(String)
configure(int maxTasks)
If we don't put the current task into the class, then we can share the class across all instances of task.
We can construct the DistributionStrategy instance in the AivenKafkaConnectS3SourceConnector.
Classes like PartitionInFilenameDistribution should have the pattern specified in the constructor.
In the end, by not binding the instance to a specific task, we will end up with a more flexible implementation that is not tightly coupled with a single task.
It might make sense to pass a CommonConfig in the DistributionStrategy constructor so that any implementation can have access to the known set of configuration properties.
It may also make sense for configure to accept the CommonConfig rather than the int maxTasks.
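A rough sketch of the abstract class outlined in this comment, with a hash-based subclass as one possible implementation. The validation in configure and the name HashDistributionStrategy are assumptions for illustration; the CommonConfig variant mentioned above is left out to keep the example self-contained.

```java
// Hypothetical base class following the three-method outline; not the merged design.
abstract class DistributionStrategy {
    private int maxTasks;

    protected DistributionStrategy(final int maxTasks) {
        configure(maxTasks);
    }

    public final int getMaxTasks() {
        return maxTasks;
    }

    // Single entry point for configuring or reconfiguring after construction.
    public final void configure(final int maxTasks) {
        if (maxTasks <= 0) {
            throw new IllegalArgumentException("maxTasks must be positive, got " + maxTasks);
        }
        this.maxTasks = maxTasks;
    }

    // Returns the task index in [0, maxTasks) responsible for the given key.
    public abstract int getTaskFor(String objectKey);
}

// Assumed example implementation: assigns a key by its hash, independent of any task id.
final class HashDistributionStrategy extends DistributionStrategy {
    HashDistributionStrategy(final int maxTasks) {
        super(maxTasks);
    }

    @Override
    public int getTaskFor(final String objectKey) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(objectKey.hashCode(), getMaxTasks());
    }
}
```

Since no task id is stored, one instance can answer getTaskFor for every task, and each task only compares the result with its own id.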
I will refactor these in the next PR.
I think this needs to be done correctly now. This is not some minor change to an internal method; this is the decision about how DistributionStrategy should be architected.
Your current design has opened the door for DistributionStrategies that require multiple parameters to require those parameters be known up and down the stack.
3c96864 to 6c437ae
public void reconfigureDistributionStrategy(final int maxTasks) { | ||
this.maxTasks = maxTasks; | ||
} | ||
|
||
public void setMaxTasks(final int maxTasks) { | ||
this.maxTasks = maxTasks; | ||
} | ||
|
||
private void configureDistributionStrategy(final int maxTasks) { | ||
this.maxTasks = maxTasks; | ||
} |
These 2 methods do exactly the same thing. This is an indication that there should be one method to configure the distribution strategy after construction and that it should take a maxTasks argument. Change reconfigureDistributionStrategy to configureDistributionStrategy in the base class, and just call that.
Partition in FileName and Partition in Path do exactly the same thing but extract the partition from different places. What we need is a class that will take the file name, extract all the components mentioned in the pattern from it, and set them. Then we have one place to do the extraction for all processes that depend upon the values, and you only need one PartitionDistributionStrategy. The PartitionDistributionStrategy should not care where the partition comes from; it should just use it.
See notes elsewhere on partition strategy for background.
This class should have a method to extract all the patterns from a string and make them available so that we can call it once to get all the values set early in the process.
It should probably have methods to return the values (e.g. getTopic()) that return Optional types, returning Optional.empty() when the type name is not found in the pattern.
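One way the extraction class described here might look. The class name ExtractedKeyInfo and the group names topic and partition are assumptions for illustration, and the sketch assumes the partition group only matches digits.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value holder filled once per object key; names are illustrative.
final class ExtractedKeyInfo {
    private final String topic;
    private final Integer partition;

    private ExtractedKeyInfo(final String topic, final Integer partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // Runs the pattern once; returns empty when the key does not match at all.
    static Optional<ExtractedKeyInfo> extract(final Pattern pattern, final String objectKey) {
        final Matcher matcher = pattern.matcher(objectKey);
        if (!matcher.find()) {
            return Optional.empty();
        }
        final String partitionText = groupOrNull(matcher, "partition");
        // Assumes the partition group is defined as digits only (for example \d+).
        final Integer partition = partitionText == null ? null : Integer.valueOf(partitionText);
        return Optional.of(new ExtractedKeyInfo(groupOrNull(matcher, "topic"), partition));
    }

    // Matcher.group(String) throws when the pattern defines no such group, so check first.
    private static String groupOrNull(final Matcher matcher, final String groupName) {
        if (!matcher.pattern().pattern().contains("(?<" + groupName + ">")) {
            return null;
        }
        return matcher.group(groupName);
    }

    Optional<String> getTopic() {
        return Optional.ofNullable(topic);
    }

    Optional<Integer> getPartition() {
        return Optional.ofNullable(partition);
    }
}
```

A partition-based strategy can then read getPartition() without knowing whether the value came from the file name or the path.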
final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); | ||
return taskAssignment == taskId; | ||
public void setFilterPredicate(final Predicate<S3Object> basePredicate) { | ||
this.filterPredicate = basePredicate.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key())); |
This is incorrect. I think this will add the same predicates multiple times.
This should literally read:
public void addFilterPredicate(final Predicate<S3Object> otherPredicate) {
this.filterPredicate = this.filterPredicate.and(otherPredicate);
}
|
||
// call filters out bad file names and extracts topic/partition | ||
inner = IteratorUtils.filteredIterator(sourceClient.getS3ObjectIterator(null), | ||
s3Object -> this.fileNamePredicate.test(s3Object)); | ||
s3Object -> this.fileNamePredicate.test(s3Object.key())); |
The file name predicate should be an S3Object predicate (as it was before) and should be moved into the S3ObjectIterator by using AWSV2SourceClient.addPredicate().
A properly configured chain of predicates will allow us to extract information before the S3Objects are returned, and the IteratorUtils.filteredIterator call that I put here can be removed.
// return false; | ||
// } | ||
|
||
public Predicate<String> predicateForFileAndTaskAssignment() { |
Don't mix predicates. If we keep them simple then we can put new ones into the chain. If we needed to do something between approving the file as part of what we are interested in and assigning a task we will not be able to do it if the 2 tasks are combined.
if (!distributionStrategy.isPartOfTask(taskId, objectKey, filePattern)) { | ||
return false; | ||
} | ||
|
||
final Matcher fileMatcher = filePattern.matcher(objectKey); | ||
|
||
if (fileMatcher.find()) { | ||
// TODO: Decouple topic and partition extraction from S3 specifics | ||
topic = fileMatcher.group(PATTERN_TOPIC_KEY); | ||
partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); | ||
return true; | ||
} | ||
return false; | ||
}; |
If the order of these checks were reversed then we could exclude any object that does not match the file filter while extracting the file filter provided information and then use that information in the distribution strategy check.
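The reversed ordering could look roughly like this: the file filter runs first and yields the partition, and the task check then reuses that value. The class OrderedChecks, the named group partition, and the modulo assignment are assumptions for illustration, not the connector's code.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper showing the check order only.
final class OrderedChecks {
    private OrderedChecks() {
    }

    // Step 1: file filter. Yields the partition only for keys that match the pattern.
    // Assumes the pattern defines a digits-only named group called "partition".
    static OptionalInt partitionOf(final Pattern filePattern, final String objectKey) {
        final Matcher matcher = filePattern.matcher(objectKey);
        if (!matcher.find()) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(Integer.parseInt(matcher.group("partition")));
    }

    // Step 2: task check, reusing what step 1 extracted instead of re-parsing the key.
    static boolean belongsToTask(final Pattern filePattern, final String objectKey, final int taskId,
            final int maxTasks) {
        final OptionalInt partition = partitionOf(filePattern, objectKey);
        return partition.isPresent() && Math.floorMod(partition.getAsInt(), maxTasks) == taskId;
    }
}
```

Objects that fail the file filter are rejected before any distribution logic runs, which is the effect the comment asks for.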
[KCON-63]