diff --git a/README.md b/README.md
index 2d4f85a..7ede4ba 100644
--- a/README.md
+++ b/README.md
@@ -6,13 +6,13 @@ The project originates from Confluent [kafka-connect-elasticsearch](https://gith
# Documentation
-TBD
+Supported Elasticsearch versions are 7.17.0+
# Contribute
-[Source Code](https://github.com/aiven/elasticsearch-connector-for-apache-kafka)
+[Source Code](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka)
-[Issue Tracker](https://github.com/aiven/elasticsearch-connector-for-apache-kafka)
+[Issue Tracker](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka)
# License
diff --git a/build.gradle b/build.gradle
index e0288c4..695232f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,7 +48,7 @@ java {
}
wrapper {
- distributionType = 'ALL'
+ distributionType = "ALL"
doLast {
def sha256Sum = new String(new URL("${distributionUrl}.sha256").bytes)
propertiesFile << "distributionSha256Sum=${sha256Sum}\n"
@@ -57,18 +57,15 @@ wrapper {
}
ext {
- guavaVersion = "11.0.2"
kafkaVersion = "2.2.0"
slf4jVersion = "2.0.12"
- log4jVersion = "2.23.0"
- elasticSearchVersion = "7.4.0"
- elasticClientVersion = "7.17.0"
+ elasticJavaClientVersion = "7.17.18"
testContainersElasticVersion = "1.19.6"
carrotsearchVersion = "2.8.1"
}
compileJava {
- options.compilerArgs = ['-Xlint:all', '-Werror']
+ options.compilerArgs = ["-Xlint:all", "-Werror"]
}
checkstyle {
@@ -81,28 +78,23 @@ jacoco {
}
dependencies {
- compileOnly "org.apache.kafka:connect-api:$kafkaVersion"
- compileOnly "org.apache.kafka:connect-json:$kafkaVersion"
+ compileOnly("org.apache.kafka:connect-api:$kafkaVersion")
+ compileOnly("org.apache.kafka:connect-json:$kafkaVersion")
- implementation "org.slf4j:slf4j-api:$slf4jVersion"
- implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:$elasticClientVersion"
- implementation("com.google.guava:guava:$guavaVersion")
-
- implementation "org.apache.logging.log4j:log4j-api:$log4jVersion"
- implementation "org.apache.logging.log4j:log4j-core:$log4jVersion"
+ implementation("org.slf4j:slf4j-api:$slf4jVersion")
+ implementation("co.elastic.clients:elasticsearch-java:$elasticJavaClientVersion")
testImplementation("junit:junit:4.13.2") {
- exclude group: 'org.hamcrest', module: 'hamcrest-core'
+ exclude(group: "org.hamcrest", module: "hamcrest-core")
}
- testImplementation "org.hamcrest:hamcrest-all:1.3"
- testImplementation "org.mockito:mockito-core:5.11.0"
- testImplementation "org.mockito:mockito-all:1.10.19"
-
- testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
- testImplementation "org.apache.kafka:connect-json:$kafkaVersion"
- testImplementation "org.testcontainers:elasticsearch:$testContainersElasticVersion"
- implementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
- testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
+ testImplementation("org.hamcrest:hamcrest-all:1.3")
+ testImplementation("org.mockito:mockito-core:5.11.0")
+ testImplementation("org.mockito:mockito-all:1.10.19")
+
+ testImplementation("org.apache.kafka:connect-json:$kafkaVersion")
+ testImplementation("org.testcontainers:elasticsearch:$testContainersElasticVersion")
+ testImplementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
+ testRuntimeOnly("org.slf4j:slf4j-log4j12:$slf4jVersion")
}
distributions {
@@ -156,7 +148,7 @@ publishing {
}
processResources {
- filesMatching('elasticsearch-connector-for-apache-kafka-version.properties') {
+ filesMatching("elasticsearch-connector-for-apache-kafka-version.properties") {
expand(version: version)
}
}
@@ -169,12 +161,10 @@ jar {
}
}
-test {
-
- //we do not need to check classpath hell for testing
- systemProperty "tests.jarhell.check", "false"
-
- //tests.security.manager is true all ElasticsearchSinkTestBase aware test hang
- systemProperty "tests.security.manager", "false"
+def elasticsearch7Test = tasks.register("elasticsearch7Test", Test) {
+ environment("ELASTIC_TEST_CONTAINER_VERSION", "7.17.0")
+}
+tasks.named("check") {
+ dependsOn elasticsearch7Test
}
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6eeaa68..dad9472 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -25,7 +25,7 @@
/>
diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java
index cfe1152..c26d9f3 100644
--- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java
+++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java
@@ -26,8 +26,9 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.cluster.metadata.MappingMetadata;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import com.fasterxml.jackson.databind.JsonNode;
public interface ElasticsearchClient extends AutoCloseable {
@@ -66,7 +67,7 @@ enum Version {
* @param type the type
* @throws IOException if the client cannot execute the request
*/
- MappingMetadata getMapping(String index, String type) throws IOException;
+ Property getMapping(String index, String type) throws IOException;
/**
* Creates a bulk request for the list of {@link IndexableRecord} records.
@@ -92,14 +93,14 @@ enum Version {
* @return the search result
* @throws IOException if the client cannot execute the request
*/
- SearchResponse search(String index) throws IOException;
+ SearchResponse search(String index) throws IOException;
/**
- * Executes a search.
+ * Refreshes the index.
*
* @param index the index to refresh
*/
- void refresh(String index) throws IOException;
+ void refreshIndex(String index) throws IOException;
/**
* Shuts down the client.
diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java
index 477b95a..fbd2c3b 100644
--- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java
+++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java
@@ -34,7 +34,7 @@
import org.apache.kafka.connect.sink.SinkTask;
import io.aiven.connect.elasticsearch.bulk.BulkProcessor;
-import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper;
+import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +55,7 @@ public void start(final Map props) {
start(props, null);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") //TOPIC_INDEX_MAP_CONFIG
// public for testing
public void start(final Map props, final ElasticsearchClient client) {
try {
@@ -121,7 +121,7 @@ public void start(final Map props, final ElasticsearchClient cli
if (client != null) {
this.client = client;
} else {
- this.client = new AivenElasticsearchClientWrapper(config);
+ this.client = new ElasticsearchClientWrapper(config);
}
final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
@@ -179,7 +179,7 @@ public void close(final Collection partitions) {
}
public void refresh(final String index) throws IOException {
- client.refresh(index);
+ client.refreshIndex(index);
}
@Override
diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java
index 9d84494..7aba173 100644
--- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java
+++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -31,7 +32,6 @@
import io.aiven.connect.elasticsearch.bulk.BulkProcessor;
-import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +103,7 @@ public class ElasticsearchWriter {
behaviorOnMalformedDoc
);
- existingMappings = Sets.newHashSet();
+ existingMappings = new HashSet<>();
}
public static class Builder {
@@ -349,7 +349,7 @@ public void createIndicesForTopics(final Set assignedTopics) {
}
private Set indicesForTopics(final Set assignedTopics) {
- final Set indices = Sets.newHashSet();
+ final Set indices = new HashSet<>();
for (final String topic : assignedTopics) {
indices.add(convertTopicToIndexName(topic));
}
diff --git a/src/main/java/io/aiven/connect/elasticsearch/Mapping.java b/src/main/java/io/aiven/connect/elasticsearch/Mapping.java
index 4411578..5e14e06 100644
--- a/src/main/java/io/aiven/connect/elasticsearch/Mapping.java
+++ b/src/main/java/io/aiven/connect/elasticsearch/Mapping.java
@@ -29,10 +29,10 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.elasticsearch.cluster.metadata.MappingMetadata;
public class Mapping {
@@ -57,7 +57,7 @@ public static void createMapping(
/**
* Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
*/
- public static MappingMetadata getMapping(final ElasticsearchClient client, final String index, final String type)
+ public static Property getMapping(final ElasticsearchClient client, final String index, final String type)
throws IOException {
return client.getMapping(index, type);
}
diff --git a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchBulkRequest.java
similarity index 71%
rename from src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java
rename to src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchBulkRequest.java
index d0f1afc..a5a6fac 100644
--- a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java
+++ b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchBulkRequest.java
@@ -19,14 +19,14 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
-public class BulkRequestImpl implements BulkRequest {
- private final org.elasticsearch.action.bulk.BulkRequest bulkRequest;
+public class ElasticsearchBulkRequest implements BulkRequest {
+ private final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest;
- public BulkRequestImpl(final org.elasticsearch.action.bulk.BulkRequest bulkRequest) {
+ public ElasticsearchBulkRequest(final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest;
}
- public org.elasticsearch.action.bulk.BulkRequest getBulkRequest() {
+ public co.elastic.clients.elasticsearch.core.BulkRequest getBulkRequest() {
return bulkRequest;
}
}
diff --git a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapper.java
similarity index 52%
rename from src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
rename to src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapper.java
index 623f3e9..3119d66 100644
--- a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java
+++ b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapper.java
@@ -18,10 +18,10 @@
package io.aiven.connect.elasticsearch.clientwrapper;
import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -39,6 +39,25 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
+import co.elastic.clients.elasticsearch._types.VersionType;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.elasticsearch.indices.GetMappingRequest;
+import co.elastic.clients.elasticsearch.indices.GetMappingResponse;
+import co.elastic.clients.elasticsearch.indices.PutMappingRequest;
+import co.elastic.clients.elasticsearch.indices.RefreshRequest;
+import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import co.elastic.clients.util.BinaryData;
+import co.elastic.clients.util.ContentType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -48,51 +67,26 @@
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.RestHighLevelClientBuilder;
-import org.elasticsearch.client.core.MainResponse;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.GetIndexRequest;
-import org.elasticsearch.client.indices.GetMappingsRequest;
-import org.elasticsearch.client.indices.GetMappingsResponse;
-import org.elasticsearch.client.indices.PutMappingRequest;
-import org.elasticsearch.cluster.metadata.MappingMetadata;
-import org.elasticsearch.index.VersionType;
-import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AivenElasticsearchClientWrapper implements ElasticsearchClient {
+public class ElasticsearchClientWrapper implements ElasticsearchClient {
- // visible for testing
- protected static final String MAPPER_PARSE_EXCEPTION
- = "mapper_parse_exception";
- protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION
- = "version_conflict_engine_exception";
-
- private static final Logger LOG = LoggerFactory.getLogger(AivenElasticsearchClientWrapper.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchClientWrapper.class);
- @SuppressWarnings("deprecation")
- private final RestHighLevelClient elasticClient;
+ private final ElasticsearchTransport elasticTransport;
+ private final co.elastic.clients.elasticsearch.ElasticsearchClient elasticClient;
private final Version version;
// visible for testing
- @SuppressWarnings("deprecation")
-
- public AivenElasticsearchClientWrapper(final RestHighLevelClient client) {
+ public ElasticsearchClientWrapper(
+ final ElasticsearchTransport elasticTransport,
+ final co.elastic.clients.elasticsearch.ElasticsearchClient elasticClient) {
try {
- this.elasticClient = client;
+ this.elasticTransport = elasticTransport;
+ this.elasticClient = elasticClient;
this.version = getServerVersion();
} catch (final IOException e) {
throw new ConnectException(
@@ -103,30 +97,10 @@ public AivenElasticsearchClientWrapper(final RestHighLevelClient client) {
}
// visible for testing
- public AivenElasticsearchClientWrapper(final String address) {
+ public ElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig config) {
try {
- final Map props = new HashMap<>();
- props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, address);
- props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect");
- this.elasticClient = getElasticsearchClient(new ElasticsearchSinkConnectorConfig(props));
- this.version = getServerVersion();
- } catch (final IOException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to connection error:",
- e
- );
- } catch (final ConfigException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to configuration error:",
- e
- );
- }
- }
-
- // visible for testing
- public AivenElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig config) {
- try {
- this.elasticClient = getElasticsearchClient(config);
+ this.elasticTransport = getElasticsearchTransport(config);
+ this.elasticClient = getElasticsearchClient();
this.version = getServerVersion();
} catch (final IOException e) {
throw new ConnectException(
@@ -142,7 +116,7 @@ public AivenElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig co
}
private Version getServerVersion() throws IOException {
- final String esVersion = this.elasticClient.info(RequestOptions.DEFAULT).getVersion().getNumber();
+ final String esVersion = this.elasticClient.info().version().number();
return matchVersionString(esVersion);
}
@@ -168,12 +142,7 @@ private Version matchVersionString(final String esVersion) {
return defaultVersion;
}
- private boolean es8compat(final Version version) {
- return Objects.requireNonNull(version) == Version.ES_V8;
- }
-
- @SuppressWarnings("deprecation")
- private RestHighLevelClient getElasticsearchClient(final ElasticsearchSinkConnectorConfig config) {
+ private ElasticsearchTransport getElasticsearchTransport(final ElasticsearchSinkConnectorConfig config) {
final HttpHost[] httpHosts =
config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG).stream().map(HttpHost::create)
.toArray(HttpHost[]::new);
@@ -210,22 +179,15 @@ private RestHighLevelClient getElasticsearchClient(final ElasticsearchSinkConnec
});
final RestClient restClient = restClientBuilder.build();
- final Version version = resolveVersion(restClient);
- return new RestHighLevelClientBuilder(restClient)
- .setApiCompatibilityMode(es8compat(version))
- .build();
+ return new RestClientTransport(
+ restClient,
+ new JacksonJsonpMapper()
+ );
}
- @SuppressWarnings("deprecation")
- private Version resolveVersion(final RestClient restClient) {
- // No auto-closing, it would close also the restClient.
- final RestHighLevelClient highLevelClient = new RestHighLevelClientBuilder(restClient).build();
- try {
- final MainResponse.Version version = highLevelClient.info(RequestOptions.DEFAULT).getVersion();
- return matchVersionString(version.getNumber());
- } catch (final IOException e) {
- throw new ConnectException("Failed to get ElasticSearch version.", e);
- }
+ private co.elastic.clients.elasticsearch.ElasticsearchClient getElasticsearchClient() {
+ Objects.requireNonNull(this.elasticTransport);
+ return new co.elastic.clients.elasticsearch.ElasticsearchClient(this.elasticTransport);
}
public Version getVersion() {
@@ -233,9 +195,10 @@ public Version getVersion() {
}
private boolean indexExists(final String index) {
- final GetIndexRequest getIndexRequest = new GetIndexRequest(index);
+ final ExistsRequest existsRequest = new ExistsRequest.Builder()
+ .index(index).build();
try {
- return elasticClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
+ return elasticClient.indices().exists(existsRequest).value();
} catch (final IOException e) {
throw new ConnectException(e);
}
@@ -244,9 +207,10 @@ private boolean indexExists(final String index) {
public void createIndices(final Set indices) {
for (final String index : indices) {
if (!indexExists(index)) {
- final CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
+ final CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
+ .index(index).build();
try {
- elasticClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+ elasticClient.indices().create(createIndexRequest);
} catch (final IOException e) {
throw new ConnectException("Could not create index '" + index + "'", e);
}
@@ -255,71 +219,88 @@ public void createIndices(final Set indices) {
}
public void createMapping(final String index, final String type, final Schema schema) throws IOException {
- final ObjectNode obj = JsonNodeFactory.instance.objectNode();
- obj.set(type, Mapping.inferMapping(getVersion(), schema));
- final JsonNode part = Mapping.inferMapping(getVersion(), schema);
- final PutMappingRequest request = new PutMappingRequest(index)
- .source(part.toString(), XContentType.JSON);
+ final ObjectNode root = JsonNodeFactory.instance.objectNode();
+ final JsonNode mapping = Mapping.inferMapping(getVersion(), schema);
+ final ObjectNode typeNode = JsonNodeFactory.instance.objectNode();
+ typeNode.set(type, mapping);
+ root.set("properties", typeNode);
+ final PutMappingRequest request = new PutMappingRequest.Builder()
+ .index(index)
+ .withJson(new StringReader(root.toString()))
+ .build();
+
try {
- this.elasticClient.indices().putMapping(request, RequestOptions.DEFAULT);
+ this.elasticClient.indices().putMapping(request);
} catch (final ElasticsearchException exception) {
throw new ConnectException(
- "Cannot create mapping " + schema + " -- " + exception.getDetailedMessage()
+ "Cannot create mapping " + schema + " -- " + exception.getMessage()
);
}
}
/**
- * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
+ * Get the mapping for given index and type. Returns {@code null} if it does not exist.
*/
@Override
- public MappingMetadata getMapping(final String index, final String type) throws IOException {
- final GetMappingsRequest request = new GetMappingsRequest()
- .indices(index);
- final GetMappingsResponse response = this.elasticClient.indices().getMapping(request, RequestOptions.DEFAULT);
- if (response.mappings().isEmpty()) {
+ public Property getMapping(final String index, final String type) throws IOException {
+ final GetMappingRequest request = new GetMappingRequest.Builder()
+ .index(index).build();
+ final GetMappingResponse response = this.elasticClient.indices().getMapping(request);
+ final IndexMappingRecord indexMappingRecord = response.get(index);
+ if (indexMappingRecord == null) {
return null;
}
- return response.mappings().get(index);
+ return indexMappingRecord.mappings().properties().get(type);
}
public BulkRequest createBulkRequest(final List batch) {
- final org.elasticsearch.action.bulk.BulkRequest bulkRequest = new org.elasticsearch.action.bulk.BulkRequest();
+ final List bulkOperations = new ArrayList<>();
for (final IndexableRecord record : batch) {
- bulkRequest.add(toBulkableAction(record));
+ bulkOperations.add(toBulkableOperation(record));
}
- return new BulkRequestImpl(bulkRequest);
+ final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest =
+ new co.elastic.clients.elasticsearch.core.BulkRequest.Builder()
+ .operations(bulkOperations)
+ .build();
+ return new ElasticsearchBulkRequest(bulkRequest);
}
// visible for testing
- protected DocWriteRequest> toBulkableAction(final IndexableRecord record) {
+ protected BulkOperation toBulkableOperation(final IndexableRecord record) {
// If payload is null, the record was a tombstone and we should delete from the index.
- return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record);
+ return record.payload != null ? toIndexOperation(record) : toDeleteOperation(record);
}
- private DeleteRequest toDeleteRequest(final IndexableRecord record) {
+ private BulkOperation toDeleteOperation(final IndexableRecord record) {
// TODO: Should version information be set here?
- return new DeleteRequest(record.key.index).id(record.key.id);
+ return new BulkOperation.Builder().delete(operation -> operation
+ .index(record.key.index)
+ .id(record.key.id)
+ ).build();
}
- private IndexRequest toIndexRequest(final IndexableRecord record) {
- final IndexRequest indexRequest = new IndexRequest(record.key.index)
- .id(record.key.id)
- .source(record.payload, XContentType.JSON);
- if (record.version != null) {
- indexRequest
- .versionType(VersionType.EXTERNAL)
- .version(record.version);
- }
- return indexRequest;
+ private BulkOperation toIndexOperation(final IndexableRecord record) {
+ final BinaryData binaryPayload =
+ BinaryData.of(record.payload.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
+ return new BulkOperation.Builder().index(operation -> {
+ operation
+ .index(record.key.index)
+ .id(record.key.id)
+ .document(binaryPayload);
+ if (record.version != null) {
+ operation
+ .versionType(VersionType.External)
+ .version(record.version);
+ }
+ return operation;
+ }).build();
}
public BulkResponse executeBulk(final BulkRequest bulk) throws IOException {
- final BulkRequestImpl bulkRequest = (BulkRequestImpl) bulk;
- final org.elasticsearch.action.bulk.BulkResponse
- response = elasticClient.bulk(bulkRequest.getBulkRequest(), RequestOptions.DEFAULT);
+ final co.elastic.clients.elasticsearch.core.BulkResponse response =
+ elasticClient.bulk(((ElasticsearchBulkRequest) bulk).getBulkRequest());
- if (!response.hasFailures()) {
+ if (!response.errors()) {
return BulkResponse.success();
}
@@ -328,17 +309,16 @@ public BulkResponse executeBulk(final BulkRequest bulk) throws IOException {
final List versionConflicts = new ArrayList<>();
final List errors = new ArrayList<>();
- for (final BulkItemResponse item : response.getItems()) {
- if (item.isFailed()) {
- final BulkItemResponse.Failure failure = item.getFailure();
- final String errorType = Optional.ofNullable(failure.getCause().getMessage()).orElse("");
- if (errorType.contains("version_conflict_engine_exception")) {
- versionConflicts.add(new Key(item.getIndex(), item.getType(), item.getId()));
- } else if (errorType.contains("mapper_parse_exception")) {
+ for (final BulkResponseItem item : response.items()) {
+ if (item.error() != null) {
+ final String errorType = item.error().type();
+ if ("version_conflict_engine_exception".equals(errorType)) {
+ versionConflicts.add(new Key(item.index(), item.operationType().name(), item.id()));
+ } else if ("mapper_parse_exception".equals(errorType)) {
retriable = false;
- errors.add(item.getFailureMessage());
+ errors.add(item.error().reason());
} else {
- errors.add(item.getFailureMessage());
+ errors.add(item.error().reason());
}
}
}
@@ -351,26 +331,33 @@ public BulkResponse executeBulk(final BulkRequest bulk) throws IOException {
}
}
- final String errorInfo = errors.isEmpty() ? response.buildFailureMessage() : errors.toString();
+ final String errorInfo = errors.isEmpty()
+ ? "Errors present, but error information missing."
+ : errors.toString();
+ LOG.trace("Bulk response: {}", response);
return BulkResponse.failure(retriable, errorInfo);
}
+ // visible for testing
@Override
- public SearchResponse search(final String index) throws IOException {
- final SearchRequest searchRequest = new SearchRequest();
+ public SearchResponse search(final String index) throws IOException {
+ final SearchRequest.Builder searchRequestBuilder = new SearchRequest.Builder();
+
if (index != null) {
- searchRequest.indices(index);
+ searchRequestBuilder.index(index);
}
- return elasticClient.search(searchRequest, RequestOptions.DEFAULT);
+ return elasticClient.search(searchRequestBuilder.build(), JsonNode.class);
}
- public void refresh(final String index) throws IOException {
- final RefreshRequest request = new RefreshRequest(index);
- elasticClient.indices().refresh(request, RequestOptions.DEFAULT);
+ public void refreshIndex(final String index) throws IOException {
+ final RefreshRequest request = new RefreshRequest.Builder()
+ .index(index)
+ .build();
+ elasticClient.indices().refresh(request);
}
public void close() throws IOException {
- elasticClient.close();
+ elasticTransport.close();
}
}
diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java
index af1d3e8..f6aa25e 100644
--- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java
+++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java
@@ -27,7 +27,7 @@
import org.apache.kafka.connect.sink.SinkRecord;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;
-import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper;
+import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper;
import org.junit.Test;
import org.mockito.Mockito;
@@ -45,7 +45,7 @@ public void testRetryIfRecoverable() throws IOException {
final ElasticsearchSinkTask elasticsearchSinkTask = new ElasticsearchSinkTask();
final int numbRetriesBeforeSucceeding = 3;
- final AivenElasticsearchClientWrapper failingClient = Mockito.mock(AivenElasticsearchClientWrapper.class);
+ final ElasticsearchClientWrapper failingClient = Mockito.mock(ElasticsearchClientWrapper.class);
final AtomicInteger apiCallCounter = new AtomicInteger(0);
when(failingClient.executeBulk(any())).thenAnswer(i -> {
final int numAttempt = apiCallCounter.incrementAndGet();
@@ -76,7 +76,7 @@ public void testRetryIfRecoverable() throws IOException {
@Test
public void testRaiseExceptionIfNot() throws IOException {
final ElasticsearchSinkTask elasticsearchSinkTask = new ElasticsearchSinkTask();
- final AivenElasticsearchClientWrapper failingClient = Mockito.mock(AivenElasticsearchClientWrapper.class);
+ final ElasticsearchClientWrapper failingClient = Mockito.mock(ElasticsearchClientWrapper.class);
final AtomicInteger apiCallCounter = new AtomicInteger(0);
when(failingClient.executeBulk(any())).thenAnswer(i -> {
apiCallCounter.incrementAndGet();
diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java
index d69446d..1e7ed03 100644
--- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -29,10 +30,11 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
-import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper;
+import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.search.SearchHit;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import com.fasterxml.jackson.databind.JsonNode;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -44,6 +46,7 @@
public class ElasticsearchSinkTestBase {
+ private static final String DEFAULT_ELASTICSEARCH_TEST_CONTAINER_VERSION = "8.13.0";
private static final String ELASTICSEARCH_PASSWORD = "disable_tls_for_testing";
protected static final String TYPE = "kafka-connect";
@@ -64,14 +67,30 @@ public ElasticsearchSinkTestBase() {
@BeforeClass
public static void staticSetUp() {
- container = new ElasticsearchContainer("elasticsearch:8.12.2")
+ final String elasticsearchContainerVersion = System.getenv().getOrDefault(
+ "ELASTIC_TEST_CONTAINER_VERSION",
+ DEFAULT_ELASTICSEARCH_TEST_CONTAINER_VERSION
+ );
+ container = new ElasticsearchContainer("elasticsearch:" + elasticsearchContainerVersion)
.withPassword(ELASTICSEARCH_PASSWORD);
container.getEnvMap().put("xpack.security.transport.ssl.enabled", "false");
container.getEnvMap().put("xpack.security.http.ssl.enabled", "false");
- container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*\"message\":\"started.*"));
+ container.setWaitStrategy(new LogMessageWaitStrategy()
+ .withRegEx(getLogMessageWaitStrategyRegex(elasticsearchContainerVersion)));
container.start();
}
+ private static String getLogMessageWaitStrategyRegex(final String elasticContainerVersion) {
+ final char majorVersion = elasticContainerVersion.charAt(0);
+ switch (majorVersion) {
+ case '7':
+ return ".*\"message\": \"started.*";
+ default:
+ // Default to major version 8 log message
+ return ".*\"message\":\"started.*";
+ }
+ }
+
@Before
public void setUp() throws Exception {
final Map props = new HashMap<>();
@@ -80,7 +99,7 @@ public void setUp() throws Exception {
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG, "elastic");
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect");
final ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
- client = new AivenElasticsearchClientWrapper(config);
+ client = new ElasticsearchClientWrapper(config);
converter = new DataConverter(true, DataConverter.BehaviorOnNullValues.IGNORE);
}
@@ -146,15 +165,15 @@ protected void verifySearchResults(
final String index,
final boolean ignoreKey,
final boolean ignoreSchema) throws IOException {
- final SearchResponse result = client.search(index);
- final SearchHit[] rawHits = result.getHits().getHits();
+ final SearchResponse result = client.search(index);
+ final List> rawHits = result.hits().hits();
- assertEquals(records.size(), rawHits.length);
+ assertEquals(records.size(), rawHits.size());
final Map hits = new HashMap<>();
- for (int i = 0; i < rawHits.length; ++i) {
- final String id = rawHits[i].getId();
- final String source = rawHits[i].getSourceAsString();
+ for (final Hit hit: rawHits) {
+ final String id = hit.id();
+ final String source = hit.source().toString();
hits.put(id, source);
}
diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java
index b0bfefe..3b7cd90 100644
--- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java
+++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java
@@ -352,7 +352,7 @@ public void testDeleteOnNullValue() throws Exception {
// Can't call writeDataAndRefresh(writer, records) since it stops the writer
writer.write(records);
writer.flush();
- client.refresh(getTopic());
+ client.refreshIndex(getTopic());
// Make sure the record made it there successfully
verifySearchResults(records);
@@ -534,7 +534,7 @@ private void writeDataAndRefresh(
writer.write(records);
writer.flush();
writer.stop();
- client.refresh(index);
+ client.refreshIndex(index);
}
private void verifySearchResults(final Collection records) throws Exception {
diff --git a/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java b/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java
index 63f0437..283aa1d 100644
--- a/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java
+++ b/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java
@@ -18,7 +18,6 @@
package io.aiven.connect.elasticsearch;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.data.Date;
@@ -29,16 +28,12 @@
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
-import com.fasterxml.jackson.databind.node.NumericNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.TextNode;
-import org.elasticsearch.cluster.metadata.MappingMetadata;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertTrue;
public class MappingTest extends ElasticsearchSinkTestBase {
@@ -46,7 +41,6 @@ public class MappingTest extends ElasticsearchSinkTestBase {
private static final String TYPE = "kafka-connect-type";
@Test
- @SuppressWarnings("unchecked")
public void testMapping() throws Exception {
final Set indices = new HashSet<>();
indices.add(INDEX);
@@ -55,32 +49,10 @@ public void testMapping() throws Exception {
final Schema schema = createSchema();
Mapping.createMapping(client, INDEX, TYPE, schema);
- final MappingMetadata mapping = Mapping.getMapping(client, INDEX, TYPE);
+ final Property mapping = Mapping.getMapping(client, INDEX, TYPE);
assertNotNull(mapping);
- verifyMapping(schema, mapping.sourceAsMap());
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testStringMappingForES6() throws Exception {
- final ElasticsearchClient client = mock(ElasticsearchClient.class);
- when(client.getVersion()).thenReturn(ElasticsearchClient.Version.ES_V6);
-
- final Schema schema = SchemaBuilder.struct().name("textRecord")
- .field("string", Schema.STRING_SCHEMA)
- .build();
- final ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client.getVersion(), schema);
- final ObjectNode properties = mapping.with("properties");
- final ObjectNode string = properties.with("string");
- final TextNode stringType = (TextNode) string.get("type");
- final ObjectNode fields = string.with("fields");
- final ObjectNode keyword = fields.with("keyword");
- final TextNode keywordType = (TextNode) keyword.get("type");
- final NumericNode ignoreAbove = (NumericNode) keyword.get("ignore_above");
-
- assertEquals(ElasticsearchSinkConnectorConstants.TEXT_TYPE, stringType.asText());
- assertEquals(ElasticsearchSinkConnectorConstants.KEYWORD_TYPE, keywordType.asText());
- assertEquals(256, ignoreAbove.asInt());
+ assertTrue(mapping.isObject());
+ verifyMapping(schema, mapping);
}
protected Schema createSchema() {
@@ -125,19 +97,17 @@ private Schema createInnerSchema() {
.build();
}
- @SuppressWarnings("unchecked")
- private void verifyMapping(final Schema schema, final Map mapping) {
+ private void verifyMapping(final Schema schema, final Property property) {
final String schemaName = schema.name();
- final Object type = mapping.get("type");
if (schemaName != null) {
switch (schemaName) {
case Date.LOGICAL_NAME:
case Time.LOGICAL_NAME:
case Timestamp.LOGICAL_NAME:
- assertEquals(ElasticsearchSinkConnectorConstants.DATE_TYPE, type.toString());
+ assertEquals(ElasticsearchSinkConnectorConstants.DATE_TYPE, property._kind().jsonValue());
return;
case Decimal.LOGICAL_NAME:
- assertEquals(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, type.toString());
+ assertEquals(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, property._kind().jsonValue());
return;
default:
}
@@ -148,30 +118,29 @@ private void verifyMapping(final Schema schema, final Map mappin
final Schema.Type schemaType = schema.type();
switch (schemaType) {
case ARRAY:
- verifyMapping(schema.valueSchema(), mapping);
+ verifyMapping(schema.valueSchema(), property);
break;
case MAP:
final Schema newSchema = converter.preProcessSchema(schema);
- final Map> mapMapping =
- (Map>) mapping.get("properties");
verifyMapping(
newSchema.keySchema(),
- mapMapping.get(ElasticsearchSinkConnectorConstants.MAP_KEY)
+ property.object().properties().get(ElasticsearchSinkConnectorConstants.MAP_KEY)
);
verifyMapping(
newSchema.valueSchema(),
- mapMapping.get(ElasticsearchSinkConnectorConstants.MAP_VALUE)
+ property.object().properties().get(ElasticsearchSinkConnectorConstants.MAP_VALUE)
);
break;
case STRUCT:
- final Map> structMapping =
- (Map>) mapping.get("properties");
for (final Field field : schema.fields()) {
- verifyMapping(field.schema(), structMapping.get(field.name()));
+ verifyMapping(field.schema(), property.object().properties().get(field.name()));
}
break;
default:
- assertEquals(Mapping.getElasticsearchType(client.getVersion(), schemaType), type.toString());
+ assertEquals(
+ Mapping.getElasticsearchType(client.getVersion(), schemaType),
+ property._kind().jsonValue()
+ );
}
}
}
diff --git a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java
deleted file mode 100644
index a0ed1c5..0000000
--- a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Copyright 2020 Aiven Oy
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.connect.elasticsearch.clientwrapper;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-
-import io.aiven.connect.elasticsearch.ElasticsearchClient;
-import io.aiven.connect.elasticsearch.IndexableRecord;
-import io.aiven.connect.elasticsearch.Key;
-import io.aiven.connect.elasticsearch.Mapping;
-import io.aiven.connect.elasticsearch.bulk.BulkRequest;
-
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Sets;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchResponseSections;
-import org.elasticsearch.client.IndicesClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.core.MainResponse;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetIndexRequest;
-import org.elasticsearch.client.indices.GetMappingsRequest;
-import org.elasticsearch.client.indices.GetMappingsResponse;
-import org.elasticsearch.client.indices.PutMappingRequest;
-import org.elasticsearch.cluster.metadata.MappingMetadata;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.search.SearchHits;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.InOrder;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class AivenElasticsearchClientWrapperTest {
-
- private static final String INDEX = "index";
- private static final String KEY = "key";
- private static final String TYPE = "type";
-
- private RestHighLevelClient elasticsearchClient;
-
- @Before
- public void setUp() throws Exception {
- elasticsearchClient = mock(RestHighLevelClient.class);
- when(elasticsearchClient.info(any())).thenReturn(
- new MainResponse("localhost",
- new MainResponse.Version("1.0", "buildFlavor",
- "buildType", "buildHash", "buildDate", false,
- "luceneVersion",
- "minWireCompVersion", "minIndexCompVersion"),
- "clusterName", UUID.randomUUID().toString(), "tagLine"
- )
- );
- }
-
- @Test
- public void getsVersion() {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- assertThat(client.getVersion(), is(equalTo(ElasticsearchClient.Version.ES_V1)));
- }
-
- @Test
- public void createsIndices() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final IndicesClient indicesClient = mock(IndicesClient.class);
- when(elasticsearchClient.indices()).thenReturn(indicesClient);
- when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false);
- when(indicesClient.create(any(CreateIndexRequest.class), any())).thenThrow(new IOException("failure"));
-
- when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false);
- when(indicesClient.create(argThat(isCreateIndexForTestIndex()), any())).thenReturn(
- new CreateIndexResponse(true, true, INDEX));
-
- final Set indices = Sets.newHashSet();
- indices.add(INDEX);
- client.createIndices(indices);
- final InOrder inOrder = inOrder(elasticsearchClient, indicesClient);
- inOrder.verify(elasticsearchClient).info(any());
- inOrder.verify(indicesClient).exists(any(GetIndexRequest.class), any());
- inOrder.verify(indicesClient).create(argThat(isCreateIndexForTestIndex()), any());
- }
-
- private ArgumentMatcher isCreateIndexForTestIndex() {
- return new ArgumentMatcher() {
- @Override
- public boolean matches(final CreateIndexRequest createIndexRequest) {
- // check the URI as the equals method on CreateIndex doesn't work
- return createIndexRequest.index().equals(INDEX);
- }
- };
- }
-
- @Test
- public void createIndicesAndFails() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final IndicesClient indicesClient = mock(IndicesClient.class);
- when(elasticsearchClient.indices()).thenReturn(indicesClient);
- when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false);
- when(indicesClient.create(any(CreateIndexRequest.class), any())).thenThrow(new IOException("failure"));
-
- final Set indices = new HashSet<>();
- indices.add("test-index");
- assertThrows("Could not create index 'test-index'", ConnectException.class, () -> {
- client.createIndices(indices);
- });
- }
-
- @Test
- public void createsMapping() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final IndicesClient indicesClient = mock(IndicesClient.class);
- when(elasticsearchClient.indices()).thenReturn(indicesClient);
- final ObjectNode obj = JsonNodeFactory.instance.objectNode();
- obj.set(TYPE, Mapping.inferMapping(client.getVersion(), Schema.STRING_SCHEMA));
- client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA);
- verify(indicesClient).putMapping(any(PutMappingRequest.class), any());
- }
-
- @Test(expected = ConnectException.class)
- public void createsMappingAndFails() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final IndicesClient indicesClient = mock(IndicesClient.class);
- when(elasticsearchClient.indices()).thenReturn(indicesClient);
- when(indicesClient.putMapping(any(PutMappingRequest.class), any()))
- .thenThrow(new ElasticsearchException("failure"));
- client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA);
- }
-
- @Test
- public void getsMapping() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final IndicesClient indicesClient = mock(IndicesClient.class);
- when(elasticsearchClient.indices()).thenReturn(indicesClient);
- final MappingMetadata mappingMetadata = new MappingMetadata(TYPE, new HashMap<>());
- final Map mappingsMetadata = new HashMap<>();
- mappingsMetadata.put(INDEX, mappingMetadata);
- when(indicesClient.getMapping(any(GetMappingsRequest.class), any())).thenReturn(
- new GetMappingsResponse(mappingsMetadata));
- assertEquals(client.getMapping(INDEX, TYPE), new MappingMetadata(TYPE, new HashMap<>()));
- }
-
- @Test
- public void executesBulk() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
- BulkItemResponse.success(1, DocWriteRequest.OpType.CREATE,
- new IndexResponse(ShardId.fromString("[" + INDEX + "][1]"), TYPE, "id", 1L, 1L, 1L, true)
- )
- };
- when(elasticsearchClient.bulk(any(), any())).thenReturn(
- new BulkResponse(bulkItemResponses, 100)
- );
-
- final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
- final List records = new ArrayList<>();
- records.add(record);
- final BulkRequest request = client.createBulkRequest(records);
- assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(true)));
- }
-
- @Test
- public void executesBulkAndFails() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
- BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure(
- INDEX, TYPE, "id", new IOException("failure")
- ))
- };
- when(elasticsearchClient.bulk(any(), any())).thenReturn(
- new BulkResponse(bulkItemResponses, 100)
- );
-
- final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), null, 0L);
- final List records = new ArrayList<>();
- records.add(record);
- final BulkRequest request = client.createBulkRequest(records);
- final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request);
- assertThat(bulkResponse.isSucceeded(), is(equalTo(false)));
- assertThat(bulkResponse.isRetriable(), is(equalTo(true)));
- assertEquals("[java.io.IOException: failure]", bulkResponse.getErrorInfo());
- }
-
- @Test
- public void executesBulkAndFailsWithParseError() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
- BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure(
- INDEX, AivenElasticsearchClientWrapper.MAPPER_PARSE_EXCEPTION, "id",
- new ElasticsearchException("[type=mapper_parse_exception, reason=[key]: Mapper parse error]")
-
- ))
- };
- when(elasticsearchClient.bulk(any(), any())).thenReturn(
- new BulkResponse(bulkItemResponses, 100)
- );
-
- final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
- final List records = new ArrayList<>();
- records.add(record);
- final BulkRequest request = client.createBulkRequest(records);
- final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request);
- assertThat(bulkResponse.isSucceeded(), is(equalTo(false)));
- assertThat(bulkResponse.isRetriable(), is(equalTo(false)));
- assertEquals(
- "[ElasticsearchException[[type=mapper_parse_exception, reason=[key]: Mapper parse error]]]",
- bulkResponse.getErrorInfo()
- );
- }
-
- @Test
- public void executesBulkAndFailsWithSomeOtherError() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
- BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure(
- INDEX, "some_random_exception", "id",
- new ElasticsearchException("[type=random_type_string, reason=[key]: Unknown error]")
- ))
- };
- when(elasticsearchClient.bulk(any(), any())).thenReturn(
- new BulkResponse(bulkItemResponses, 100)
- );
-
- final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
- final List records = new ArrayList<>();
- records.add(record);
- final BulkRequest request = client.createBulkRequest(records);
- final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request);
- assertThat(bulkResponse.isSucceeded(), is(equalTo(false)));
- assertThat(bulkResponse.isRetriable(), is(equalTo(true)));
- assertEquals(
- "[ElasticsearchException[[type=random_type_string, reason=[key]: Unknown error]]]",
- bulkResponse.getErrorInfo()
- );
- }
-
- @Test
- public void executesBulkAndSucceedsBecauseOnlyVersionConflicts() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
- BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure(
- INDEX, "_doc", "id",
- new ElasticsearchException(
- "[type=version_conflict_engine_exception, reason=[key]: "
- + "version conflict, current version [1] is higher or equal to the one provided [0]]")
- ))
- };
- when(elasticsearchClient.bulk(any(), any())).thenReturn(
- new BulkResponse(bulkItemResponses, 100)
- );
-
-
- final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
- final List records = new ArrayList<>();
- records.add(record);
- final BulkRequest request = client.createBulkRequest(records);
- final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request);
- assertThat(bulkResponse.isSucceeded(), is(equalTo(true)));
- }
-
- @Test
- public void searches() throws Exception {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- final SearchResponse response = new SearchResponse(
- new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 0),
- "scrollId", 1, 1, 0, 100L, null, SearchResponse.Clusters.EMPTY
- );
- when(elasticsearchClient.search(any(SearchRequest.class), any())).thenReturn(response);
- assertNotNull(client.search(INDEX));
- verify(elasticsearchClient).search(any(SearchRequest.class), any());
- }
-
- @Test
- public void closes() throws IOException {
- final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient);
- client.close();
- verify(elasticsearchClient).close();
- }
-}
diff --git a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapperTest.java b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapperTest.java
new file mode 100644
index 0000000..e1c4dd7
--- /dev/null
+++ b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapperTest.java
@@ -0,0 +1,411 @@
+/*
+ * Copyright 2020 Aiven Oy
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.elasticsearch.clientwrapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import io.aiven.connect.elasticsearch.ElasticsearchClient;
+import io.aiven.connect.elasticsearch.IndexableRecord;
+import io.aiven.connect.elasticsearch.Key;
+import io.aiven.connect.elasticsearch.Mapping;
+import io.aiven.connect.elasticsearch.bulk.BulkRequest;
+
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
+import co.elastic.clients.elasticsearch._types.ElasticsearchVersionInfo;
+import co.elastic.clients.elasticsearch._types.ErrorCause;
+import co.elastic.clients.elasticsearch._types.ErrorResponse;
+import co.elastic.clients.elasticsearch._types.ShardStatistics;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
+import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.InfoResponse;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
+import co.elastic.clients.elasticsearch.core.bulk.OperationType;
+import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
+import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.elasticsearch.indices.GetMappingRequest;
+import co.elastic.clients.elasticsearch.indices.GetMappingResponse;
+import co.elastic.clients.elasticsearch.indices.PutMappingRequest;
+import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.endpoints.BooleanResponse;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchClientWrapperTest {
+
+ private static final String INDEX = "index";
+ private static final String KEY = "key";
+ private static final String TYPE = "type";
+
+ private ElasticsearchTransport elasticsearchTransport;
+ private co.elastic.clients.elasticsearch.ElasticsearchClient elasticsearchClient;
+ private ElasticsearchClientWrapper elasticsearchClientWrapper;
+
+ @Before
+ public void setUp() throws Exception {
+ elasticsearchTransport = mock(ElasticsearchTransport.class);
+ elasticsearchClient = mock(co.elastic.clients.elasticsearch.ElasticsearchClient.class);
+ when(elasticsearchClient.info()).thenReturn(
+ new InfoResponse.Builder()
+ .version(new ElasticsearchVersionInfo.Builder()
+ .number("1.0")
+ .buildFlavor("buildFlavor")
+ .buildType("buildType")
+ .buildHash("buildHash")
+ .buildDate("buildDate")
+ .buildSnapshot(false)
+ .luceneVersion("luceneVersion")
+ .minimumWireCompatibilityVersion("minWireCompVersion")
+ .minimumIndexCompatibilityVersion("minIndexCompVersion")
+ .build()
+ )
+ .clusterName("clusterName")
+ .clusterUuid(UUID.randomUUID().toString())
+ .tagline("tagLine")
+ .name("name")
+ .build()
+ );
+ // InfoResponse required for resolving server version when initializing the client
+ elasticsearchClientWrapper = new ElasticsearchClientWrapper(elasticsearchTransport, elasticsearchClient);
+ }
+
+ @Test
+ public void getsVersion() {
+ assertThat(elasticsearchClientWrapper.getVersion(), is(equalTo(ElasticsearchClient.Version.ES_V1)));
+ }
+
+ @Test
+ public void createsIndices() throws Exception {
+ final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class);
+ when(elasticsearchClient.indices()).thenReturn(indicesClient);
+ when(indicesClient.exists(any(ExistsRequest.class))).thenReturn(new BooleanResponse(false));
+ when(indicesClient.create(any(CreateIndexRequest.class))).thenThrow(new IOException("failure"));
+
+ when(indicesClient.exists(any(ExistsRequest.class))).thenReturn(new BooleanResponse(false));
+ when(indicesClient.create(argThat(isCreateIndexForTestIndex()))).thenReturn(
+ new CreateIndexResponse.Builder()
+ .index(INDEX)
+ .acknowledged(true)
+ .shardsAcknowledged(true)
+ .build()
+ );
+
+ final Set indices = new HashSet<>();
+ indices.add(INDEX);
+ elasticsearchClientWrapper.createIndices(indices);
+ final InOrder inOrder = inOrder(elasticsearchClient, indicesClient);
+ inOrder.verify(elasticsearchClient).info();
+ inOrder.verify(indicesClient).exists(any(ExistsRequest.class));
+ inOrder.verify(indicesClient).create(argThat(isCreateIndexForTestIndex()));
+ }
+
+ private ArgumentMatcher isCreateIndexForTestIndex() {
+ return new ArgumentMatcher() {
+ @Override
+ public boolean matches(final CreateIndexRequest createIndexRequest) {
+ // check the URI as the equals method on CreateIndex doesn't work
+ return createIndexRequest.index().equals(INDEX);
+ }
+ };
+ }
+
+ @Test
+ public void createIndicesAndFails() throws Exception {
+ final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class);
+ when(elasticsearchClient.indices()).thenReturn(indicesClient);
+ when(indicesClient.exists(any(ExistsRequest.class))).thenReturn(new BooleanResponse(false));
+ when(indicesClient.create(any(CreateIndexRequest.class))).thenThrow(new IOException("failure"));
+
+ final Set indices = new HashSet<>();
+ indices.add("test-index");
+ assertThrows("Could not create index 'test-index'", ConnectException.class, () -> {
+ elasticsearchClientWrapper.createIndices(indices);
+ });
+ }
+
+ @Test
+ public void createsMapping() throws Exception {
+ final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class);
+ when(elasticsearchClient.indices()).thenReturn(indicesClient);
+ final ObjectNode obj = JsonNodeFactory.instance.objectNode();
+ obj.set(TYPE, Mapping.inferMapping(elasticsearchClientWrapper.getVersion(), Schema.STRING_SCHEMA));
+ elasticsearchClientWrapper.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA);
+ verify(indicesClient).putMapping(any(PutMappingRequest.class));
+ }
+
+ @Test(expected = ConnectException.class)
+ public void createsMappingAndFails() throws Exception {
+ final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class);
+ when(elasticsearchClient.indices()).thenReturn(indicesClient);
+ when(indicesClient.putMapping(any(PutMappingRequest.class)))
+ .thenThrow(
+ new ElasticsearchException("endpointId", new ErrorResponse.Builder()
+ .error(new ErrorCause.Builder()
+ .reason("failure")
+ .build())
+ .status(500)
+ .build())
+ );
+ elasticsearchClientWrapper.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA);
+ }
+
+ @Test
+ public void getsMapping() throws Exception {
+ final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class);
+ when(elasticsearchClient.indices()).thenReturn(indicesClient);
+ final IndexMappingRecord indexMappingRecord = new IndexMappingRecord.Builder()
+ .mappings(new TypeMapping.Builder()
+ .properties(TYPE, builder -> {
+ return builder.text(new TextProperty.Builder().build());
+ }).build())
+ .build();
+ final Map indexMappingRecordMap = new HashMap<>();
+ indexMappingRecordMap.put(INDEX, indexMappingRecord);
+ when(indicesClient.getMapping(any(GetMappingRequest.class))).thenReturn(
+ new GetMappingResponse.Builder()
+ .result(indexMappingRecordMap)
+ .build()
+ );
+
+ assertEquals(
+ new Property.Builder().text(new TextProperty.Builder().build()).build()._kind().jsonValue(),
+ elasticsearchClientWrapper.getMapping(INDEX, TYPE)._kind().jsonValue()
+ );
+ }
+
+ @Test
+ public void executesBulk() throws Exception {
+ final List items = new ArrayList<>();
+ items.add(new BulkResponseItem.Builder()
+ .operationType(OperationType.Create)
+ .index(INDEX)
+ .status(200)
+ .build()
+ );
+ when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn(
+ new BulkResponse.Builder()
+ .items(items)
+ .errors(false)
+ .took(100)
+ .build()
+ );
+
+ final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
+ final List records = new ArrayList<>();
+ records.add(record);
+ final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records);
+ assertThat(elasticsearchClientWrapper.executeBulk(request).isSucceeded(), is(equalTo(true)));
+ }
+
+ @Test
+ public void executesBulkAndFails() throws Exception {
+ final List items = new ArrayList<>();
+ items.add(new BulkResponseItem.Builder()
+ .operationType(OperationType.Create)
+ .index(INDEX)
+ .status(200)
+ .error(builder -> builder
+ .type("failure")
+ .reason("failure")
+ )
+ .build()
+ );
+ when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn(
+ new BulkResponse.Builder()
+ .items(items)
+ .errors(true)
+ .took(100)
+ .build()
+ );
+
+ final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), null, 0L);
+ final List records = new ArrayList<>();
+ records.add(record);
+ final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records);
+ final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse =
+ elasticsearchClientWrapper.executeBulk(request);
+ assertThat(bulkResponse.isSucceeded(), is(equalTo(false)));
+ assertThat(bulkResponse.isRetriable(), is(equalTo(true)));
+ assertEquals("[failure]", bulkResponse.getErrorInfo());
+ }
+
+ @Test
+ public void executesBulkAndFailsWithParseError() throws Exception {
+ final List items = new ArrayList<>();
+ items.add(new BulkResponseItem.Builder()
+ .operationType(OperationType.Create)
+ .index(INDEX)
+ .status(200)
+ .error(builder -> builder
+ .type("mapper_parse_exception")
+ .reason("[type=mapper_parse_exception, reason=[key]: Mapper parse error]")
+ )
+ .build()
+ );
+ when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn(
+ new BulkResponse.Builder()
+ .items(items)
+ .errors(true)
+ .took(100)
+ .build()
+ );
+
+ final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
+ final List records = new ArrayList<>();
+ records.add(record);
+ final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records);
+ final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse =
+ elasticsearchClientWrapper.executeBulk(request);
+ assertThat(bulkResponse.isSucceeded(), is(equalTo(false)));
+ assertThat(bulkResponse.isRetriable(), is(equalTo(false)));
+ assertEquals(
+ "[[type=mapper_parse_exception, reason=[key]: Mapper parse error]]",
+ bulkResponse.getErrorInfo()
+ );
+ }
+
+ @Test
+ public void executesBulkAndFailsWithSomeOtherError() throws Exception {
+ final List items = new ArrayList<>();
+ items.add(new BulkResponseItem.Builder()
+ .operationType(OperationType.Create)
+ .index(INDEX)
+ .status(200)
+ .error(builder -> builder
+ .type("random_error_type_string")
+ .reason("[type=random_type_string, reason=[key]: Unknown error]")
+ )
+ .build()
+ );
+ when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn(
+ new BulkResponse.Builder()
+ .items(items)
+ .errors(true)
+ .took(100)
+ .build()
+ );
+
+ final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
+ final List records = new ArrayList<>();
+ records.add(record);
+ final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records);
+ final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse =
+ elasticsearchClientWrapper.executeBulk(request);
+ assertThat(bulkResponse.isSucceeded(), is(equalTo(false)));
+ assertThat(bulkResponse.isRetriable(), is(equalTo(true)));
+ assertEquals(
+ "[[type=random_type_string, reason=[key]: Unknown error]]",
+ bulkResponse.getErrorInfo()
+ );
+ }
+
+ @Test
+ public void executesBulkAndSucceedsBecauseOnlyVersionConflicts() throws Exception {
+ final List items = new ArrayList<>();
+ items.add(new BulkResponseItem.Builder()
+ .operationType(OperationType.Create)
+ .index(INDEX)
+ .status(200)
+ .error(builder -> builder
+ .type("version_conflict_engine_exception")
+ .reason("[type=version_conflict_engine_exception, reason=[key]: "
+ + "version conflict, current version [1] is higher or equal to the one provided [0]]")
+ )
+ .build()
+ );
+ when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn(
+ new BulkResponse.Builder()
+ .items(items)
+ .errors(true)
+ .took(100)
+ .build()
+ );
+
+
+ final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L);
+ final List records = new ArrayList<>();
+ records.add(record);
+ final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records);
+ final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse =
+ elasticsearchClientWrapper.executeBulk(request);
+ assertThat(bulkResponse.isSucceeded(), is(equalTo(true)));
+ }
+
+ @Test
+ public void searches() throws Exception {
+ final SearchResponse response = new SearchResponse.Builder()
+ .took(100)
+ .timedOut(false)
+ .hits(
+ new HitsMetadata.Builder()
+ .hits(new ArrayList<>())
+ .build()
+ )
+ .shards(
+ new ShardStatistics.Builder()
+ .failed(1)
+ .successful(1)
+ .total(2)
+ .build()
+ ).build();
+ when(elasticsearchClient.search(any(SearchRequest.class), any())).thenReturn(response);
+ assertNotNull(elasticsearchClientWrapper.search(INDEX));
+ verify(elasticsearchClient).search(any(SearchRequest.class), any());
+ }
+
+ @Test
+ public void closes() throws IOException {
+ elasticsearchClientWrapper.close();
+ verify(elasticsearchTransport).close();
+ }
+}