Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #308 from Aiven-Open/jeqo/rm-key-record-validation
Browse files Browse the repository at this point in the history
refactor: remove key/record validation
  • Loading branch information
tvainika authored Oct 11, 2023
2 parents 294ea92 + d47938a commit 3aacf07
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
11 changes: 0 additions & 11 deletions src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.templating.Template;

import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.NoCredentials;
Expand Down Expand Up @@ -322,15 +320,6 @@ private void validate() {
GCS_CREDENTIALS_PATH_CONFIG, GCS_CREDENTIALS_JSON_CONFIG);
throw new ConfigException(msg);
}

// Special checks for {{key}} filename template.
final Template filenameTemplate = getFilenameTemplate();
if (RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate))
&& (getMaxRecordsPerFile() > 1)) { // NOPMD avoid literal
final String msg = String.format("When %s is %s, %s must be either 1 or not set", FILE_NAME_TEMPLATE_CONFIG,
filenameTemplate, FILE_MAX_RECORDS);
throw new ConfigException(msg);
}
}

public OAuth2Credentials getCredentials() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,15 @@ void wrongFormatTypeConfig() {
assertEquals(expectedErrorMessage, throwable.getMessage());
}

@ParameterizedTest
@ValueSource(strings = { "{{key}}", "{{topic}}/{{partition}}/{{key}}" })
void notSupportedFileMaxRecords(final String fileNameTemplate) {
final Map<String, String> properties = Map.of(GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG, fileNameTemplate,
GcsSinkConfig.FILE_MAX_RECORDS, "2", GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "any_bucket");
assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties), String.format(
"When file.name.template is %s, file.max.records must be either 1 or not set", fileNameTemplate));
}

private void assertConfigDefValidationPasses(final Map<String, String> properties) {
for (final ConfigValue configValue : GcsSinkConfig.configDef().validate(properties)) {
assertTrue(configValue.errorMessages().isEmpty());
Expand Down

0 comments on commit 3aacf07

Please sign in to comment.