From 6e9e77bcf0a243cf3511ce0d6793424f619cc088 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 8 Jan 2025 12:30:02 +0100 Subject: [PATCH] integrate object dist strategies --- .../common/config/SourceConfigFragment.java | 3 +- .../kafka/connect/s3/source/S3SourceTask.java | 28 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java index f7db65f3..404e4c58 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java @@ -74,13 +74,14 @@ public static ConfigDef update(final ConfigDef configDef) { ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS); configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC, - offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); // NOPMD + offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(), new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM, "Based on tasks.max config and this strategy, objects are processed in distributed" + " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", " + PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH, GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD + // UnusedAssignment return configDef; } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index c533b63e..5fff1b34 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -204,18 +204,22 @@ private DistributionStrategy initializeObjectDistributionStrategy() { this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; DistributionStrategy distributionStrategy; - if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILENAME) { - this.filePattern = FilePatternUtils - .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()); - distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks); - } else if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILEPATH) { - this.filePattern = FilePatternUtils - .configurePattern(DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN); - distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks); - } else { - this.filePattern = FilePatternUtils - .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); - distributionStrategy = new HashDistributionStrategy(maxTasks); + switch (objectDistributionStrategy) { + case PARTITION_IN_FILENAME : + this.filePattern = FilePatternUtils.configurePattern( + s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()); + distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks); + break; + case PARTITION_IN_FILEPATH : + this.filePattern = FilePatternUtils + .configurePattern(DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN); + distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks); + break; + default : + this.filePattern = FilePatternUtils + .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); + distributionStrategy = new HashDistributionStrategy(maxTasks); + break; } return distributionStrategy;