diff --git a/build.gradle b/build.gradle
index 96974237..5f8bc751 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,9 +19,6 @@ plugins {
// https://docs.gradle.org/current/userguide/java_library_plugin.html
id "java-library"
- // https://docs.gradle.org/current/userguide/checkstyle_plugin.html
- id "checkstyle"
-
// https://docs.gradle.org/current/userguide/jacoco_plugin.html
id "jacoco"
@@ -33,6 +30,9 @@ plugins {
// https://docs.gradle.org/current/userguide/idea_plugin.html
id 'idea'
+
+ // https://plugins.gradle.org/plugin/com.diffplug.spotless
+ id "com.diffplug.spotless" version "6.23.0"
}
wrapper {
@@ -49,6 +49,30 @@ repositories {
mavenCentral()
}
+spotless {
+ format 'misc', {
+ // define the files to apply `misc` to
+ target '*.gradle', '*.md', '.gitignore'
+ targetExclude ".*/**", "**/build/**", "**/.gradle/**"
+
+ // define the steps to apply to those files
+ trimTrailingWhitespace()
+ indentWithSpaces()
+ endWithNewline()
+ }
+
+ java {
+ licenseHeaderFile file("${rootDir}/gradle-config/java.header")
+ importOrder("javax", "java", "org.apache.kafka","org.opensearch","io.aiven","")
+ removeUnusedImports()
+ replaceRegex("No wildcard imports.", "import(?:\\s+static)?\\s+[^\\*\\s]+\\*;(\r\n|\r|\n)", '$1')
+ eclipse().configFile("${rootDir}/gradle-config/aiven-eclipse-formatter.xml")
+ indentWithSpaces()
+ endWithNewline()
+ trimTrailingWhitespace()
+ }
+}
+
java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
@@ -58,11 +82,6 @@ compileJava {
options.compilerArgs = ['-Xlint:all']
}
-checkstyle {
- toolVersion "9.0.1"
- configDirectory.set(rootProject.file("checkstyle/"))
-}
-
jacoco {
toolVersion = "0.8.9"
}
@@ -72,12 +91,6 @@ distributions {
contents {
from jar
from configurations.runtimeClasspath
-
- into("/") {
- from projectDir
- include "version.txt", "README*", "LICENSE*", "NOTICE*", "licenses/"
- include "config/"
- }
}
}
}
@@ -204,7 +217,7 @@ task integrationTest(type: Test) {
// Pass the distribution file path to the tests.
systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path)
- systemProperty("opensearch.testcontainers.image-version",
+ systemProperty("opensearch.testcontainers.image-version",
project.findProperty("opensearch.testcontainers.image-version") ? project.getProperty("opensearch.testcontainers.image-version") : "2.0.0")
}
diff --git a/gradle-config/aiven-eclipse-formatter.xml b/gradle-config/aiven-eclipse-formatter.xml
new file mode 100644
index 00000000..4f56c341
--- /dev/null
+++ b/gradle-config/aiven-eclipse-formatter.xml
@@ -0,0 +1,172 @@
+<!-- Eclipse JDT code formatter profile used by the Spotless eclipse() step (formatter settings omitted) -->
\ No newline at end of file
diff --git a/gradle-config/java.header b/gradle-config/java.header
new file mode 100644
index 00000000..a67c3a29
--- /dev/null
+++ b/gradle-config/java.header
@@ -0,0 +1,15 @@
+/*
+ * Copyright $YEAR Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
\ No newline at end of file
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java
index c0978385..d2c95b30 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
@@ -36,10 +38,6 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
-
@Testcontainers
public abstract class AbstractIT {
@@ -55,11 +53,8 @@ void setup() throws Exception {
}
protected Map<String, String> getDefaultProperties() {
- return Map.of(
- CONNECTION_URL_CONFIG, opensearchContainer.getHttpHostAddress(),
- CONNECTION_USERNAME_CONFIG, "admin",
- CONNECTION_PASSWORD_CONFIG, "admin"
- );
+ return Map.of(CONNECTION_URL_CONFIG, opensearchContainer.getHttpHostAddress(), CONNECTION_USERNAME_CONFIG,
+ "admin", CONNECTION_PASSWORD_CONFIG, "admin");
}
@AfterEach
@@ -70,23 +65,20 @@ void tearDown() throws Exception {
}
protected SearchHits search(final String indexName) throws IOException {
- return opensearchClient.client
- .search(new SearchRequest(indexName), RequestOptions.DEFAULT).getHits();
+ return opensearchClient.client.search(new SearchRequest(indexName), RequestOptions.DEFAULT).getHits();
}
protected void waitForRecords(final String indexName, final int expectedRecords) throws InterruptedException {
- TestUtils.waitForCondition(
- () -> {
- try {
- return expectedRecords == opensearchClient.client
- .count(new CountRequest(indexName), RequestOptions.DEFAULT).getCount();
- } catch (final IOException e) {
- throw new UncheckedIOException(e);
- }
- },
- TimeUnit.MINUTES.toMillis(1L),
- String.format("Could not find expected documents (%d) in time.", expectedRecords)
- );
+ TestUtils.waitForCondition(() -> {
+ try {
+ return expectedRecords == opensearchClient.client
+ .count(new CountRequest(indexName), RequestOptions.DEFAULT)
+ .getCount();
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }, TimeUnit.MINUTES.toMillis(1L),
+ String.format("Could not find expected documents (%d) in time.", expectedRecords));
}
private static String getOpenSearchImage() {
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractKafkaConnectIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractKafkaConnectIT.java
index ced550a8..4a4f8d22 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractKafkaConnectIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractKafkaConnectIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG;
+import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,17 +43,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG;
-import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
-
-
public class AbstractKafkaConnectIT extends AbstractIT {
static final Logger LOGGER = LoggerFactory.getLogger(AbstractKafkaConnectIT.class);
@@ -65,9 +62,7 @@ protected AbstractKafkaConnectIT(final String topicName, final String connectorN
@BeforeEach
void startConnect() {
- connect = new EmbeddedConnectCluster.Builder()
- .name("elasticsearch-it-connect-cluster")
- .build();
+ connect = new EmbeddedConnectCluster.Builder().name("elasticsearch-it-connect-cluster").build();
connect.start();
connect.kafka().createTopic(topicName);
}
@@ -84,19 +79,15 @@ void stopConnect() {
}
long waitForConnectorToStart(final String name, final int numTasks) throws InterruptedException {
- TestUtils.waitForCondition(
- () -> assertConnectorAndTasksRunning(name, numTasks).orElse(false),
- CONNECTOR_STARTUP_DURATION_MS,
- "Connector tasks did not start in time."
- );
+ TestUtils.waitForCondition(() -> assertConnectorAndTasksRunning(name, numTasks).orElse(false),
+ CONNECTOR_STARTUP_DURATION_MS, "Connector tasks did not start in time.");
return System.currentTimeMillis();
}
Optional<Boolean> assertConnectorAndTasksRunning(final String connectorName, final int numTasks) {
try {
final var info = connect.connectorStatus(connectorName);
- final boolean result = info != null
- && info.tasks().size() >= numTasks
+ final boolean result = info != null && info.tasks().size() >= numTasks
&& info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
&& info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
return Optional.of(result);
@@ -122,11 +113,7 @@ Map<String, String> connectorProperties() {
void writeRecords(final int numRecords) {
for (int i = 0; i < numRecords; i++) {
- connect.kafka().produce(
- topicName,
- String.valueOf(i),
- String.format("{\"doc_num\":%d}", i)
- );
+ connect.kafka().produce(topicName, String.valueOf(i), String.format("{\"doc_num\":%d}", i));
}
}
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchClientIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchClientIT.java
index 415f5fc6..d62b08d2 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchClientIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchClientIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -37,14 +40,8 @@
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
public class OpensearchClientIT extends AbstractIT {
-
@Test
void getsVersion() {
assertEquals(opensearchClient.getVersion(), getOpenSearchVersion());
@@ -81,10 +78,8 @@ void createIndexDoesNotCreateWhenAliasExists() throws Exception {
final OpensearchClient tmpClient = new OpensearchClient(config);
try {
- tmpClient.client.indices().create(
- new CreateIndexRequest("index_6").alias(new Alias("alias_1")),
- RequestOptions.DEFAULT
- );
+ tmpClient.client.indices()
+ .create(new CreateIndexRequest("index_6").alias(new Alias("alias_1")), RequestOptions.DEFAULT);
} catch (final OpenSearchStatusException | IOException e) {
throw e;
}
@@ -98,18 +93,14 @@ void createIndexTemplateAndDataStream() throws Exception {
props.put(OpensearchSinkConnectorConfig.DATA_STREAM_PREFIX, "some_data_stream");
final var config = new OpensearchSinkConnectorConfig(props);
- opensearchClient.createIndexTemplateAndDataStream(
- config.dataStreamPrefix().get(), config.dataStreamTimestampField());
-
- assertTrue(
- opensearchClient.dataStreamIndexTemplateExists(
- String.format(OpensearchClient.DATA_STREAM_TEMPLATE_NAME_PATTERN, "some_data_stream")
- )
- );
- final var dataStreams = opensearchClient.client.indices().getDataStream(
- new GetDataStreamRequest("some_data_stream"),
- RequestOptions.DEFAULT
- ).getDataStreams();
+ opensearchClient.createIndexTemplateAndDataStream(config.dataStreamPrefix().get(),
+ config.dataStreamTimestampField());
+
+ assertTrue(opensearchClient.dataStreamIndexTemplateExists(
+ String.format(OpensearchClient.DATA_STREAM_TEMPLATE_NAME_PATTERN, "some_data_stream")));
+ final var dataStreams = opensearchClient.client.indices()
+ .getDataStream(new GetDataStreamRequest("some_data_stream"), RequestOptions.DEFAULT)
+ .getDataStreams();
assertFalse(dataStreams.isEmpty());
assertEquals(1, dataStreams.size());
assertEquals("some_data_stream", dataStreams.get(0).getName());
@@ -122,22 +113,15 @@ void createIndexDoesNotCreateAlreadyExistingDataStream() throws Exception {
try {
final ComposableIndexTemplate template = new ComposableIndexTemplate(
- Arrays.asList("data_stream_1", "index-logs-*"),
- null,
- null,
- 100L,
- null,
- null,
+ Arrays.asList("data_stream_1", "index-logs-*"), null, null, 100L, null, null,
new DataStreamTemplate());
final PutComposableIndexTemplateRequest request = new PutComposableIndexTemplateRequest();
request.name("data-stream-template");
request.indexTemplate(template);
tmpClient.client.indices().putIndexTemplate(request, RequestOptions.DEFAULT);
- tmpClient.client.indices().createDataStream(
- new CreateDataStreamRequest("data_stream_1"),
- RequestOptions.DEFAULT
- );
+ tmpClient.client.indices()
+ .createDataStream(new CreateDataStreamRequest("data_stream_1"), RequestOptions.DEFAULT);
} catch (final OpenSearchStatusException | IOException e) {
throw e;
}
@@ -149,12 +133,11 @@ void createIndexDoesNotCreateAlreadyExistingDataStream() throws Exception {
void createMapping() throws IOException {
assertTrue(opensearchClient.createIndex("index_4"));
- final var schema =
- SchemaBuilder.struct()
- .name("record")
- .field("name", SchemaBuilder.string().defaultValue("").build())
- .field("value", SchemaBuilder.int32().defaultValue(0).build())
- .build();
+ final var schema = SchemaBuilder.struct()
+ .name("record")
+ .field("name", SchemaBuilder.string().defaultValue("").build())
+ .field("value", SchemaBuilder.int32().defaultValue(0).build())
+ .build();
opensearchClient.createMapping("index_4", schema);
assertTrue(opensearchClient.hasMapping("index_4"));
@@ -162,19 +145,23 @@ void createMapping() throws IOException {
final var response = opensearchClient.client.indices()
.getMapping(new GetMappingsRequest().indices("index_4"), RequestOptions.DEFAULT)
.mappings()
- .get("index_4").getSourceAsMap();
+ .get("index_4")
+ .getSourceAsMap();
assertTrue(response.containsKey("properties"));
- @SuppressWarnings("unchecked") final var properties = (Map<String, Object>) response.get("properties");
+ @SuppressWarnings("unchecked")
+ final var properties = (Map<String, Object>) response.get("properties");
assertTrue(properties.containsKey("name"));
assertTrue(properties.containsKey("value"));
- @SuppressWarnings("unchecked") final var nameProperty = (Map<String, Object>) properties.get("name");
+ @SuppressWarnings("unchecked")
+ final var nameProperty = (Map<String, Object>) properties.get("name");
assertEquals("text", nameProperty.get("type"));
assertNull(nameProperty.get("null_value"));
- @SuppressWarnings("unchecked") final var valueProperty = (Map<String, Object>) properties.get("value");
+ @SuppressWarnings("unchecked")
+ final var valueProperty = (Map<String, Object>) properties.get("value");
assertEquals("integer", valueProperty.get("type"));
assertEquals(0, valueProperty.get("null_value"));
}
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorIT.java
index ba037a0e..2a6f3ba1 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,19 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
public class OpensearchSinkConnectorIT extends AbstractKafkaConnectIT {
static final Logger LOGGER = LoggerFactory.getLogger(OpensearchSinkConnectorIT.class);
@@ -58,15 +56,10 @@ public void testConnector() throws Exception {
@Test
public void testConnectorConfig() throws Exception {
- assertEquals(
- connect.validateConnectorConfig(
- "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
- Map.of("connector.class", "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
- "topics", "example-topic-name", "name", "test-connector-name")
- ).errorCount(), 1
- );
+ assertEquals(connect.validateConnectorConfig("io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
+ Map.of("connector.class", "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "topics",
+ "example-topic-name", "name", "test-connector-name"))
+ .errorCount(), 1);
}
-
-
}
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkDataStreamConnectorIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkDataStreamConnectorIT.java
index 45fd9ca0..f686a69c 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkDataStreamConnectorIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkDataStreamConnectorIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.util.concurrent.TimeUnit;
import org.opensearch.client.RequestOptions;
@@ -26,10 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
public class OpensearchSinkDataStreamConnectorIT extends AbstractKafkaConnectIT {
static final Logger LOGGER = LoggerFactory.getLogger(OpensearchSinkConnectorIT.class);
@@ -44,8 +42,8 @@ public class OpensearchSinkDataStreamConnectorIT extends AbstractKafkaConnectIT
static final String DATA_STREAM_WITH_PREFIX_INDEX_NAME = String.format("%s-%s", DATA_STREAM_PREFIX, TOPIC_NAME);
- static final String DATA_STREAM_PREFIX_WITH_TIMESTAMP_INDEX_NAME =
- String.format("%s-%s", DATA_STREAM_PREFIX_WITH_TIMESTAMP, TOPIC_NAME);
+ static final String DATA_STREAM_PREFIX_WITH_TIMESTAMP_INDEX_NAME = String.format("%s-%s",
+ DATA_STREAM_PREFIX_WITH_TIMESTAMP, TOPIC_NAME);
static final String CONNECTOR_NAME = "os-ds-sink-connector";
public OpensearchSinkDataStreamConnectorIT() {
@@ -79,11 +77,9 @@ void testConnectorWithDataStreamCustomTimestamp() throws Exception {
waitForConnectorToStart(CONNECTOR_NAME, 1);
for (int i = 0; i < 10; i++) {
- connect.kafka().produce(
- topicName,
- String.valueOf(i),
- String.format("{\"doc_num\":%d, \"custom_timestamp\": %s}", i, System.currentTimeMillis())
- );
+ connect.kafka()
+ .produce(topicName, String.valueOf(i),
+ String.format("{\"doc_num\":%d, \"custom_timestamp\": %s}", i, System.currentTimeMillis()));
}
waitForRecords(DATA_STREAM_PREFIX_WITH_TIMESTAMP_INDEX_NAME, 10);
@@ -104,17 +100,13 @@ void testConnectorWithDataStreamPrefix() throws Exception {
waitForRecords(DATA_STREAM_WITH_PREFIX_INDEX_NAME, 10);
assertDataStream(DATA_STREAM_WITH_PREFIX_INDEX_NAME);
- assertDocs(
- DATA_STREAM_WITH_PREFIX_INDEX_NAME,
- OpensearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_FIELD_DEFAULT
- );
+ assertDocs(DATA_STREAM_WITH_PREFIX_INDEX_NAME,
+ OpensearchSinkConnectorConfig.DATA_STREAM_TIMESTAMP_FIELD_DEFAULT);
}
void assertDataStream(final String dataStreamName) throws Exception {
final var dsStats = opensearchClient.client.indices()
- .dataStreamsStats(
- new DataStreamsStatsRequest(dataStreamName), RequestOptions.DEFAULT
- );
+ .dataStreamsStats(new DataStreamsStatsRequest(dataStreamName), RequestOptions.DEFAULT);
assertEquals(1, dsStats.getDataStreamCount());
assertEquals(1, dsStats.getBackingIndices());
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTaskIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTaskIT.java
index 50cb1770..a4ecfbf4 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTaskIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTaskIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.KEY_IGNORE_ID_STRATEGY_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -42,17 +51,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.KEY_IGNORE_ID_STRATEGY_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
public class OpensearchSinkTaskIT extends AbstractIT {
private static final int PARTITION_1 = 12;
@@ -63,11 +61,8 @@ public class OpensearchSinkTaskIT extends AbstractIT {
void tearDown() throws Exception {
if (opensearchClient.indexOrDataStreamExists(TOPIC_NAME)) {
opensearchClient.client.indices().delete(new DeleteIndexRequest(TOPIC_NAME), RequestOptions.DEFAULT);
- TestUtils.waitForCondition(
- () -> !opensearchClient.indexOrDataStreamExists(TOPIC_NAME),
- TimeUnit.MINUTES.toMillis(1),
- "Index has not been deleted yet."
- );
+ TestUtils.waitForCondition(() -> !opensearchClient.indexOrDataStreamExists(TOPIC_NAME),
+ TimeUnit.MINUTES.toMillis(1), "Index has not been deleted yet.");
}
}
@@ -79,24 +74,17 @@ public void testBytes() throws Exception {
.build();
final Struct struct = new Struct(structSchema);
- struct.put("bytes", new byte[]{42});
-
- runTask(
- getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT),
- List.of(
- new SinkRecord(
- TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA,
- "key", structSchema, struct, 0)
- )
- );
+ struct.put("bytes", new byte[] { 42 });
+
+ runTask(getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", structSchema, struct, 0)));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 1);
for (final var hint : search(TOPIC_NAME)) {
if (hint.getId().equals("key")) {
- assertEquals(Base64.getEncoder().encodeToString(new byte[]{42}), hint.getSourceAsMap().get("bytes"));
+ assertEquals(Base64.getEncoder().encodeToString(new byte[] { 42 }), hint.getSourceAsMap().get("bytes"));
}
}
}
@@ -107,17 +95,15 @@ public void testDecimal() throws Exception {
final byte[] bytes = ByteBuffer.allocate(4).putInt(2).array();
final BigDecimal decimal = new BigDecimal(new BigInteger(bytes), scale);
- final Schema structSchema = SchemaBuilder.struct().name("struct")
+ final Schema structSchema = SchemaBuilder.struct()
+ .name("struct")
.field("decimal", Decimal.schema(scale))
.build();
final Struct struct = new Struct(structSchema);
struct.put("decimal", decimal);
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DEFAULT),
- List.of(new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", structSchema, struct, 0))
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DEFAULT),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", structSchema, struct, 0)));
for (final var hint : search(TOPIC_NAME)) {
if (hint.getId().equals("key")) {
assertEquals(0.02d, hint.getSourceAsMap().get("decimal"));
@@ -139,24 +125,14 @@ final var record = createRecord(schema);
opensearchSinkTask.initialize(mockContext);
opensearchSinkTask.start(getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT));
opensearchSinkTask.put(
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", schema, record, 0),
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", schema, record, 1)
- )
- );
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", schema, record, 0),
+ new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", schema, record, 1)));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
opensearchSinkTask.flush(null);
waitForRecords(TOPIC_NAME, 2);
- opensearchSinkTask.put(
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA,
- "key", otherSchema, otherRecord, 2),
- new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA,
- "key", otherSchema, otherRecord, 3)
- )
- );
+ opensearchSinkTask.put(List.of(
+ new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", otherSchema, otherRecord, 2),
+ new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", otherSchema, otherRecord, 3)));
opensearchSinkTask.flush(null);
waitForRecords(TOPIC_NAME, 4);
} finally {
@@ -170,15 +146,11 @@ public void testIncompatible() throws Exception {
final var record = createRecord(schema);
final var otherSchema = createOtherSchema();
final var otherRecord = createOtherRecord(otherSchema);
- assertThrows(ConnectException.class, () -> runTask(
- getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT),
- List.of(new SinkRecord(
- TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", otherSchema, otherRecord, 0),
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", schema, record, 1)
- )
- ));
+ assertThrows(ConnectException.class,
+ () -> runTask(getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT), List.of(
+ new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", otherSchema, otherRecord,
+ 0),
+ new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", schema, record, 1))));
}
@Test
@@ -190,69 +162,44 @@ public void testDeleteOnNullValue() throws Exception {
final var record = createRecord(schema);
// First, write a couple of actual (non-null-valued) records
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DELETE),
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, key1, schema, record, 0),
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, key2, schema, record, 1)
- )
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DELETE),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, key1, schema, record, 0),
+ new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, key2, schema, record, 1)));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 2);
// Then, write a record with the same key as the first inserted record but a null value
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DELETE),
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, key1, schema, null, 2)
- )
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DELETE),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, key1, schema, null, 2)));
waitForRecords(TOPIC_NAME, 1);
}
@Test
public void testDeleteWithNullKey() throws Exception {
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DELETE),
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, null, createSchema(), null, 0)
- )
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DELETE),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, null, createSchema(), null, 0)));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 0);
}
@Test
public void testFailOnNullValue() throws Exception {
- assertThrows(
- ConnectException.class,
- () -> runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.FAIL),
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", createSchema(), null, 0)
- )
- )
- );
+ assertThrows(ConnectException.class,
+ () -> runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.FAIL),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", createSchema(),
+ null, 0))));
}
@Test
public void testIgnoreNullValue() throws Exception {
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.IGNORE),
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", createSchema(), null, 0)
- )
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.IGNORE),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", createSchema(), null, 0)));
assertFalse(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
}
@Test
public void testMap() throws Exception {
- final var structSchema = SchemaBuilder.struct().name("struct")
+ final var structSchema = SchemaBuilder.struct()
+ .name("struct")
.field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
.build();
@@ -263,13 +210,8 @@ public void testMap() throws Exception {
final Struct struct = new Struct(structSchema);
struct.put("map", map);
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DEFAULT),
- List.of(
- new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", structSchema, struct, 0)
- )
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DEFAULT),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", structSchema, struct, 0)));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 1);
}
@@ -282,21 +224,15 @@ public void testStringKeyedMap() throws Exception {
map.put("One", 1);
map.put("Two", 2);
- runTask(
- getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DEFAULT),
- List.of(new SinkRecord(TOPIC_NAME, PARTITION_1,
- Schema.STRING_SCHEMA, "key", mapSchema, map, 0))
- );
+ runTask(getDefaultTaskProperties(false, RecordConverter.BehaviorOnNullValues.DEFAULT),
+ List.of(new SinkRecord(TOPIC_NAME, PARTITION_1, Schema.STRING_SCHEMA, "key", mapSchema, map, 0)));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 1);
}
@Test
public void testWriterIgnoreKey() throws Exception {
- runTask(
- getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT),
- prepareData(2)
- );
+ runTask(getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT), prepareData(2));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 2);
}
@@ -305,10 +241,7 @@ public void testWriterIgnoreKey() throws Exception {
public void testWriterIgnoreSchema() throws Exception {
final var props = getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT);
props.put(SCHEMA_IGNORE_CONFIG, "true");
- runTask(
- getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT),
- prepareData(2)
- );
+ runTask(getDefaultTaskProperties(true, RecordConverter.BehaviorOnNullValues.DEFAULT), prepareData(2));
assertTrue(opensearchClient.indexOrDataStreamExists(TOPIC_NAME));
waitForRecords(TOPIC_NAME, 2);
}
@@ -334,7 +267,7 @@ final var record = createRecord(schema);
}
Map<String, String> getDefaultTaskProperties(final boolean ignoreKey,
- final RecordConverter.BehaviorOnNullValues behaviorOnNullValues) {
+ final RecordConverter.BehaviorOnNullValues behaviorOnNullValues) {
final var props = new HashMap<>(getDefaultProperties());
props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, behaviorOnNullValues.name());
props.put(DROP_INVALID_MESSAGE_CONFIG, "false");
@@ -343,22 +276,19 @@ Map<String, String> getDefaultTaskProperties(final boolean ignoreKey,
}
protected Struct createRecord(final Schema schema) {
- return new Struct(schema)
- .put("user", "John Doe")
- .put("message", "blah-blah-blah-blah");
+ return new Struct(schema).put("user", "John Doe").put("message", "blah-blah-blah-blah");
}
protected Schema createSchema() {
- return SchemaBuilder.struct().name("record")
+ return SchemaBuilder.struct()
+ .name("record")
.field("user", Schema.STRING_SCHEMA)
.field("message", Schema.STRING_SCHEMA)
.build();
}
protected Schema createOtherSchema() {
- return SchemaBuilder.struct().name("record")
- .field("user", Schema.INT32_SCHEMA)
- .build();
+ return SchemaBuilder.struct().name("record").field("user", Schema.INT32_SCHEMA).build();
}
protected Struct createOtherRecord(final Schema schema) {
diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkUpsertConnectorIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkUpsertConnectorIT.java
index a9d5e0c3..dbbf2e7a 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkUpsertConnectorIT.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/OpensearchSinkUpsertConnectorIT.java
@@ -1,6 +1,5 @@
/*
* Copyright 2021 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -28,10 +30,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertIterableEquals;
-
public class OpensearchSinkUpsertConnectorIT extends AbstractKafkaConnectIT {
final ObjectMapper objectMapper = new ObjectMapper();
@@ -47,10 +45,8 @@ public OpensearchSinkUpsertConnectorIT() {
@Test
public void testConnector() throws Exception {
final var props = connectorProperties();
- props.put(
- OpensearchSinkConnectorConfig.INDEX_WRITE_METHOD,
- IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT)
- );
+ props.put(OpensearchSinkConnectorConfig.INDEX_WRITE_METHOD,
+ IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT));
props.put(OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "false");
connect.configureConnector(CONNECTOR_NAME, props);
waitForConnectorToStart(CONNECTOR_NAME, 1);
@@ -68,23 +64,11 @@ public void testConnector() throws Exception {
for (var i = 0; i < messages.size(); i++) {
final var m = messages.get(i);
m.getRight().put("another_key", "another_value_" + i);
- connect.kafka().produce(
- TOPIC_NAME,
- m.getLeft(),
- objectMapper.writeValueAsString(m.getRight())
- );
+ connect.kafka().produce(TOPIC_NAME, m.getLeft(), objectMapper.writeValueAsString(m.getRight()));
}
- connect.kafka().produce(
- TOPIC_NAME,
- String.valueOf(11),
- String.format("{\"doc_num\":%d}", 11)
- );
- connect.kafka().produce(
- TOPIC_NAME,
- String.valueOf(12),
- String.format("{\"doc_num\":%d}", 12)
- );
+ connect.kafka().produce(TOPIC_NAME, String.valueOf(11), String.format("{\"doc_num\":%d}", 11));
+ connect.kafka().produce(TOPIC_NAME, String.valueOf(12), String.format("{\"doc_num\":%d}", 12));
waitForRecords(TOPIC_NAME, 5);
@@ -95,10 +79,8 @@ public void testConnector() throws Exception {
foundDocs.put(id, hit.getSourceAsMap());
}
- assertIterableEquals(
- List.of(0, 1, 2, 11, 12),
- foundDocs.keySet().stream().sorted().collect(Collectors.toList())
- );
+ assertIterableEquals(List.of(0, 1, 2, 11, 12),
+ foundDocs.keySet().stream().sorted().collect(Collectors.toList()));
for (var i = 0; i < 3; i++) {
assertEquals(i, foundDocs.get(i).get("doc_num"));
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/BulkProcessor.java b/src/main/java/io/aiven/kafka/connect/opensearch/BulkProcessor.java
index 483236f5..63c006e4 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/BulkProcessor.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/BulkProcessor.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -51,8 +51,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry;
-
public class BulkProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkProcessor.class);
@@ -85,16 +83,13 @@ public class BulkProcessor {
private final ErrantRecordReporter reporter;
- public BulkProcessor(final Time time,
- final RestHighLevelClient client,
- final OpensearchSinkConnectorConfig config) {
+ public BulkProcessor(final Time time, final RestHighLevelClient client,
+ final OpensearchSinkConnectorConfig config) {
this(time, client, config, null);
}
- public BulkProcessor(final Time time,
- final RestHighLevelClient client,
- final OpensearchSinkConnectorConfig config,
- final ErrantRecordReporter reporter) {
+ public BulkProcessor(final Time time, final RestHighLevelClient client, final OpensearchSinkConnectorConfig config,
+ final ErrantRecordReporter reporter) {
this.time = time;
this.client = client;
@@ -114,24 +109,22 @@ public BulkProcessor(final Time time,
executor = Executors.newFixedThreadPool(config.maxInFlightRequests(), threadFactory);
if (!config.ignoreKey() && config.behaviorOnVersionConflict() == BehaviorOnVersionConflict.FAIL) {
- LOGGER.warn("The {} is set to `false` which assumes external version and optimistic locking."
- + " You may consider changing the configuration property '{}' from '{}' to '{}' or '{}'"
- + " to deal with possible version conflicts.",
- OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG,
- OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG,
- BehaviorOnMalformedDoc.FAIL,
- BehaviorOnMalformedDoc.IGNORE,
- BehaviorOnMalformedDoc.WARN);
+ LOGGER.warn(
+ "The {} is set to `false` which assumes external version and optimistic locking."
+ + " You may consider changing the configuration property '{}' from '{}' to '{}' or '{}'"
+ + " to deal with possible version conflicts.",
+ OpensearchSinkConnectorConfig.KEY_IGNORE_CONFIG,
+ OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG, BehaviorOnMalformedDoc.FAIL,
+ BehaviorOnMalformedDoc.IGNORE, BehaviorOnMalformedDoc.WARN);
}
}
private ThreadFactory makeThreadFactory() {
final AtomicInteger threadCounter = new AtomicInteger();
- final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
- (t, e) -> {
- LOGGER.error("Uncaught exception in BulkProcessor thread {}", t, e);
- failAndStop(e);
- };
+ final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> {
+ LOGGER.error("Uncaught exception in BulkProcessor thread {}", t, e);
+ failAndStop(e);
+ };
return new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
@@ -161,9 +154,8 @@ private Runnable farmerTask() {
// Visible for testing
synchronized Future<BulkResponse> submitBatchWhenReady() throws InterruptedException {
- for (long waitStartTimeMs = time.milliseconds(), elapsedMs = 0;
- !stopRequested && !canSubmit(elapsedMs);
- elapsedMs = time.milliseconds() - waitStartTimeMs) {
+ for (long waitStartTimeMs = time.milliseconds(), elapsedMs = 0; !stopRequested
+ && !canSubmit(elapsedMs); elapsedMs = time.milliseconds() - waitStartTimeMs) {
// when linger time has already elapsed, we still have to ensure the other submission
// conditions hence the wait(0) in that case
wait(Math.max(0, lingerMs - elapsedMs));
@@ -206,8 +198,9 @@ public void start() {
/**
* Initiate shutdown.
*
- * <p>Pending buffered records are not automatically flushed, so call {@link #flush(long)} before
- * this method if this is desirable.
+ * <p>
+ * Pending buffered records are not automatically flushed, so call {@link #flush(long)} before this method if this
+ * is desirable.
*/
public void stop() {
LOGGER.trace("stop");
@@ -222,7 +215,8 @@ public void stop() {
/**
* Block upto {@code timeoutMs} till shutdown is complete.
*
- * <p>This should only be called after a previous {@link #stop()} invocation.
+ * <p>
+ * This should only be called after a previous {@link #stop()} invocation.
*/
public void awaitStop(final long timeoutMs) {
LOGGER.trace("awaitStop {}", timeoutMs);
@@ -286,21 +280,20 @@ public void throwIfTerminal() {
}
/**
- * Add a record, may block upto {@code timeoutMs} if at capacity with respect to
- * {@code maxBufferedRecords}.
+ * Add a record, may block upto {@code timeoutMs} if at capacity with respect to {@code maxBufferedRecords}.
*
- * <p>If any task has failed prior to or while blocked in the add, or if the timeout expires
- * while blocked, {@link ConnectException} will be thrown.
+ * <p>
+ * If any task has failed prior to or while blocked in the add, or if the timeout expires while blocked,
+ * {@link ConnectException} will be thrown.
*/
public synchronized void add(final DocWriteRequest<?> docWriteRequests, final SinkRecord sinkRecord,
- final long timeoutMs) {
+ final long timeoutMs) {
throwIfTerminal();
if (bufferedRecords() >= maxBufferedRecords) {
final long addStartTimeMs = time.milliseconds();
- for (long elapsedMs = time.milliseconds() - addStartTimeMs;
- !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords;
- elapsedMs = time.milliseconds() - addStartTimeMs) {
+ for (long elapsedMs = time.milliseconds() - addStartTimeMs; !isTerminal() && elapsedMs < timeoutMs
+ && bufferedRecords() >= maxBufferedRecords; elapsedMs = time.milliseconds() - addStartTimeMs) {
try {
wait(timeoutMs - elapsedMs);
} catch (final InterruptedException e) {
@@ -320,8 +313,8 @@ public synchronized void add(final DocWriteRequest<?> docWriteRequests, final Si
/**
* Request a flush and block upto {@code timeoutMs} until all pending records have been flushed.
*
- * <p>If any task has failed prior to or during the flush, {@link ConnectException} will be
- * thrown with that error.
+ * <p>
+ * If any task has failed prior to or during the flush, {@link ConnectException} will be thrown with that error.
*/
public void flush(final long timeoutMs) {
LOGGER.trace("flush {}", timeoutMs);
@@ -330,15 +323,13 @@ public void flush(final long timeoutMs) {
flushRequested = true;
synchronized (this) {
notifyAll();
- for (long elapsedMs = time.milliseconds() - flushStartTimeMs;
- !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() > 0;
- elapsedMs = time.milliseconds() - flushStartTimeMs) {
+ for (long elapsedMs = time.milliseconds() - flushStartTimeMs; !isTerminal() && elapsedMs < timeoutMs
+ && bufferedRecords() > 0; elapsedMs = time.milliseconds() - flushStartTimeMs) {
wait(timeoutMs - elapsedMs);
}
throwIfTerminal();
if (bufferedRecords() > 0) {
- throw new ConnectException("Flush timeout expired with unflushed records: "
- + bufferedRecords());
+ throw new ConnectException("Flush timeout expired with unflushed records: " + bufferedRecords());
}
}
} catch (final InterruptedException e) {
@@ -397,11 +388,9 @@ public RetriableError(final Throwable cause) {
return callWithRetry("bulk processing", () -> {
try {
- final var response =
- client.bulk(new BulkRequest().add(
- batch.stream().map(DocWriteWrapper::getDocWriteRequest)
- .collect(Collectors.toList())),
- RequestOptions.DEFAULT);
+ final var response = client.bulk(new BulkRequest()
+ .add(batch.stream().map(DocWriteWrapper::getDocWriteRequest).collect(Collectors.toList())),
+ RequestOptions.DEFAULT);
if (!response.hasFailures()) {
// We only logged failures, so log the success immediately after a failure ...
LOGGER.debug("Completed batch {} of {} records", batchId, batch.size());
@@ -415,21 +404,18 @@ public RetriableError(final Throwable cause) {
} else if (responseContainsVersionConflict(itemResponse)) {
handleVersionConflict(itemResponse);
} else {
- throw new RetriableError(
- "One of the item in the bulk response failed. Reason: "
+ throw new RetriableError("One of the item in the bulk response failed. Reason: "
+ itemResponse.getFailureMessage());
}
} else {
- throw new ConnectException(
- "One of the item in the bulk response aborted. Reason: "
+ throw new ConnectException("One of the item in the bulk response aborted. Reason: "
+ itemResponse.getFailureMessage());
}
}
}
return response;
} catch (final IOException e) {
- LOGGER.error(
- "Failed to send bulk request from batch {} of {} records", batchId, batch.size(), e);
+ LOGGER.error("Failed to send bulk request from batch {} of {} records", batchId, batch.size(), e);
throw new RetriableError(e);
}
}, maxRetries, retryBackoffMs, RetriableError.class);
@@ -439,38 +425,39 @@ private void handleVersionConflict(final BulkItemResponse bulkItemResponse) {
// if the elasticsearch request failed because of a version conflict,
// the behavior is configurable.
switch (behaviorOnVersionConflict) {
- case IGNORE:
- LOGGER.debug("Encountered a version conflict when executing batch {} of {}"
+ case IGNORE :
+ LOGGER.debug(
+ "Encountered a version conflict when executing batch {} of {}"
+ " records. Ignoring and will keep an existing record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
break;
- case REPORT:
- final String errorMessage =
- String.format("Encountered a version conflict when executing batch %s of %s"
- + " records. Reporting this error to the errant record reporter and will"
- + " keep an existing record."
- + " Rest status: %s, Action id: %s, Error message: %s",
- batchId, batch.size(), bulkItemResponse.getFailure().getStatus(),
- bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailureMessage());
+ case REPORT :
+ final String errorMessage = String.format(
+ "Encountered a version conflict when executing batch %s of %s"
+ + " records. Reporting this error to the errant record reporter and will"
+ + " keep an existing record."
+ + " Rest status: %s, Action id: %s, Error message: %s",
+ batchId, batch.size(), bulkItemResponse.getFailure().getStatus(),
+ bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailureMessage());
sendToErrantRecordReporter(errorMessage, batch.get(bulkItemResponse.getItemId()).getSinkRecord());
break;
- case WARN:
- LOGGER.warn("Encountered a version conflict when executing batch {} of {}"
+ case WARN :
+ LOGGER.warn(
+ "Encountered a version conflict when executing batch {} of {}"
+ " records. Ignoring and will keep an existing record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
break;
- case FAIL:
- default:
- LOGGER.error("Encountered a version conflict when executing batch {} of {}"
+ case FAIL :
+ default :
+ LOGGER.error(
+ "Encountered a version conflict when executing batch {} of {}"
+ " records. Error was {} (to ignore version conflicts you may consider"
+ " changing the configuration property '{}' from '{}' to '{}').",
batchId, batch.size(), bulkItemResponse.getFailureMessage(),
OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG,
- BehaviorOnMalformedDoc.FAIL,
- BehaviorOnMalformedDoc.IGNORE);
- throw new ConnectException(
- "One of the item in the bulk response failed. Reason: "
- + bulkItemResponse.getFailureMessage());
+ BehaviorOnMalformedDoc.FAIL, BehaviorOnMalformedDoc.IGNORE);
+ throw new ConnectException("One of the item in the bulk response failed. Reason: "
+ + bulkItemResponse.getFailureMessage());
}
}
@@ -478,35 +465,37 @@ private void handleMalformedDoc(final BulkItemResponse bulkItemResponse) {
// if the elasticsearch request failed because of a malformed document,
// the behavior is configurable.
switch (behaviorOnMalformedDoc) {
- case IGNORE:
- LOGGER.debug("Encountered an illegal document error when executing batch {} of {}"
+ case IGNORE :
+ LOGGER.debug(
+ "Encountered an illegal document error when executing batch {} of {}"
+ " records. Ignoring and will not index record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
break;
- case REPORT:
- final String errorMessage =
- String.format("Encountered a version conflict when executing batch %s of %s"
- + " records. Reporting this error to the errant record reporter"
- + " and will not index record."
- + " Rest status: %s, Action id: %s, Error message: %s",
- batchId, batch.size(), bulkItemResponse.getFailure().getStatus(),
- bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailureMessage());
+ case REPORT :
+ final String errorMessage = String.format(
+ "Encountered a version conflict when executing batch %s of %s"
+ + " records. Reporting this error to the errant record reporter"
+ + " and will not index record."
+ + " Rest status: %s, Action id: %s, Error message: %s",
+ batchId, batch.size(), bulkItemResponse.getFailure().getStatus(),
+ bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailureMessage());
sendToErrantRecordReporter(errorMessage, batch.get(bulkItemResponse.getItemId()).getSinkRecord());
break;
- case WARN:
- LOGGER.warn("Encountered an illegal document error when executing batch {} of {}"
+ case WARN :
+ LOGGER.warn(
+ "Encountered an illegal document error when executing batch {} of {}"
+ " records. Ignoring and will not index record. Error was {}",
batchId, batch.size(), bulkItemResponse.getFailureMessage());
break;
- case FAIL:
- default:
- LOGGER.error("Encountered an illegal document error when executing batch {} of {}"
+ case FAIL :
+ default :
+ LOGGER.error(
+ "Encountered an illegal document error when executing batch {} of {}"
+ " records. Error was {} (to ignore future records like this"
+ " change the configuration property '{}' from '{}' to '{}').",
batchId, batch.size(), bulkItemResponse.getFailureMessage(),
OpensearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
- BehaviorOnMalformedDoc.FAIL,
- BehaviorOnMalformedDoc.IGNORE);
+ BehaviorOnMalformedDoc.FAIL, BehaviorOnMalformedDoc.IGNORE);
throw new ConnectException("Bulk request failed: " + bulkItemResponse.getFailureMessage());
}
}
@@ -531,7 +520,6 @@ public SinkRecord getSinkRecord() {
}
}
-
private boolean responseContainsVersionConflict(final BulkItemResponse bulkItemResponse) {
return bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT
|| bulkItemResponse.getFailureMessage().contains("version_conflict_engine_exception");
@@ -571,10 +559,7 @@ private static ConnectException toConnectException(final Throwable t) {
}
public enum BehaviorOnMalformedDoc {
- IGNORE,
- WARN,
- FAIL,
- REPORT;
+ IGNORE, WARN, FAIL, REPORT;
public static final BehaviorOnMalformedDoc DEFAULT = FAIL;
@@ -622,10 +607,7 @@ public String toString() {
}
public enum BehaviorOnVersionConflict {
- IGNORE,
- WARN,
- FAIL,
- REPORT;
+ IGNORE, WARN, FAIL, REPORT;
public static final BehaviorOnVersionConflict DEFAULT = FAIL;
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java b/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java
index 452d0dee..9affebe6 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/DocumentIDStrategy.java
@@ -1,6 +1,5 @@
/*
* Copyright 2019 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.util.Arrays;
@@ -33,14 +31,10 @@ public enum DocumentIDStrategy {
NONE("none", "No Doc ID is added", record -> null),
- RECORD_KEY("record.key", "Generated from the record's key",
- record -> convertKey(record.keySchema(), record.key())
- ),
+ RECORD_KEY("record.key", "Generated from the record's key", record -> convertKey(record.keySchema(), record.key())),
- TOPIC_PARTITION_OFFSET("topic.partition.offset",
- "Generated as record's ``topic+partition+offset``",
- record -> String.format("%s+%s+%s", record.topic(), record.kafkaPartition(), record.kafkaOffset())
- );
+ TOPIC_PARTITION_OFFSET("topic.partition.offset", "Generated as record's ``topic+partition+offset``",
+ record -> String.format("%s+%s+%s", record.topic(), record.kafkaPartition(), record.kafkaOffset()));
private final String name;
@@ -49,7 +43,7 @@ record -> String.format("%s+%s+%s", record.topic(), record.kafkaPartition(), rec
private final Function<SinkRecord, String> docIdGenerator;
private DocumentIDStrategy(final String name, final String description,
-            final Function<SinkRecord, String> docIdGenerator) {
+            final Function<SinkRecord, String> docIdGenerator) {
this.name = name.toLowerCase(Locale.ROOT);
this.description = description;
this.docIdGenerator = docIdGenerator;
@@ -74,7 +68,8 @@ public static DocumentIDStrategy fromString(final String name) {
}
public static String describe() {
- return Arrays.stream(values()).map(v -> v.toString() + " : " + v.description)
+ return Arrays.stream(values())
+ .map(v -> v.toString() + " : " + v.description)
.collect(Collectors.joining(", ", "{", "}"));
}
@@ -109,34 +104,23 @@ private static String convertKey(final Schema keySchema, final Object key) {
schemaType = ConnectSchema.schemaType(key.getClass());
if (schemaType == null) {
throw new DataException(
- String.format("Java class %s does not have corresponding schema type.", key.getClass())
- );
+ String.format("Java class %s does not have corresponding schema type.", key.getClass()));
}
} else {
schemaType = keySchema.type();
}
switch (schemaType) {
- case INT8:
- case INT16:
- case INT32:
- case INT64:
- case STRING:
+ case INT8 :
+ case INT16 :
+ case INT32 :
+ case INT64 :
+ case STRING :
return String.valueOf(key);
- default:
- throw new DataException(
- String.format(
- "%s is not supported as the document id. Supported are: %s",
- schemaType.name(),
- List.of(
- Schema.INT8_SCHEMA,
- Schema.INT16_SCHEMA,
- Schema.INT32_SCHEMA,
- Schema.INT64_SCHEMA,
- Schema.STRING_SCHEMA
- )
- )
- );
+ default :
+ throw new DataException(String.format("%s is not supported as the document id. Supported are: %s",
+ schemaType.name(), List.of(Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA,
+ Schema.INT64_SCHEMA, Schema.STRING_SCHEMA)));
}
}
}
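As a quick, hypothetical illustration of the ``topic.partition.offset`` strategy above: a record from a topic named logs, partition 0, offset 42 gets the document id logs+0+42, i.e. the same value the format string in the enum would produce. The topic, partition and offset below are made-up example values.

// Hypothetical sketch only; "logs", 0 and 42 are illustrative values.
public class DocIdSketch {
    public static void main(final String[] args) {
        System.out.println(String.format("%s+%s+%s", "logs", 0, 42L)); // prints logs+0+42
    }
}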
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/IndexWriteMethod.java b/src/main/java/io/aiven/kafka/connect/opensearch/IndexWriteMethod.java
index 5a4e68f8..aeb574d6 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/IndexWriteMethod.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/IndexWriteMethod.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.util.Arrays;
@@ -26,8 +24,7 @@
public enum IndexWriteMethod {
- INSERT,
- UPSERT;
+ INSERT, UPSERT;
public static String possibleValues() {
return Arrays.stream(IndexWriteMethod.values())
@@ -42,8 +39,8 @@ public void ensureValid(final String name, final Object value) {
try {
IndexWriteMethod.valueOf(((String) value).toUpperCase(Locale.ROOT));
} catch (final IllegalArgumentException e) {
- throw new ConfigException(name, value, "Index write method must be one of: "
- + IndexWriteMethod.possibleValues());
+ throw new ConfigException(name, value,
+ "Index write method must be one of: " + IndexWriteMethod.possibleValues());
}
}
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/Mapping.java b/src/main/java/io/aiven/kafka/connect/opensearch/Mapping.java
index 0416927d..5b561eb8 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/Mapping.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/Mapping.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.io.IOException;
@@ -88,40 +86,37 @@ public static XContentBuilder buildMappingFor(final Schema schema) {
}
}
- private static XContentBuilder buildMapping(
- final Schema schema,
- final XContentBuilder builder) throws IOException {
+ private static XContentBuilder buildMapping(final Schema schema, final XContentBuilder builder) throws IOException {
final var logicalConversion = logicalMapping(schema, builder);
if (Objects.nonNull(logicalConversion)) {
return builder;
}
switch (schema.type()) {
- case ARRAY:
+ case ARRAY :
return buildMapping(schema.valueSchema(), builder);
- case MAP:
+ case MAP :
return buildMap(schema, builder);
- case STRUCT:
+ case STRUCT :
return buildStruct(schema, builder);
- default:
+ default :
return buildPrimitive(builder, schemaTypeToOsType(schema.type()), schema.defaultValue());
}
}
- private static XContentBuilder logicalMapping(
- final Schema schema,
- final XContentBuilder builder) throws IOException {
+ private static XContentBuilder logicalMapping(final Schema schema, final XContentBuilder builder)
+ throws IOException {
if (Objects.isNull(schema.name())) {
return null;
}
switch (schema.name()) {
- case Date.LOGICAL_NAME:
- case Time.LOGICAL_NAME:
- case Timestamp.LOGICAL_NAME:
+ case Date.LOGICAL_NAME :
+ case Time.LOGICAL_NAME :
+ case Timestamp.LOGICAL_NAME :
return buildPrimitive(builder, DATE_TYPE, schema.defaultValue());
- case Decimal.LOGICAL_NAME:
+ case Decimal.LOGICAL_NAME :
return buildPrimitive(builder, DOUBLE_TYPE, schema.defaultValue());
- default:
+ default :
// User-defined type or unknown built-in
return null;
}
@@ -141,8 +136,7 @@ private static XContentBuilder buildMap(final Schema schema, final XContentBuild
return builder.endObject();
}
- private static XContentBuilder buildStruct(
- final Schema schema, final XContentBuilder builder) throws IOException {
+ private static XContentBuilder buildStruct(final Schema schema, final XContentBuilder builder) throws IOException {
builder.startObject(PROPERTIES_FIELD);
for (final var field : schema.fields()) {
builder.startObject(field.name());
@@ -155,32 +149,30 @@ private static XContentBuilder buildStruct(
// visible for testing
protected static String schemaTypeToOsType(final Schema.Type schemaType) {
switch (schemaType) {
- case BOOLEAN:
+ case BOOLEAN :
return BOOLEAN_TYPE;
- case INT8:
+ case INT8 :
return BYTE_TYPE;
- case INT16:
+ case INT16 :
return SHORT_TYPE;
- case INT32:
+ case INT32 :
return INTEGER_TYPE;
- case INT64:
+ case INT64 :
return LONG_TYPE;
- case FLOAT32:
+ case FLOAT32 :
return FLOAT_TYPE;
- case FLOAT64:
+ case FLOAT64 :
return DOUBLE_TYPE;
- case STRING:
+ case STRING :
return TEXT_TYPE;
- case BYTES:
+ case BYTES :
return BINARY_TYPE;
- default:
+ default :
return null;
}
}
- private static XContentBuilder buildPrimitive(
- final XContentBuilder builder,
- final String type,
+ private static XContentBuilder buildPrimitive(final XContentBuilder builder, final String type,
final Object defaultValue) throws IOException {
if (type == null) {
throw new DataException("Invalid primitive type");
@@ -196,28 +188,28 @@ private static XContentBuilder buildPrimitive(
}
switch (type) {
- case BYTE_TYPE:
+ case BYTE_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (byte) defaultValue);
- case SHORT_TYPE:
+ case SHORT_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (short) defaultValue);
- case INTEGER_TYPE:
+ case INTEGER_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (int) defaultValue);
- case LONG_TYPE:
+ case LONG_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (long) defaultValue);
- case FLOAT_TYPE:
+ case FLOAT_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (float) defaultValue);
- case DOUBLE_TYPE:
+ case DOUBLE_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (double) defaultValue);
- case BOOLEAN_TYPE:
+ case BOOLEAN_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, (boolean) defaultValue);
- case DATE_TYPE:
+ case DATE_TYPE :
return builder.field(DEFAULT_VALUE_FIELD, ((java.util.Date) defaultValue).getTime());
- case STRING_TYPE:
- case TEXT_TYPE:
- case BINARY_TYPE:
+ case STRING_TYPE :
+ case TEXT_TYPE :
+ case BINARY_TYPE :
// IGNORE default values for text and binary types as this is not supported by OS side.
return builder;
- default:
+ default :
throw new DataException("Invalid primitive type " + type);
}
}
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java
index 73e61cb6..22c5e6f0 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java
@@ -1,6 +1,5 @@
/*
* Copyright 2019 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.util.Objects;
@@ -34,20 +32,17 @@
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
/**
- * Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client
- * if configured.
+ * Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client if configured.
*/
public class OpensearchBasicAuthConfigurator implements OpensearchClientConfigurator, ConfigDefContributor {
public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
- private static final String CONNECTION_USERNAME_DOC =
- "The username used to authenticate with OpenSearch. "
- + "The default is the null, and authentication will only be performed if "
- + " both the username and password are non-null.";
+ private static final String CONNECTION_USERNAME_DOC = "The username used to authenticate with OpenSearch. "
+ + "The default is the null, and authentication will only be performed if "
+ + " both the username and password are non-null.";
public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
- private static final String CONNECTION_PASSWORD_DOC =
- "The password used to authenticate with OpenSearch. "
- + "The default is the null, and authentication will only be performed if "
- + " both the username and password are non-null.";
+ private static final String CONNECTION_PASSWORD_DOC = "The password used to authenticate with OpenSearch. "
+ + "The default is the null, and authentication will only be performed if "
+ + " both the username and password are non-null.";
@Override
public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsyncClientBuilder builder) {
@@ -57,13 +52,8 @@ public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsync
final var credentialsProvider = new BasicCredentialsProvider();
for (final var httpHost : config.httpHosts()) {
- credentialsProvider.setCredentials(
- new AuthScope(httpHost),
- new UsernamePasswordCredentials(
- connectionUsername(config),
- connectionPassword(config).value()
- )
- );
+ credentialsProvider.setCredentials(new AuthScope(httpHost),
+ new UsernamePasswordCredentials(connectionUsername(config), connectionPassword(config).value()));
}
builder.setDefaultCredentialsProvider(credentialsProvider);
@@ -72,34 +62,16 @@ public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsync
@Override
public void addConfig(final ConfigDef config) {
- config
- .define(
- CONNECTION_USERNAME_CONFIG,
- Type.STRING,
- null,
- Importance.MEDIUM,
- CONNECTION_USERNAME_DOC,
- "Authentication",
- 0,
- Width.SHORT,
- "Connection Username"
- ).define(
- CONNECTION_PASSWORD_CONFIG,
- Type.PASSWORD,
- null,
- Importance.MEDIUM,
- CONNECTION_PASSWORD_DOC,
- "Authentication",
- 1,
- Width.SHORT,
- "Connection Password");
+ config.define(CONNECTION_USERNAME_CONFIG, Type.STRING, null, Importance.MEDIUM, CONNECTION_USERNAME_DOC,
+ "Authentication", 0, Width.SHORT, "Connection Username")
+ .define(CONNECTION_PASSWORD_CONFIG, Type.PASSWORD, null, Importance.MEDIUM, CONNECTION_PASSWORD_DOC,
+ "Authentication", 1, Width.SHORT, "Connection Password");
}
private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) {
- return Objects.nonNull(connectionUsername(config))
- && Objects.nonNull(connectionPassword(config));
+ return Objects.nonNull(connectionUsername(config)) && Objects.nonNull(connectionPassword(config));
}
-
+
private static String connectionUsername(final OpensearchSinkConnectorConfig config) {
return config.getString(CONNECTION_USERNAME_CONFIG);
}
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java
index fe7cc17f..5f63420f 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchClient.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.io.IOException;
@@ -78,8 +76,7 @@ public class OpensearchClient implements AutoCloseable {
private static final String RESOURCE_ALREADY_EXISTS_AS_ALIAS = "already exists as alias";
- private static final String RESOURCE_ALREADY_EXISTS_AS_DATASTREAM =
- "creates data streams only, use create data stream api instead";
+ private static final String RESOURCE_ALREADY_EXISTS_AS_DATASTREAM = "creates data streams only, use create data stream api instead";
private static final String DEFAULT_OS_VERSION = "1.0.0";
@@ -95,20 +92,12 @@ public OpensearchClient(final OpensearchSinkConnectorConfig config) {
}
public OpensearchClient(final OpensearchSinkConnectorConfig config, final ErrantRecordReporter reporter) {
- this(
- new RestHighLevelClient(
- RestClient.builder(config.httpHosts())
- .setHttpClientConfigCallback(
- new HttpClientConfigCallback(config)
- )
- ),
- config,
- reporter
- );
+ this(new RestHighLevelClient(RestClient.builder(config.httpHosts())
+ .setHttpClientConfigCallback(new HttpClientConfigCallback(config))), config, reporter);
}
protected OpensearchClient(final RestHighLevelClient client, final OpensearchSinkConnectorConfig config,
- final ErrantRecordReporter reporter) {
+ final ErrantRecordReporter reporter) {
this.client = client;
this.config = config;
this.bulkProcessor = new BulkProcessor(Time.SYSTEM, client, config, reporter);
@@ -130,122 +119,91 @@ public String getVersion() {
}
public boolean indexOrDataStreamExists(final String index) {
- return withRetry(
- String.format("check index %s exists", index),
- () -> client.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT)
- );
+ return withRetry(String.format("check index %s exists", index),
+ () -> client.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT));
}
protected boolean dataStreamIndexTemplateExists(final String dataStreamIndexTemplate) {
- return withRetry(
- String.format("check index template exists %s", dataStreamIndexTemplate),
- () -> client.indices().existsIndexTemplate(
- new ComposableIndexTemplateExistRequest(dataStreamIndexTemplate),
- RequestOptions.DEFAULT
- )
- );
+ return withRetry(String.format("check index template exists %s", dataStreamIndexTemplate),
+ () -> client.indices()
+ .existsIndexTemplate(new ComposableIndexTemplateExistRequest(dataStreamIndexTemplate),
+ RequestOptions.DEFAULT));
}
- protected boolean createDataStreamIndexTemplate(
- final String dataStreamName,
+ protected boolean createDataStreamIndexTemplate(final String dataStreamName,
final String dataStreamTimestampField) {
final var dataStreamIndexTemplate = String.format(DATA_STREAM_TEMPLATE_NAME_PATTERN, dataStreamName);
if (!dataStreamIndexTemplateExists(dataStreamIndexTemplate)) {
- return withRetry(
- String.format("create index template %s", dataStreamIndexTemplate),
- () -> {
- try {
- client.indices().putIndexTemplate(
- new PutComposableIndexTemplateRequest()
- .name(dataStreamIndexTemplate)
- .indexTemplate(
- new ComposableIndexTemplate(
- List.of(dataStreamName),
- null,
- null,
- 200L,
- null,
- null,
- new DataStreamTemplate(
- new TimestampField(dataStreamTimestampField)
- )
- )
- ),
- RequestOptions.DEFAULT
- );
- } catch (final OpenSearchStatusException | IOException e) {
- if (!(e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION))) {
- throw e;
- }
- return false;
- }
- return true;
- });
+ return withRetry(String.format("create index template %s", dataStreamIndexTemplate), () -> {
+ try {
+ client.indices()
+ .putIndexTemplate(
+ new PutComposableIndexTemplateRequest().name(dataStreamIndexTemplate)
+ .indexTemplate(new ComposableIndexTemplate(List.of(dataStreamName), null,
+ null, 200L, null, null,
+ new DataStreamTemplate(
+ new TimestampField(dataStreamTimestampField)))),
+ RequestOptions.DEFAULT);
+ } catch (final OpenSearchStatusException | IOException e) {
+ if (!(e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION))) {
+ throw e;
+ }
+ return false;
+ }
+ return true;
+ });
}
return true;
}
- public boolean createIndexTemplateAndDataStream(
- final String dataStreamName,
+ public boolean createIndexTemplateAndDataStream(final String dataStreamName,
final String dataStreamTimestampField) {
if (createDataStreamIndexTemplate(dataStreamName, dataStreamTimestampField)) {
- return withRetry(
- String.format("create data stream %s", dataStreamName),
- () -> {
- try {
- client.indices().createDataStream(
- new CreateDataStreamRequest(dataStreamName),
- RequestOptions.DEFAULT
- );
- return true;
- } catch (final OpenSearchStatusException | IOException e) {
- if (!(e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)
- || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_ALIAS)
- || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_DATASTREAM))) {
- throw e;
- }
- return false;
- }
- });
+ return withRetry(String.format("create data stream %s", dataStreamName), () -> {
+ try {
+ client.indices()
+ .createDataStream(new CreateDataStreamRequest(dataStreamName), RequestOptions.DEFAULT);
+ return true;
+ } catch (final OpenSearchStatusException | IOException e) {
+ if (!(e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)
+ || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_ALIAS)
+ || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_DATASTREAM))) {
+ throw e;
+ }
+ return false;
+ }
+ });
}
return false;
}
public boolean createIndex(final String index) {
- return withRetry(
- String.format("create index %s", index),
- () -> {
- try {
- client.indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
- return true;
- } catch (final OpenSearchStatusException | IOException e) {
- if (!(e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)
- || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_ALIAS)
- || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_DATASTREAM))) {
- throw e;
- }
- return false;
- }
- });
+ return withRetry(String.format("create index %s", index), () -> {
+ try {
+ client.indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
+ return true;
+ } catch (final OpenSearchStatusException | IOException e) {
+ if (!(e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)
+ || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_ALIAS)
+ || e.getMessage().contains(RESOURCE_ALREADY_EXISTS_AS_DATASTREAM))) {
+ throw e;
+ }
+ return false;
+ }
+ });
}
public void createMapping(final String index, final Schema schema) {
final var request = new PutMappingRequest(index).source(Mapping.buildMappingFor(schema));
- withRetry(
- String.format("create mapping for index %s with schema %s", index, schema),
- () -> client.indices().putMapping(request, RequestOptions.DEFAULT)
- );
+ withRetry(String.format("create mapping for index %s with schema %s", index, schema),
+ () -> client.indices().putMapping(request, RequestOptions.DEFAULT));
}
public boolean hasMapping(final String index) {
final var request = new GetMappingsRequest().indices(index);
- final var response = withRetry(
- "",
- () -> client.indices().getMapping(request, RequestOptions.DEFAULT)
- );
+ final var response = withRetry("", () -> client.indices().getMapping(request, RequestOptions.DEFAULT));
final var mappings = response.mappings().get(index);
- return Objects.nonNull(mappings)
- && Objects.nonNull(mappings.sourceAsMap())
+ return Objects.nonNull(mappings) && Objects.nonNull(mappings.sourceAsMap())
&& !mappings.sourceAsMap().isEmpty();
}
@@ -295,9 +253,7 @@ public HttpAsyncClientBuilder customizeHttpClient(final HttpAsyncClientBuilder h
}
});
- httpClientBuilder
- .setConnectionManager(createConnectionManager())
- .setDefaultRequestConfig(requestConfig);
+ httpClientBuilder.setConnectionManager(createConnectionManager()).setDefaultRequestConfig(requestConfig);
return httpClientBuilder;
}
@@ -316,19 +272,14 @@ private PoolingNHttpClientConnectionManager createConnectionManager() {
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", sslStrategy)
.build();
- final var connectionManager =
- new PoolingNHttpClientConnectionManager(
- new DefaultConnectingIOReactor(ioReactorConfig),
- registry
- );
+ final var connectionManager = new PoolingNHttpClientConnectionManager(
+ new DefaultConnectingIOReactor(ioReactorConfig), registry);
final var maxPerRoute = Math.max(10, config.maxInFlightRequests() * 2);
connectionManager.setDefaultMaxPerRoute(maxPerRoute);
connectionManager.setMaxTotal(maxPerRoute * config.httpHosts().length);
return connectionManager;
- } catch (final IOReactorException
- | NoSuchAlgorithmException
- | KeyStoreException
- | KeyManagementException e) {
+ } catch (final IOReactorException | NoSuchAlgorithmException | KeyStoreException
+ | KeyManagementException e) {
throw new ConnectException("Unable to open ElasticsearchClient.", e);
}
}
@@ -339,5 +290,4 @@ public <T> T withRetry(final String callName, final Callable<T> callable) {
return RetryUtil.callWithRetry(callName, callable, config.maxRetry(), config.retryBackoffMs());
}
-
}
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java
index 7fda87e7..5c5b78db 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnector.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.util.ArrayList;
@@ -44,10 +42,7 @@ public void start(final Map<String, String> props) throws ConnectException {
// validation
new OpensearchSinkConnectorConfig(props);
} catch (final ConfigException e) {
- throw new ConnectException(
- "Couldn't start OpensearchSinkConnector due to configuration error",
- e
- );
+ throw new ConnectException("Couldn't start OpensearchSinkConnector due to configuration error", e);
}
}
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java
index a35753d7..ca013527 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkConnectorConfig.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.net.MalformedURLException;
@@ -53,46 +51,37 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
public static final String DATA_CONVERSION_GROUP_NAME = "Data Conversion";
public static final String CONNECTION_URL_CONFIG = "connection.url";
- private static final String CONNECTION_URL_DOC =
- "List of OpenSearch HTTP connection URLs e.g. ``http://eshost1:9200,"
- + "http://eshost2:9200``.";
+ private static final String CONNECTION_URL_DOC = "List of OpenSearch HTTP connection URLs e.g. ``http://eshost1:9200,"
+ + "http://eshost2:9200``.";
public static final String BATCH_SIZE_CONFIG = "batch.size";
- private static final String BATCH_SIZE_DOC =
- "The number of records to process as a batch when writing to OpenSearch.";
+ private static final String BATCH_SIZE_DOC = "The number of records to process as a batch when writing to OpenSearch.";
public static final String MAX_IN_FLIGHT_REQUESTS_CONFIG = "max.in.flight.requests";
- private static final String MAX_IN_FLIGHT_REQUESTS_DOC =
- "The maximum number of indexing requests that can be in-flight to OpenSearch before "
- + "blocking further requests.";
+ private static final String MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of indexing requests that can be in-flight to OpenSearch before "
+ + "blocking further requests.";
public static final String MAX_BUFFERED_RECORDS_CONFIG = "max.buffered.records";
- private static final String MAX_BUFFERED_RECORDS_DOC =
- "The maximum number of records each task will buffer before blocking acceptance of more "
- + "records. This config can be used to limit the memory usage for each task.";
+ private static final String MAX_BUFFERED_RECORDS_DOC = "The maximum number of records each task will buffer before blocking acceptance of more "
+ + "records. This config can be used to limit the memory usage for each task.";
public static final String LINGER_MS_CONFIG = "linger.ms";
- private static final String LINGER_MS_DOC =
- "Linger time in milliseconds for batching.\n"
- + "Records that arrive in between request transmissions are batched into a single bulk "
- + "indexing request, based on the ``" + BATCH_SIZE_CONFIG + "`` configuration. Normally "
- + "this only occurs under load when records arrive faster than they can be sent out. "
- + "However it may be desirable to reduce the number of requests even under light load and "
- + "benefit from bulk indexing. This setting helps accomplish that - when a pending batch is"
- + " not full, rather than immediately sending it out the task will wait up to the given "
- + "delay to allow other records to be added so that they can be batched into a single "
- + "request.";
+ private static final String LINGER_MS_DOC = "Linger time in milliseconds for batching.\n"
+ + "Records that arrive in between request transmissions are batched into a single bulk "
+ + "indexing request, based on the ``" + BATCH_SIZE_CONFIG + "`` configuration. Normally "
+ + "this only occurs under load when records arrive faster than they can be sent out. "
+ + "However it may be desirable to reduce the number of requests even under light load and "
+ + "benefit from bulk indexing. This setting helps accomplish that - when a pending batch is"
+ + " not full, rather than immediately sending it out the task will wait up to the given "
+ + "delay to allow other records to be added so that they can be batched into a single " + "request.";
public static final String FLUSH_TIMEOUT_MS_CONFIG = "flush.timeout.ms";
- private static final String FLUSH_TIMEOUT_MS_DOC =
- "The timeout in milliseconds to use for periodic flushing, and when waiting for buffer "
- + "space to be made available by completed requests as records are added. If this timeout "
- + "is exceeded the task will fail.";
+ private static final String FLUSH_TIMEOUT_MS_DOC = "The timeout in milliseconds to use for periodic flushing, and when waiting for buffer "
+ + "space to be made available by completed requests as records are added. If this timeout "
+ + "is exceeded the task will fail.";
public static final String MAX_RETRIES_CONFIG = "max.retries";
- private static final String MAX_RETRIES_DOC =
- "The maximum number of retries that are allowed for failed indexing requests. If the retry "
- + "attempts are exhausted the task will fail.";
+ private static final String MAX_RETRIES_DOC = "The maximum number of retries that are allowed for failed indexing requests. If the retry "
+ + "attempts are exhausted the task will fail.";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
- private static final String RETRY_BACKOFF_MS_DOC =
- "How long to wait in milliseconds before attempting the first retry of a failed indexing "
- + "request. Upon a failure, this connector may wait up to twice as long as the previous "
- + "wait, up to the maximum number of retries. "
- + "This avoids retrying in a tight loop under failure scenarios.";
+ private static final String RETRY_BACKOFF_MS_DOC = "How long to wait in milliseconds before attempting the first retry of a failed indexing "
+ + "request. Upon a failure, this connector may wait up to twice as long as the previous "
+ + "wait, up to the maximum number of retries. "
+ + "This avoids retrying in a tight loop under failure scenarios.";
public static final String KEY_IGNORE_CONFIG = "key.ignore";
public static final String KEY_IGNORE_ID_STRATEGY_CONFIG = "key.ignore.id.strategy";
@@ -101,41 +90,35 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
public static final String DROP_INVALID_MESSAGE_CONFIG = "drop.invalid.message";
- private static final String KEY_IGNORE_DOC =
- "Whether to ignore the record key for the purpose of forming the OpenSearch document ID."
- + " When this is set to ``true``, document IDs will be generated according to the "
- + "``" + KEY_IGNORE_ID_STRATEGY_CONFIG + "`` strategy.\n"
- + "Note that this is a global config that applies to all topics, use "
- + "``" + TOPIC_KEY_IGNORE_CONFIG + "`` to apply ``" + KEY_IGNORE_ID_STRATEGY_CONFIG + "`` "
- + "strategy for specific topics only.";
- private static final String TOPIC_KEY_IGNORE_DOC =
- "List of topics for which ``" + KEY_IGNORE_CONFIG + "`` should be ``true``.";
- private static final String KEY_IGNORE_ID_STRATEGY_DOC =
- "Specifies the strategy to generate the Document ID. Only applicable when ``" + KEY_IGNORE_CONFIG + "`` is"
- + " ``true`` or specific topics are configured using ``" + TOPIC_KEY_IGNORE_CONFIG + "``. "
- + "Available strategies " + DocumentIDStrategy.describe() + ". "
- + "If not specified, the default generation strategy is ``topic.partition.offset``.\n";
- private static final String SCHEMA_IGNORE_CONFIG_DOC =
- "Whether to ignore schemas during indexing. When this is set to ``true``, the record "
- + "schema will be ignored for the purpose of registering an OpenSearch mapping. "
- + "OpenSearch will infer the mapping from the data (dynamic mapping needs to be enabled "
- + "by the user).\n Note that this is a global config that applies to all topics, use ``"
- + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
- private static final String TOPIC_SCHEMA_IGNORE_DOC =
- "List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.";
- private static final String DROP_INVALID_MESSAGE_DOC =
- "Whether to drop kafka message when it cannot be converted to output message.";
+ private static final String KEY_IGNORE_DOC = "Whether to ignore the record key for the purpose of forming the OpenSearch document ID."
+ + " When this is set to ``true``, document IDs will be generated according to the " + "``"
+ + KEY_IGNORE_ID_STRATEGY_CONFIG + "`` strategy.\n"
+ + "Note that this is a global config that applies to all topics, use " + "``" + TOPIC_KEY_IGNORE_CONFIG
+ + "`` to apply ``" + KEY_IGNORE_ID_STRATEGY_CONFIG + "`` " + "strategy for specific topics only.";
+ private static final String TOPIC_KEY_IGNORE_DOC = "List of topics for which ``" + KEY_IGNORE_CONFIG
+ + "`` should be ``true``.";
+ private static final String KEY_IGNORE_ID_STRATEGY_DOC = "Specifies the strategy to generate the Document ID. Only applicable when ``"
+ + KEY_IGNORE_CONFIG + "`` is" + " ``true`` or specific topics are configured using ``"
+ + TOPIC_KEY_IGNORE_CONFIG + "``. " + "Available strategies " + DocumentIDStrategy.describe() + ". "
+ + "If not specified, the default generation strategy is ``topic.partition.offset``.\n";
+ private static final String SCHEMA_IGNORE_CONFIG_DOC = "Whether to ignore schemas during indexing. When this is set to ``true``, the record "
+ + "schema will be ignored for the purpose of registering an OpenSearch mapping. "
+ + "OpenSearch will infer the mapping from the data (dynamic mapping needs to be enabled "
+ + "by the user).\n Note that this is a global config that applies to all topics, use ``"
+ + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
+ private static final String TOPIC_SCHEMA_IGNORE_DOC = "List of topics for which ``" + SCHEMA_IGNORE_CONFIG
+ + "`` should be ``true``.";
+ private static final String DROP_INVALID_MESSAGE_DOC = "Whether to drop kafka message when it cannot be converted to output message.";
public static final String COMPACT_MAP_ENTRIES_CONFIG = "compact.map.entries";
- private static final String COMPACT_MAP_ENTRIES_DOC =
- "Defines how map entries with string keys within record values should be written to JSON. "
- + "When this is set to ``true``, these entries are written compactly as "
- + "``\"entryKey\": \"entryValue\"``. "
- + "Otherwise, map entries with string keys are written as a nested document "
- + "``{\"key\": \"entryKey\", \"value\": \"entryValue\"}``. "
- + "All map entries with non-string keys are always written as nested documents. "
- + "Prior to 3.3.0, this connector always wrote map entries as nested documents, "
- + "so set this to ``false`` to use that older behavior.";
+ private static final String COMPACT_MAP_ENTRIES_DOC = "Defines how map entries with string keys within record values should be written to JSON. "
+ + "When this is set to ``true``, these entries are written compactly as "
+ + "``\"entryKey\": \"entryValue\"``. "
+ + "Otherwise, map entries with string keys are written as a nested document "
+ + "``{\"key\": \"entryKey\", \"value\": \"entryValue\"}``. "
+ + "All map entries with non-string keys are always written as nested documents. "
+ + "Prior to 3.3.0, this connector always wrote map entries as nested documents, "
+ + "so set this to ``false`` to use that older behavior.";
public static final String CONNECTION_TIMEOUT_MS_CONFIG = "connection.timeout.ms";
public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms";
@@ -145,8 +128,7 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
+ "interval, and will need to be restarted.";
private static final String READ_TIMEOUT_MS_CONFIG_DOC = "How long to wait in "
+ "milliseconds for the OpenSearch server to send a response. The task fails "
- + "if any read operation times out, and will need to be restarted to resume "
- + "further operations.";
+ + "if any read operation times out, and will need to be restarted to resume " + "further operations.";
public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";
private static final String BEHAVIOR_ON_NULL_VALUES_DOC = "How to handle records with a "
@@ -156,19 +138,17 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
public static final String BEHAVIOR_ON_MALFORMED_DOCS_CONFIG = "behavior.on.malformed.documents";
private static final String BEHAVIOR_ON_MALFORMED_DOCS_DOC = "How to handle records that "
+ "OpenSearch rejects due to some malformation of the document itself, such as an index"
- + " mapping conflict or a field name containing illegal characters. \n"
- + "Valid options are:\n"
+ + " mapping conflict or a field name containing illegal characters. \n" + "Valid options are:\n"
+ "- ``ignore`` - do not index the record\n"
+ "- ``warn`` - log a warning message and do not index the record\n"
+ "- ``report`` - report to errant record reporter and do not index the record\n"
+ "- ``fail`` - fail the task.\n\n";
-
+
public static final String BEHAVIOR_ON_VERSION_CONFLICT_CONFIG = "behavior.on.version.conflict";
private static final String BEHAVIOR_ON_VERSION_CONFLICT_DOC = "How to handle records that "
+ "OpenSearch rejects due to document's version conflicts.\n"
+ "It may happen when offsets were not committed or/and records have to be reprocessed.\n"
- + "Valid options are:\n"
- + "- ``ignore`` - ignore and keep the existing record\n"
+ + "Valid options are:\n" + "- ``ignore`` - ignore and keep the existing record\n"
+ "- ``warn`` - log a warning message and keep the existing record\n"
+ "- ``report`` - report to errant record reporter and keep the existing record\n"
+ "- ``fail`` - fail the task.\n\n";
@@ -176,14 +156,12 @@ public class OpensearchSinkConnectorConfig extends AbstractConfig {
public static final String INDEX_WRITE_METHOD = "index.write.method";
public static final String INDEX_WRITE_METHOD_DOC = String.format(
- "The method used to write data into OpenSearch index."
- + "The default value is ``%s`` which means that "
+ "The method used to write data into OpenSearch index." + "The default value is ``%s`` which means that "
+ "the record with the same document id will be replaced. "
+ "The ``%s`` will create a new document if one does not exist or "
+ "will update the existing document.",
IndexWriteMethod.INSERT.name().toLowerCase(Locale.ROOT),
- IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT)
- );
+ IndexWriteMethod.UPSERT.name().toLowerCase(Locale.ROOT));
public static final String DATA_STREAM_ENABLED = "data.stream.enabled";
@@ -214,13 +192,15 @@ protected static ConfigDef baseConfigDef() {
}
/**
- * Load configuration definitions from the extension points (if available) using
- * {@link ServiceLoader} mechanism to discover them.
- * @param configDef configuration definitions to contribute to
+ * Load configuration definitions from the extension points (if available) using {@link ServiceLoader} mechanism to
+ * discover them.
+ *
+ * @param configDef
+ * configuration definitions to contribute to
*/
private static void addSpiConfigs(final ConfigDef configDef) {
- final ServiceLoader<ConfigDefContributor> loaders = ServiceLoader
- .load(ConfigDefContributor.class, OpensearchSinkConnectorConfig.class.getClassLoader());
+ final ServiceLoader<ConfigDefContributor> loaders = ServiceLoader.load(ConfigDefContributor.class,
+ OpensearchSinkConnectorConfig.class.getClassLoader());
final Iterator<ConfigDefContributor> iterator = loaders.iterator();
while (iterator.hasNext()) {
@@ -230,284 +210,95 @@ private static void addSpiConfigs(final ConfigDef configDef) {
private static void addConnectorConfigs(final ConfigDef configDef) {
int order = 0;
- configDef.define(
- CONNECTION_URL_CONFIG,
- Type.LIST,
- ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.Validator() {
- @Override
- public void ensureValid(final String name, final Object value) {
- // If value is null default validator for required value is triggered.
- if (value != null) {
- @SuppressWarnings("unchecked") final var urls = (List<String>) value;
- for (final var url : urls) {
- try {
- new URL(url);
- } catch (final MalformedURLException e) {
- throw new ConfigException(CONNECTION_URL_CONFIG, url);
- }
- }
+ configDef.define(CONNECTION_URL_CONFIG, Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.Validator() {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ // If value is null default validator for required value is triggered.
+ if (value != null) {
+ @SuppressWarnings("unchecked")
+ final var urls = (List<String>) value;
+ for (final var url : urls) {
+ try {
+ new URL(url);
+ } catch (final MalformedURLException e) {
+ throw new ConfigException(CONNECTION_URL_CONFIG, url);
}
}
-
- @Override
- public String toString() {
- return String.join(", ", "http://eshost1:9200", "http://eshost2:9200");
- }
- },
- Importance.HIGH,
- CONNECTION_URL_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.LONG,
- "Connection URLs"
- ).define(
- BATCH_SIZE_CONFIG,
- Type.INT,
- 2000,
- Importance.MEDIUM,
- BATCH_SIZE_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Batch Size"
- ).define(
- MAX_IN_FLIGHT_REQUESTS_CONFIG,
- Type.INT,
- 5,
- Importance.MEDIUM,
- MAX_IN_FLIGHT_REQUESTS_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Max In-flight Requests"
- ).define(
- MAX_BUFFERED_RECORDS_CONFIG,
- Type.INT,
- 20000,
- Importance.LOW,
- MAX_BUFFERED_RECORDS_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Max Buffered Records"
- ).define(
- LINGER_MS_CONFIG,
- Type.LONG,
- 1L,
- Importance.LOW,
- LINGER_MS_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Linger (ms)"
- ).define(
- FLUSH_TIMEOUT_MS_CONFIG,
- Type.LONG,
- 10000L,
- Importance.LOW,
- FLUSH_TIMEOUT_MS_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Flush Timeout (ms)"
- ).define(
- MAX_RETRIES_CONFIG,
- Type.INT,
- 5,
- Importance.LOW,
- MAX_RETRIES_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Max Retries"
- ).define(
- RETRY_BACKOFF_MS_CONFIG,
- Type.LONG,
- 100L,
- Importance.LOW,
- RETRY_BACKOFF_MS_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Retry Backoff (ms)"
- ).define(
- CONNECTION_TIMEOUT_MS_CONFIG,
- Type.INT,
- 1000,
- Importance.LOW,
- CONNECTION_TIMEOUT_MS_CONFIG_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Connection Timeout"
- ).define(
- READ_TIMEOUT_MS_CONFIG,
- Type.INT,
- 3000,
- Importance.LOW,
- READ_TIMEOUT_MS_CONFIG_DOC,
- CONNECTOR_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Read Timeout"
- );
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.join(", ", "http://eshost1:9200", "http://eshost2:9200");
+ }
+ }, Importance.HIGH, CONNECTION_URL_DOC, CONNECTOR_GROUP_NAME, ++order, Width.LONG, "Connection URLs")
+ .define(BATCH_SIZE_CONFIG, Type.INT, 2000, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP_NAME,
+ ++order, Width.SHORT, "Batch Size")
+ .define(MAX_IN_FLIGHT_REQUESTS_CONFIG, Type.INT, 5, Importance.MEDIUM, MAX_IN_FLIGHT_REQUESTS_DOC,
+ CONNECTOR_GROUP_NAME, ++order, Width.SHORT, "Max In-flight Requests")
+ .define(MAX_BUFFERED_RECORDS_CONFIG, Type.INT, 20000, Importance.LOW, MAX_BUFFERED_RECORDS_DOC,
+ CONNECTOR_GROUP_NAME, ++order, Width.SHORT, "Max Buffered Records")
+ .define(LINGER_MS_CONFIG, Type.LONG, 1L, Importance.LOW, LINGER_MS_DOC, CONNECTOR_GROUP_NAME, ++order,
+ Width.SHORT, "Linger (ms)")
+ .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, 10000L, Importance.LOW, FLUSH_TIMEOUT_MS_DOC,
+ CONNECTOR_GROUP_NAME, ++order, Width.SHORT, "Flush Timeout (ms)")
+ .define(MAX_RETRIES_CONFIG, Type.INT, 5, Importance.LOW, MAX_RETRIES_DOC, CONNECTOR_GROUP_NAME, ++order,
+ Width.SHORT, "Max Retries")
+ .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW, RETRY_BACKOFF_MS_DOC,
+ CONNECTOR_GROUP_NAME, ++order, Width.SHORT, "Retry Backoff (ms)")
+ .define(CONNECTION_TIMEOUT_MS_CONFIG, Type.INT, 1000, Importance.LOW, CONNECTION_TIMEOUT_MS_CONFIG_DOC,
+ CONNECTOR_GROUP_NAME, ++order, Width.SHORT, "Connection Timeout")
+ .define(READ_TIMEOUT_MS_CONFIG, Type.INT, 3000, Importance.LOW, READ_TIMEOUT_MS_CONFIG_DOC,
+ CONNECTOR_GROUP_NAME, ++order, Width.SHORT, "Read Timeout");
}
private static void addConversionConfigs(final ConfigDef configDef) {
int order = 0;
- configDef.define(
- INDEX_WRITE_METHOD,
- Type.STRING,
- IndexWriteMethod.INSERT.toString().toLowerCase(Locale.ROOT),
- IndexWriteMethod.VALIDATOR,
- Importance.LOW,
- INDEX_WRITE_METHOD_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Index write method"
- ).define(
- KEY_IGNORE_CONFIG,
- Type.BOOLEAN,
- false,
- Importance.HIGH,
- KEY_IGNORE_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Ignore Key mode"
- ).define(
- KEY_IGNORE_ID_STRATEGY_CONFIG,
- Type.STRING,
- DocumentIDStrategy.TOPIC_PARTITION_OFFSET.toString(),
- DocumentIDStrategy.VALIDATOR,
- Importance.LOW,
- KEY_IGNORE_ID_STRATEGY_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.LONG,
- "Document ID generation strategy"
- ).define(
- SCHEMA_IGNORE_CONFIG,
- Type.BOOLEAN,
- false,
- Importance.LOW,
- SCHEMA_IGNORE_CONFIG_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Ignore Schema mode"
- ).define(
- COMPACT_MAP_ENTRIES_CONFIG,
- Type.BOOLEAN,
- true,
- Importance.LOW,
- COMPACT_MAP_ENTRIES_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Compact Map Entries"
- ).define(
- TOPIC_KEY_IGNORE_CONFIG,
- Type.LIST,
- "",
- Importance.LOW,
- TOPIC_KEY_IGNORE_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.LONG,
- "Topics for 'Ignore Key' mode"
- ).define(
- TOPIC_SCHEMA_IGNORE_CONFIG,
- Type.LIST,
- "",
- Importance.LOW,
- TOPIC_SCHEMA_IGNORE_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.LONG,
- "Topics for 'Ignore Schema' mode"
- ).define(
- DROP_INVALID_MESSAGE_CONFIG,
- Type.BOOLEAN,
- false,
- Importance.LOW,
- DROP_INVALID_MESSAGE_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.LONG,
- "Drop invalid messages"
- ).define(
- BEHAVIOR_ON_NULL_VALUES_CONFIG,
- Type.STRING,
- RecordConverter.BehaviorOnNullValues.DEFAULT.toString(),
- RecordConverter.BehaviorOnNullValues.VALIDATOR,
- Importance.LOW,
- BEHAVIOR_ON_NULL_VALUES_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Behavior for null-valued records"
- ).define(
- BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
- Type.STRING,
- BulkProcessor.BehaviorOnMalformedDoc.DEFAULT.toString(),
- BulkProcessor.BehaviorOnMalformedDoc.VALIDATOR,
- Importance.LOW,
- BEHAVIOR_ON_MALFORMED_DOCS_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Behavior on malformed documents"
- ).define(
- BEHAVIOR_ON_VERSION_CONFLICT_CONFIG,
- Type.STRING,
- BulkProcessor.BehaviorOnVersionConflict.DEFAULT.toString(),
- BulkProcessor.BehaviorOnVersionConflict.VALIDATOR,
- Importance.LOW,
- BEHAVIOR_ON_VERSION_CONFLICT_DOC,
- DATA_CONVERSION_GROUP_NAME,
- ++order,
- Width.SHORT,
- "Behavior on document's version conflict (optimistic locking)");
+ configDef
+ .define(INDEX_WRITE_METHOD, Type.STRING, IndexWriteMethod.INSERT.toString().toLowerCase(Locale.ROOT),
+ IndexWriteMethod.VALIDATOR, Importance.LOW, INDEX_WRITE_METHOD_DOC, DATA_CONVERSION_GROUP_NAME,
+ ++order, Width.SHORT, "Index write method")
+ .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, false, Importance.HIGH, KEY_IGNORE_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Ignore Key mode")
+ .define(KEY_IGNORE_ID_STRATEGY_CONFIG, Type.STRING,
+ DocumentIDStrategy.TOPIC_PARTITION_OFFSET.toString(), DocumentIDStrategy.VALIDATOR,
+ Importance.LOW, KEY_IGNORE_ID_STRATEGY_DOC, DATA_CONVERSION_GROUP_NAME, ++order, Width.LONG,
+ "Document ID generation strategy")
+ .define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, false, Importance.LOW, SCHEMA_IGNORE_CONFIG_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Ignore Schema mode")
+ .define(COMPACT_MAP_ENTRIES_CONFIG, Type.BOOLEAN, true, Importance.LOW, COMPACT_MAP_ENTRIES_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Compact Map Entries")
+ .define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, "", Importance.LOW, TOPIC_KEY_IGNORE_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.LONG, "Topics for 'Ignore Key' mode")
+ .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, "", Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.LONG, "Topics for 'Ignore Schema' mode")
+ .define(DROP_INVALID_MESSAGE_CONFIG, Type.BOOLEAN, false, Importance.LOW, DROP_INVALID_MESSAGE_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.LONG, "Drop invalid messages")
+ .define(BEHAVIOR_ON_NULL_VALUES_CONFIG, Type.STRING,
+ RecordConverter.BehaviorOnNullValues.DEFAULT.toString(),
+ RecordConverter.BehaviorOnNullValues.VALIDATOR, Importance.LOW, BEHAVIOR_ON_NULL_VALUES_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Behavior for null-valued records")
+ .define(BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, Type.STRING,
+ BulkProcessor.BehaviorOnMalformedDoc.DEFAULT.toString(),
+ BulkProcessor.BehaviorOnMalformedDoc.VALIDATOR, Importance.LOW, BEHAVIOR_ON_MALFORMED_DOCS_DOC,
+ DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT, "Behavior on malformed documents")
+ .define(BEHAVIOR_ON_VERSION_CONFLICT_CONFIG, Type.STRING,
+ BulkProcessor.BehaviorOnVersionConflict.DEFAULT.toString(),
+ BulkProcessor.BehaviorOnVersionConflict.VALIDATOR, Importance.LOW,
+ BEHAVIOR_ON_VERSION_CONFLICT_DOC, DATA_CONVERSION_GROUP_NAME, ++order, Width.SHORT,
+ "Behavior on document's version conflict (optimistic locking)");
}
private static void addDataStreamConfig(final ConfigDef configDef) {
int order = 0;
- configDef.define(
- DATA_STREAM_ENABLED,
- Type.BOOLEAN,
- false,
- Importance.MEDIUM,
- DATA_STREAM_ENABLED_DOC,
- DATA_STREAM_GROUP_NAME,
- ++order,
- Width.LONG,
- "Data stream name"
- ).define(
- DATA_STREAM_PREFIX,
- Type.STRING,
- null,
- new ConfigDef.NonEmptyString(),
- Importance.MEDIUM,
- DATA_STREAM_NAME_DOC,
- DATA_STREAM_GROUP_NAME,
- ++order,
- Width.LONG,
- "Data stream name"
- ).define(
- DATA_STREAM_TIMESTAMP_FIELD,
- Type.STRING,
- DATA_STREAM_TIMESTAMP_FIELD_DEFAULT,
- new ConfigDef.NonEmptyString(),
- Importance.MEDIUM,
- DATA_STREAM_TIMESTAMP_FIELD_DOC,
- DATA_STREAM_GROUP_NAME,
- ++order,
- Width.LONG,
- "Data stream timestamp field"
- );
+ configDef
+ .define(DATA_STREAM_ENABLED, Type.BOOLEAN, false, Importance.MEDIUM, DATA_STREAM_ENABLED_DOC,
+ DATA_STREAM_GROUP_NAME, ++order, Width.LONG, "Data stream name")
+ .define(DATA_STREAM_PREFIX, Type.STRING, null, new ConfigDef.NonEmptyString(), Importance.MEDIUM,
+ DATA_STREAM_NAME_DOC, DATA_STREAM_GROUP_NAME, ++order, Width.LONG, "Data stream name")
+ .define(DATA_STREAM_TIMESTAMP_FIELD, Type.STRING, DATA_STREAM_TIMESTAMP_FIELD_DEFAULT,
+ new ConfigDef.NonEmptyString(), Importance.MEDIUM, DATA_STREAM_TIMESTAMP_FIELD_DOC,
+ DATA_STREAM_GROUP_NAME, ++order, Width.LONG, "Data stream timestamp field");
}
public static final ConfigDef CONFIG = baseConfigDef();
@@ -529,17 +320,11 @@ private void validate() {
if (!dataStreamEnabled() && dataStreamPrefix().isPresent()) {
LOGGER.warn("The property data.stream.prefix was set but data streams are not enabled");
}
- if (indexWriteMethod() == IndexWriteMethod.UPSERT
- && ignoreKey()
+ if (indexWriteMethod() == IndexWriteMethod.UPSERT && ignoreKey()
&& documentIdStrategy() != DocumentIDStrategy.RECORD_KEY) {
- throw new ConfigException(
- KEY_IGNORE_ID_STRATEGY_CONFIG, documentIdStrategy().toString(),
- String.format(
- "%s is not supported for index upsert. Supported is: %s",
- documentIdStrategy().toString(),
- DocumentIDStrategy.RECORD_KEY
- )
- );
+ throw new ConfigException(KEY_IGNORE_ID_STRATEGY_CONFIG, documentIdStrategy().toString(),
+ String.format("%s is not supported for index upsert. Supported is: %s",
+ documentIdStrategy().toString(), DocumentIDStrategy.RECORD_KEY));
}
}
@@ -641,8 +426,7 @@ private DocumentIDStrategy documentIdStrategy() {
}
public DocumentIDStrategy documentIdStrategy(final String topic) {
- return (ignoreKey() || topicIgnoreKey().contains(topic))
- ? documentIdStrategy() : DocumentIDStrategy.RECORD_KEY;
+ return (ignoreKey() || topicIgnoreKey().contains(topic)) ? documentIdStrategy() : DocumentIDStrategy.RECORD_KEY;
}
public Function<String, String> topicToIndexNameConverter() {
@@ -673,8 +457,7 @@ private static String convertTopicToIndexName(final String topic) {
}
private String convertTopicToDataStreamName(final String topic) {
- return dataStreamPrefix()
- .map(prefix -> String.format("%s-%s", prefix, convertTopicToIndexName(topic)))
+ return dataStreamPrefix().map(prefix -> String.format("%s-%s", prefix, convertTopicToIndexName(topic)))
.orElseGet(() -> convertTopicToIndexName(topic));
}
@@ -683,21 +466,18 @@ public boolean ignoreSchemaFor(final String topic) {
}
public RecordConverter.BehaviorOnNullValues behaviorOnNullValues() {
- return RecordConverter.BehaviorOnNullValues.forValue(
- getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
- );
+ return RecordConverter.BehaviorOnNullValues
+ .forValue(getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG));
}
public BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc() {
- return BulkProcessor.BehaviorOnMalformedDoc.forValue(
- getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG)
- );
+ return BulkProcessor.BehaviorOnMalformedDoc
+ .forValue(getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG));
}
public BulkProcessor.BehaviorOnVersionConflict behaviorOnVersionConflict() {
- return BulkProcessor.BehaviorOnVersionConflict.forValue(
- getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG)
- );
+ return BulkProcessor.BehaviorOnVersionConflict
+ .forValue(getString(OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG));
}
public static void main(final String[] args) {
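The validate() method above rejects ``index.write.method=upsert`` when keys are ignored with any id strategy other than ``record.key``. Below is a hedged, illustrative sketch of a combination that passes that check; the class name is hypothetical and the values are examples only.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: upsert needs document ids derived from the record key.
// With key.ignore left at false (the default) validate() passes; if key.ignore were
// true, key.ignore.id.strategy would have to be record.key to avoid a ConfigException.
public class UpsertConfigSketch {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("connection.url", "http://eshost1:9200");
        props.put("index.write.method", "upsert");
        props.put("key.ignore", "false");
        System.out.println(props);
    }
}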
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTask.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTask.java
index 45b975b5..e40d6840 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTask.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchSinkTask.java
@@ -1,6 +1,5 @@
/*
* Copyright 2020 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,9 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG;
+import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG;
+
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
@@ -38,9 +39,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG;
-import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.BEHAVIOR_ON_VERSION_CONFLICT_CONFIG;
-
public class OpensearchSinkTask extends SinkTask {
private static final Logger LOGGER = LoggerFactory.getLogger(OpensearchSinkTask.class);
@@ -77,10 +75,11 @@ public void start(final Map<String, String> props) {
this.topicToIndexConverter = config.topicToIndexNameConverter();
// Calculate the maximum possible backoff time ...
- final long maxRetryBackoffMs =
- RetryUtil.computeRetryWaitTimeInMillis(config.maxRetry(), config.retryBackoffMs());
+ final long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(config.maxRetry(),
+ config.retryBackoffMs());
if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
- LOGGER.warn("This connector uses exponential backoff with jitter for retries, "
+ LOGGER.warn(
+ "This connector uses exponential backoff with jitter for retries, "
+ "and using '{}={}' and '{}={}' results in an impractical but possible maximum "
+ "backoff time greater than {} hours.",
OpensearchSinkConnectorConfig.MAX_RETRIES_CONFIG, config.maxRetry(),
@@ -91,10 +90,7 @@ public void start(final Map<String, String> props) {
this.client = new OpensearchClient(config, getErrantRecordReporter());
this.recordConverter = new RecordConverter(config);
} catch (final ConfigException e) {
- throw new ConnectException(
- "Couldn't start OpensearchSinkTask due to configuration error:",
- e
- );
+ throw new ConnectException("Couldn't start OpensearchSinkTask due to configuration error:", e);
}
}
@@ -111,12 +107,8 @@ public void put(final Collection<SinkRecord> records) throws ConnectException {
LOGGER.trace("Putting {} to Opensearch", records);
for (final var record : records) {
if (ignoreRecord(record)) {
- LOGGER.debug(
- "Ignoring sink record with key {} and null value for topic/partition/offset {}/{}/{}",
- record.key(),
- record.topic(),
- record.kafkaPartition(),
- record.kafkaOffset());
+ LOGGER.debug("Ignoring sink record with key {} and null value for topic/partition/offset {}/{}/{}",
+ record.key(), record.topic(), record.kafkaPartition(), record.kafkaOffset());
continue;
}
tryWriteRecord(record);
@@ -138,13 +130,8 @@ private void tryWriteRecord(final SinkRecord record) {
}
} catch (final DataException e) {
if (config.dropInvalidMessage()) {
- LOGGER.error(
- "Can't convert record from topic {} with partition {} and offset {}. Reason: ",
- record.topic(),
- record.kafkaPartition(),
- record.kafkaOffset(),
- e
- );
+ LOGGER.error("Can't convert record from topic {} with partition {} and offset {}. Reason: ",
+ record.topic(), record.kafkaPartition(), record.kafkaOffset(), e);
} else {
throw e;
}
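
Note (not part of the patch): the `start(...)` hunk above compares the worst-case wait computed by `RetryUtil.computeRetryWaitTimeInMillis(...)` against `RetryUtil.MAX_RETRY_TIME_MS` and only logs a warning. The sketch below illustrates the general idea of capped exponential backoff with jitter that the warning text refers to; the names and the exact formula are assumptions, not the contents of `RetryUtil`.

```java
// Illustrative sketch only, not the RetryUtil implementation: capped
// exponential backoff with full jitter.
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {
    static long retryWaitMs(final int retryAttempt, final long retryBackoffMs, final long maxWaitMs) {
        if (retryBackoffMs <= 0) {
            return 0L;
        }
        // Exponential growth: retryBackoffMs * 2^attempt, capped at maxWaitMs.
        // The Math.max guard keeps the value positive even if the shift overflows.
        final int shift = Math.min(retryAttempt, 20);
        final long exponential = Math.max(retryBackoffMs, Math.min(maxWaitMs, retryBackoffMs << shift));
        // Full jitter: wait a random time in [0, exponential].
        return ThreadLocalRandom.current().nextLong(exponential + 1);
    }
}
```
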
diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/RecordConverter.java b/src/main/java/io/aiven/kafka/connect/opensearch/RecordConverter.java
index 1b30b6a3..95010a3c 100644
--- a/src/main/java/io/aiven/kafka/connect/opensearch/RecordConverter.java
+++ b/src/main/java/io/aiven/kafka/connect/opensearch/RecordConverter.java
@@ -1,6 +1,5 @@
/*
* Copyright 2019 Aiven Oy
- * Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.aiven.kafka.connect.opensearch;
import java.math.BigDecimal;
@@ -62,9 +60,9 @@ public RecordConverter(final OpensearchSinkConnectorConfig config) {
public DocWriteRequest<?> convert(final SinkRecord record, final String indexOrDataStreamName) {
if (record.value() == null) {
switch (config.behaviorOnNullValues()) {
- case IGNORE:
+ case IGNORE :
return null;
- case DELETE:
+ case DELETE :
if (record.key() == null) {
// Since the record key is used as the ID of the index to delete and we don't have a key
// for this record, we can't delete anything anyways, so we ignore the record.
@@ -77,20 +75,15 @@ public DocWriteRequest<?> convert(final SinkRecord record, final String indexOrD
}
// Will proceed as normal, ultimately creating an with a null payload
break;
- case FAIL:
- default:
+ case FAIL :
+ default :
throw new DataException(String.format(
"Sink record with key of %s and null value encountered for topic/partition/offset "
+ "%s/%s/%s (to ignore future records like this change the configuration property "
+ "'%s' from '%s' to '%s')",
- record.key(),
- record.topic(),
- record.kafkaPartition(),
- record.kafkaOffset(),
- OpensearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
- BehaviorOnNullValues.FAIL,
- BehaviorOnNullValues.IGNORE
- ));
+ record.key(), record.topic(), record.kafkaPartition(), record.kafkaOffset(),
+ OpensearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL,
+ BehaviorOnNullValues.IGNORE));
}
}
return RequestBuilder.builder()
@@ -132,13 +125,13 @@ Schema preProcessSchema(final Schema schema) {
final String schemaName = schema.name();
if (schemaName != null) {
switch (schemaName) {
- case Decimal.LOGICAL_NAME:
+ case Decimal.LOGICAL_NAME :
return copySchemaBasics(schema, SchemaBuilder.float64()).build();
- case Date.LOGICAL_NAME:
- case Time.LOGICAL_NAME:
- case Timestamp.LOGICAL_NAME:
+ case Date.LOGICAL_NAME :
+ case Time.LOGICAL_NAME :
+ case Timestamp.LOGICAL_NAME :
return schema;
- default:
+ default :
// User type or unknown logical type
break;
}
@@ -146,13 +139,13 @@ Schema preProcessSchema(final Schema schema) {
final Schema.Type schemaType = schema.type();
switch (schemaType) {
- case ARRAY:
+ case ARRAY :
return preProcessArraySchema(schema);
- case MAP:
+ case MAP :
return preProcessMapSchema(schema);
- case STRUCT:
+ case STRUCT :
return preProcessStructSchema(schema);
- default:
+ default :
return schema;
}
}
@@ -173,7 +166,8 @@ private Schema preProcessMapSchema(final Schema schema) {
final SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema);
return copySchemaBasics(schema, result).build();
}
- final Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
+ final Schema elementSchema = SchemaBuilder.struct()
+ .name(keyName + "-" + valueName)
.field(Mapping.KEY_FIELD, preprocessedKeySchema)
.field(Mapping.VALUE_FIELD, preprocessedValueSchema)
.build();
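
Note (not part of the patch): the `preProcessMapSchema` hunk above is where maps whose keys are not strings are re-shaped into an array of key/value structs, since a JSON document can only carry string object keys. The sketch below is a hypothetical, standalone illustration of that re-shaping; the field names simply mirror `Mapping.KEY_FIELD` and `Mapping.VALUE_FIELD`, and the real conversion operates on Connect `Struct` values in `preProcessMapValue`.

```java
// Illustrative sketch only, not connector code: flattening a map with
// non-string keys into a list of {key, value} entries, the shape the
// preprocessing above produces for indexing.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class MapEntrySketch {
    static List<Map<String, Object>> toEntryList(final Map<Integer, String> input) {
        return input.entrySet().stream()
                .map(e -> Map.<String, Object>of("key", e.getKey(), "value", e.getValue()))
                .collect(Collectors.toList());
    }
}
```
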
@@ -221,13 +215,13 @@ Object preProcessValue(final Object value, final Schema schema, final Schema new
final Schema.Type schemaType = schema.type();
switch (schemaType) {
- case ARRAY:
+ case ARRAY :
return preProcessArrayValue(value, schema, newSchema);
- case MAP:
+ case MAP :
return preProcessMapValue(value, schema, newSchema);
- case STRUCT:
+ case STRUCT :
return preProcessStructValue(value, schema, newSchema);
- default:
+ default :
return value;
}
}
@@ -245,13 +239,13 @@ private Object preProcessNullValue(final Schema schema) {
// @returns the decoded logical value or null if this isn't a known logical type
private Object preProcessLogicalValue(final String schemaName, final Object value) {
switch (schemaName) {
- case Decimal.LOGICAL_NAME:
+ case Decimal.LOGICAL_NAME :
return ((BigDecimal) value).doubleValue();
- case Date.LOGICAL_NAME:
- case Time.LOGICAL_NAME:
- case Timestamp.LOGICAL_NAME:
+ case Date.LOGICAL_NAME :
+ case Time.LOGICAL_NAME :
+ case Timestamp.LOGICAL_NAME :
return value;
- default:
+ default :
// User-defined type or unknown built-in
return null;
}
@@ -274,10 +268,8 @@ private Object preProcessMapValue(final Object value, final Schema schema, final
if (config.useCompactMapEntries() && keySchema.type() == Schema.Type.STRING) {
final Map