Skip to content

Commit

Permalink
Integrate object dist strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 9, 2025
1 parent bb8c4bd commit 3c96864
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public final class FileNameFragment extends ConfigFragment {
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";

public static final String FILE_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
static final String DEFAULT_FILE_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";
public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";

public FileNameFragment(final AbstractConfig cfg) {
super(cfg);
Expand Down Expand Up @@ -115,15 +115,15 @@ public void ensureValid(final String name, final Object value) {
// UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

configDef.define(FILE_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PREFIX_TEMPLATE,
configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"The template for file prefix on S3. "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic` and `partition` "
+ "and are mandatory to have these in the directory structure."
+ "Example prefix : topics/{{topic}}/partition/{{partition}}/",
GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.LONG, FILE_PREFIX_TEMPLATE_CONFIG);
ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);

return configDef;
}
Expand Down Expand Up @@ -198,8 +198,8 @@ public int getMaxRecordsPerFile() {
return cfg.getInt(FILE_MAX_RECORDS);
}

public String getFilePrefixTemplateConfig() {
return cfg.getString(FILE_PREFIX_TEMPLATE_CONFIG);
public String getFilePathPrefixTemplateConfig() {
return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG;
import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PREFIX_TEMPLATE_CONFIG;
import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
Expand Down Expand Up @@ -403,7 +403,7 @@ private Map<String, String> getConfig(final String connectorName, final String t
config.put(FILE_NAME_TEMPLATE_CONFIG,
"{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}");
if (addPrefix) {
config.put(FILE_PREFIX_TEMPLATE_CONFIG, prefixPattern);
config.put(FILE_PATH_PREFIX_TEMPLATE_CONFIG, prefixPattern);
}
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,9 @@ private DistributionStrategy initializeObjectDistributionStrategy() {
distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks);
break;
case PARTITION_IN_FILEPATH :
this.filePattern = FilePatternUtils.configurePattern(
s3SourceConfig.getS3FileNameFragment().getFilePrefixTemplateConfig() + ANY_FILENAME_PATTERN);
this.filePattern = FilePatternUtils
.configurePattern(s3SourceConfig.getS3FileNameFragment().getFilePathPrefixTemplateConfig()
+ ANY_FILENAME_PATTERN);
distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks);
break;
default :
Expand Down

0 comments on commit 3c96864

Please sign in to comment.