Skip to content

Commit

Permalink
integrate object dist strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 8, 2025
1 parent 72c601e commit 6e9e77b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ public static ConfigDef update(final ConfigDef configDef) {
ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS);
configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); // NOPMD
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(),
new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM,
"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH,
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
// UnusedAssignment

return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,22 @@ private DistributionStrategy initializeObjectDistributionStrategy() {
this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
DistributionStrategy distributionStrategy;

if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILENAME) {
this.filePattern = FilePatternUtils
.configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate());
distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks);
} else if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILEPATH) {
this.filePattern = FilePatternUtils
.configurePattern(DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN);
distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks);
} else {
this.filePattern = FilePatternUtils
.configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
distributionStrategy = new HashDistributionStrategy(maxTasks);
switch (objectDistributionStrategy) {
case PARTITION_IN_FILENAME :
this.filePattern = FilePatternUtils.configurePattern(
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate());
distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks);
break;
case PARTITION_IN_FILEPATH :
this.filePattern = FilePatternUtils
.configurePattern(DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN);
distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks);
break;
default :
this.filePattern = FilePatternUtils
.configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
distributionStrategy = new HashDistributionStrategy(maxTasks);
break;
}

return distributionStrategy;
Expand Down

0 comments on commit 6e9e77b

Please sign in to comment.