diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml index 0d932c1..3627b82 100644 --- a/.github/workflows/create_release.yml +++ b/.github/workflows/create_release.yml @@ -13,12 +13,13 @@ jobs: runs-on: ubuntu-latest steps: - name: Setup Java SDK - uses: actions/setup-java@v1.4.3 + uses: actions/setup-java@4 with: + distribution: "temurin" java-version: 11 - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: ref: ${{ github.event.inputs.commit_hash }} diff --git a/.github/workflows/main_push_workflow.yml b/.github/workflows/main_push_workflow.yml index 64cb7b0..435bc69 100644 --- a/.github/workflows/main_push_workflow.yml +++ b/.github/workflows/main_push_workflow.yml @@ -9,12 +9,13 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up JDK 11 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: + distribution: "temurin" java-version: 11 - name: Build with Gradle - run: ./gradlew build + run: ./gradlew check diff --git a/.github/workflows/pull_request_workflow.yml b/.github/workflows/pull_request_workflow.yml index 5b1fc38..8c18d5e 100644 --- a/.github/workflows/pull_request_workflow.yml +++ b/.github/workflows/pull_request_workflow.yml @@ -10,10 +10,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up JDK 11 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: + distribution: "temurin" java-version: 11 - name: Build with Gradle - run: ./gradlew build + run: ./gradlew check --info diff --git a/.github/workflows/release_pr_workflow.yml b/.github/workflows/release_pr_workflow.yml index 49958b6..cba5753 100644 --- a/.github/workflows/release_pr_workflow.yml +++ b/.github/workflows/release_pr_workflow.yml @@ -30,7 +30,7 @@ jobs: fi - name: Checkout main - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: ref: main fetch-depth: 0 diff --git a/build.gradle b/build.gradle index e90d6fb..daf7b66 100644 --- a/build.gradle +++ b/build.gradle @@ -123,11 +123,14 @@ publishing { } ext { + guavaVersion = "11.0.2" kafkaVersion = "2.2.0" - slf4jVersion = "1.7.36" - elasticSearchVersion = "2.4.1" - luceneVersion = "5.5.2" - jestVersion = "6.3.1" + slf4jVersion = "2.0.12" + log4jVersion = "2.23.0" + elasticSearchVersion = "7.4.0" + elasticClientVersion = "7.17.0" + testContainersElasticVersion = "1.19.6" + carrotsearchVersion = "2.8.1" } processResources { @@ -141,7 +144,11 @@ dependencies { compileOnly "org.apache.kafka:connect-json:$kafkaVersion" implementation "org.slf4j:slf4j-api:$slf4jVersion" - implementation "io.searchbox:jest:$jestVersion" + implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:$elasticClientVersion" + implementation("com.google.guava:guava:$guavaVersion") + + implementation "org.apache.logging.log4j:log4j-api:$log4jVersion" + implementation "org.apache.logging.log4j:log4j-core:$log4jVersion" testImplementation("junit:junit:4.13.2") { exclude group: 'org.hamcrest', module: 'hamcrest-core' @@ -152,13 +159,8 @@ dependencies { testImplementation "org.apache.kafka:connect-api:$kafkaVersion" testImplementation "org.apache.kafka:connect-json:$kafkaVersion" - testImplementation "org.apache.lucene:lucene-test-framework:$luceneVersion" - testImplementation "com.fasterxml.jackson.core:jackson-core:2.15.2" - testImplementation "com.fasterxml.jackson.core:jackson-databind:2.15.2" - testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.15.2" - testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion:tests" - testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion" - testImplementation "org.apache.lucene:lucene-expressions:$luceneVersion" + testImplementation "org.testcontainers:elasticsearch:$testContainersElasticVersion" + implementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion") testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" } diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index b9d8886..6c63d13 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -25,7 +25,7 @@ /> diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java index 0cfa06b..cfe1152 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java +++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java @@ -26,12 +26,13 @@ import io.aiven.connect.elasticsearch.bulk.BulkRequest; import io.aiven.connect.elasticsearch.bulk.BulkResponse; -import com.google.gson.JsonObject; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.MappingMetadata; public interface ElasticsearchClient extends AutoCloseable { enum Version { - ES_V1, ES_V2, ES_V5, ES_V6, ES_V7 + ES_V1, ES_V2, ES_V5, ES_V6, ES_V7, ES_V8 } /** @@ -65,7 +66,7 @@ enum Version { * @param type the type * @throws IOException if the client cannot execute the request */ - JsonObject getMapping(String index, String type) throws IOException; + MappingMetadata getMapping(String index, String type) throws IOException; /** * Creates a bulk request for the list of {@link IndexableRecord} records. @@ -87,13 +88,18 @@ enum Version { /** * Executes a search. * - * @param query the search query * @param index the index to search - * @param type the type to search * @return the search result * @throws IOException if the client cannot execute the request */ - JsonObject search(String query, String index, String type) throws IOException; + SearchResponse search(String index) throws IOException; + + /** + * Executes a search. + * + * @param index the index to refresh + */ + void refresh(String index) throws IOException; /** * Shuts down the client. diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java index 659cc73..2b7c811 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java @@ -34,7 +34,7 @@ import org.apache.kafka.connect.sink.SinkTask; import io.aiven.connect.elasticsearch.bulk.BulkProcessor; -import io.aiven.connect.elasticsearch.jest.JestElasticsearchClient; +import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +121,7 @@ public void start(final Map props, final ElasticsearchClient cli if (client != null) { this.client = client; } else { - this.client = new JestElasticsearchClient(props); + this.client = new AivenElasticsearchClientWrapper(config); } final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client) @@ -178,6 +178,10 @@ public void close(final Collection partitions) { log.debug("Closing the task for topic partitions: {}", partitions); } + public void refresh(final String index) throws IOException { + client.refresh(index); + } + @Override public void stop() throws ConnectException { log.info("Stopping ElasticsearchSinkTask."); diff --git a/src/main/java/io/aiven/connect/elasticsearch/Mapping.java b/src/main/java/io/aiven/connect/elasticsearch/Mapping.java index 21ab9cc..c07ecba 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/Mapping.java +++ b/src/main/java/io/aiven/connect/elasticsearch/Mapping.java @@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.gson.JsonObject; +import org.elasticsearch.cluster.metadata.MappingMetadata; public class Mapping { @@ -57,7 +57,7 @@ public static void createMapping( /** * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist. */ - public static JsonObject getMapping(final ElasticsearchClient client, final String index, final String type) + public static MappingMetadata getMapping(final ElasticsearchClient client, final String index, final String type) throws IOException { return client.getMapping(index, type); } @@ -67,7 +67,7 @@ public static JsonObject getMapping(final ElasticsearchClient client, final Stri * * @param schema The schema used to infer mapping. */ - public static JsonNode inferMapping(final ElasticsearchClient client, final Schema schema) { + public static JsonNode inferMapping(final ElasticsearchClient.Version version, final Schema schema) { if (schema == null) { throw new DataException("Cannot infer mapping without schema."); } @@ -83,26 +83,26 @@ public static JsonNode inferMapping(final ElasticsearchClient client, final Sche final ObjectNode fields = JsonNodeFactory.instance.objectNode(); switch (schemaType) { case ARRAY: - return inferMapping(client, schema.valueSchema()); + return inferMapping(version, schema.valueSchema()); case MAP: properties.set("properties", fields); - fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(client, schema.keySchema())); - fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(client, schema.valueSchema())); + fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(version, schema.keySchema())); + fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(version, schema.valueSchema())); return properties; case STRUCT: properties.set("properties", fields); for (final Field field : schema.fields()) { - fields.set(field.name(), inferMapping(client, field.schema())); + fields.set(field.name(), inferMapping(version, field.schema())); } return properties; default: - final String esType = getElasticsearchType(client, schemaType); + final String esType = getElasticsearchType(version, schemaType); return inferPrimitive(esType, schema.defaultValue()); } } // visible for testing - protected static String getElasticsearchType(final ElasticsearchClient client, + protected static String getElasticsearchType(final ElasticsearchClient.Version version, final Schema.Type schemaType) { switch (schemaType) { case BOOLEAN: @@ -120,7 +120,7 @@ protected static String getElasticsearchType(final ElasticsearchClient client, case FLOAT64: return ElasticsearchSinkConnectorConstants.DOUBLE_TYPE; case STRING: - switch (client.getVersion()) { + switch (version) { case ES_V1: case ES_V2: return ElasticsearchSinkConnectorConstants.STRING_TYPE; diff --git a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java new file mode 100644 index 0000000..f7e7b42 --- /dev/null +++ b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java @@ -0,0 +1,363 @@ +/* + * Copyright 2020 Aiven Oy + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.elasticsearch.clientwrapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.connect.elasticsearch.ElasticsearchClient; +import io.aiven.connect.elasticsearch.ElasticsearchSinkConnectorConfig; +import io.aiven.connect.elasticsearch.IndexableRecord; +import io.aiven.connect.elasticsearch.Key; +import io.aiven.connect.elasticsearch.Mapping; +import io.aiven.connect.elasticsearch.bulk.BulkRequest; +import io.aiven.connect.elasticsearch.bulk.BulkResponse; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponseInterceptor; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestHighLevelClientBuilder; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetMappingsRequest; +import org.elasticsearch.client.indices.GetMappingsResponse; +import org.elasticsearch.client.indices.PutMappingRequest; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AivenElasticsearchClientWrapper implements ElasticsearchClient { + + // visible for testing + protected static final String MAPPER_PARSE_EXCEPTION + = "mapper_parse_exception"; + protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION + = "version_conflict_engine_exception"; + + private static final Logger LOG = LoggerFactory.getLogger(AivenElasticsearchClientWrapper.class); + + @SuppressWarnings("deprecation") + private final RestHighLevelClient elasticClient; + private final Version version; + + // visible for testing + @SuppressWarnings("deprecation") + + public AivenElasticsearchClientWrapper(final RestHighLevelClient client) { + try { + this.elasticClient = client; + this.version = getServerVersion(); + } catch (final IOException e) { + throw new ConnectException( + "Couldn't start ElasticsearchSinkTask due to connection error:", + e + ); + } + } + + // visible for testing + public AivenElasticsearchClientWrapper(final String address) { + try { + final Map props = new HashMap<>(); + props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, address); + props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect"); + this.elasticClient = getElasticsearchClient(new ElasticsearchSinkConnectorConfig(props)); + this.version = getServerVersion(); + } catch (final IOException e) { + throw new ConnectException( + "Couldn't start ElasticsearchSinkTask due to connection error:", + e + ); + } catch (final ConfigException e) { + throw new ConnectException( + "Couldn't start ElasticsearchSinkTask due to configuration error:", + e + ); + } + } + + // visible for testing + public AivenElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig config) { + try { + this.elasticClient = getElasticsearchClient(config); + this.version = getServerVersion(); + } catch (final IOException e) { + throw new ConnectException( + "Couldn't start ElasticsearchSinkTask due to connection error:", + e + ); + } catch (final ConfigException e) { + throw new ConnectException( + "Couldn't start ElasticsearchSinkTask due to configuration error:", + e + ); + } + } + + private Version getServerVersion() throws IOException { + final String esVersion = this.elasticClient.info(RequestOptions.DEFAULT).getVersion().getNumber(); + return matchVersionString(esVersion); + } + + private Version matchVersionString(final String esVersion) { + // Default to newest version for forward compatibility + final Version defaultVersion = Version.ES_V8; + if (esVersion == null) { + LOG.warn("Couldn't get Elasticsearch version, version is null"); + return defaultVersion; + } else if (esVersion.startsWith("1.")) { + return Version.ES_V1; + } else if (esVersion.startsWith("2.")) { + return Version.ES_V2; + } else if (esVersion.startsWith("5.")) { + return Version.ES_V5; + } else if (esVersion.startsWith("6.")) { + return Version.ES_V6; + } else if (esVersion.startsWith("7.")) { + return Version.ES_V7; + } else if (esVersion.startsWith("8.")) { + return Version.ES_V8; + } + return defaultVersion; + } + + private boolean es8compat(final Version version) { + return Objects.requireNonNull(version) == Version.ES_V8; + } + + @SuppressWarnings("deprecation") + private RestHighLevelClient getElasticsearchClient(final ElasticsearchSinkConnectorConfig config) { + final HttpHost[] httpHosts = + config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG).stream().map(HttpHost::create) + .toArray(HttpHost[]::new); + final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); + final Optional username = Optional.ofNullable( + config.getString(ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG) + ); + final Optional password = Optional.ofNullable( + config.getPassword(ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG) + ); + final CredentialsProvider credentialsProvider; + if (username.isPresent() && password.isPresent()) { + credentialsProvider = new BasicCredentialsProvider(); + final AuthScope scope = new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM); + final UsernamePasswordCredentials usernamePasswordCredentials = + new UsernamePasswordCredentials(username.get(), password.get().value()); + credentialsProvider.setCredentials(scope, usernamePasswordCredentials); + } else { + credentialsProvider = null; + } + restClientBuilder.setHttpClientConfigCallback(httpClientConfigCallback -> { + httpClientConfigCallback.setDefaultCredentialsProvider(credentialsProvider); + return httpClientConfigCallback.addInterceptorLast((HttpResponseInterceptor) + (response, context) -> + response.addHeader("X-Elastic-Product", "Elasticsearch")); + }); + + final int connTimeout = config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG); + final int readTimeout = config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG); + restClientBuilder.setRequestConfigCallback(requestConfigCallback -> { + requestConfigCallback.setConnectTimeout(connTimeout); + requestConfigCallback.setSocketTimeout(readTimeout); + return requestConfigCallback; + }); + + final RestClient restClient = restClientBuilder.build(); + final Version version = matchVersionString(restClient.getNodes().get(0).getVersion()); + return new RestHighLevelClientBuilder(restClient) + .setApiCompatibilityMode(es8compat(version)) + .build(); + } + + public Version getVersion() { + return version; + } + + private boolean indexExists(final String index) { + final GetIndexRequest getIndexRequest = new GetIndexRequest(index); + try { + return elasticClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT); + } catch (final IOException e) { + throw new ConnectException(e); + } + } + + public void createIndices(final Set indices) { + for (final String index : indices) { + if (!indexExists(index)) { + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(index); + try { + elasticClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); + } catch (final IOException e) { + throw new ConnectException("Could not create index '" + index + "'", e); + } + } + } + } + + public void createMapping(final String index, final String type, final Schema schema) throws IOException { + final ObjectNode obj = JsonNodeFactory.instance.objectNode(); + obj.set(type, Mapping.inferMapping(getVersion(), schema)); + final JsonNode part = Mapping.inferMapping(getVersion(), schema); + final PutMappingRequest request = new PutMappingRequest(index) + .source(part.toString(), XContentType.JSON); + try { + this.elasticClient.indices().putMapping(request, RequestOptions.DEFAULT); + } catch (final ElasticsearchException exception) { + throw new ConnectException( + "Cannot create mapping " + schema + " -- " + exception.getDetailedMessage() + ); + } + } + + /** + * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist. + */ + @Override + public MappingMetadata getMapping(final String index, final String type) throws IOException { + final GetMappingsRequest request = new GetMappingsRequest() + .indices(index); + final GetMappingsResponse response = this.elasticClient.indices().getMapping(request, RequestOptions.DEFAULT); + if (response.mappings().isEmpty()) { + return null; + } + return response.mappings().get(index); + } + + public BulkRequest createBulkRequest(final List batch) { + final org.elasticsearch.action.bulk.BulkRequest bulkRequest = new org.elasticsearch.action.bulk.BulkRequest(); + for (final IndexableRecord record : batch) { + bulkRequest.add(toBulkableAction(record)); + } + return new BulkRequestImpl(bulkRequest); + } + + // visible for testing + protected DocWriteRequest toBulkableAction(final IndexableRecord record) { + // If payload is null, the record was a tombstone and we should delete from the index. + return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record); + } + + private DeleteRequest toDeleteRequest(final IndexableRecord record) { + // TODO: Should version information be set here? + return new DeleteRequest(record.key.index).id(record.key.id); + } + + private IndexRequest toIndexRequest(final IndexableRecord record) { + final IndexRequest indexRequest = new IndexRequest(record.key.index) + .id(record.key.id) + .source(record.payload, XContentType.JSON); + if (record.version != null) { + indexRequest + .versionType(VersionType.EXTERNAL) + .version(record.version); + } + return indexRequest; + } + + public BulkResponse executeBulk(final BulkRequest bulk) throws IOException { + final BulkRequestImpl bulkRequest = (BulkRequestImpl) bulk; + final org.elasticsearch.action.bulk.BulkResponse + response = elasticClient.bulk(bulkRequest.getBulkRequest(), RequestOptions.DEFAULT); + + if (!response.hasFailures()) { + return BulkResponse.success(); + } + + boolean retriable = true; + + final List versionConflicts = new ArrayList<>(); + final List errors = new ArrayList<>(); + + for (final BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + final BulkItemResponse.Failure failure = item.getFailure(); + final String errorType = Optional.ofNullable(failure.getCause().getMessage()).orElse(""); + if (errorType.contains("version_conflict_engine_exception")) { + versionConflicts.add(new Key(item.getIndex(), item.getType(), item.getId())); + } else if (errorType.contains("mapper_parse_exception")) { + retriable = false; + errors.add(item.getFailureMessage()); + } else { + errors.add(item.getFailureMessage()); + } + } + } + + if (!versionConflicts.isEmpty()) { + LOG.debug("Ignoring version conflicts for items: {}", versionConflicts); + if (errors.isEmpty()) { + // The only errors were version conflicts + return BulkResponse.success(); + } + } + + final String errorInfo = errors.isEmpty() ? response.buildFailureMessage() : errors.toString(); + + return BulkResponse.failure(retriable, errorInfo); + } + + @Override + public SearchResponse search(final String index) throws IOException { + final SearchRequest searchRequest = new SearchRequest(); + if (index != null) { + searchRequest.indices(index); + } + return elasticClient.search(searchRequest, RequestOptions.DEFAULT); + } + + public void refresh(final String index) throws IOException { + final RefreshRequest request = new RefreshRequest(index); + elasticClient.indices().refresh(request, RequestOptions.DEFAULT); + } + + public void close() throws IOException { + elasticClient.close(); + } +} diff --git a/src/main/java/io/aiven/connect/elasticsearch/jest/JestBulkRequest.java b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java similarity index 62% rename from src/main/java/io/aiven/connect/elasticsearch/jest/JestBulkRequest.java rename to src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java index 8c7d810..d0f1afc 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/jest/JestBulkRequest.java +++ b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java @@ -15,21 +15,18 @@ * limitations under the License. */ -package io.aiven.connect.elasticsearch.jest; +package io.aiven.connect.elasticsearch.clientwrapper; import io.aiven.connect.elasticsearch.bulk.BulkRequest; -import io.searchbox.core.Bulk; +public class BulkRequestImpl implements BulkRequest { + private final org.elasticsearch.action.bulk.BulkRequest bulkRequest; -public class JestBulkRequest implements BulkRequest { - - private final Bulk bulk; - - public JestBulkRequest(final Bulk bulk) { - this.bulk = bulk; + public BulkRequestImpl(final org.elasticsearch.action.bulk.BulkRequest bulkRequest) { + this.bulkRequest = bulkRequest; } - public Bulk getBulk() { - return bulk; + public org.elasticsearch.action.bulk.BulkRequest getBulkRequest() { + return bulkRequest; } } diff --git a/src/main/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClient.java deleted file mode 100644 index 0e9f416..0000000 --- a/src/main/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClient.java +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Copyright 2020 Aiven Oy - * Copyright 2018 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.connect.elasticsearch.jest; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; - -import io.aiven.connect.elasticsearch.ElasticsearchClient; -import io.aiven.connect.elasticsearch.ElasticsearchSinkConnectorConfig; -import io.aiven.connect.elasticsearch.IndexableRecord; -import io.aiven.connect.elasticsearch.Key; -import io.aiven.connect.elasticsearch.Mapping; -import io.aiven.connect.elasticsearch.bulk.BulkRequest; -import io.aiven.connect.elasticsearch.bulk.BulkResponse; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.gson.JsonObject; -import io.searchbox.action.Action; -import io.searchbox.action.BulkableAction; -import io.searchbox.client.JestClient; -import io.searchbox.client.JestClientFactory; -import io.searchbox.client.JestResult; -import io.searchbox.client.config.HttpClientConfig; -import io.searchbox.cluster.NodesInfo; -import io.searchbox.core.Bulk; -import io.searchbox.core.BulkResult; -import io.searchbox.core.Delete; -import io.searchbox.core.Index; -import io.searchbox.core.Search; -import io.searchbox.core.SearchResult; -import io.searchbox.indices.CreateIndex; -import io.searchbox.indices.IndicesExists; -import io.searchbox.indices.mapping.GetMapping; -import io.searchbox.indices.mapping.PutMapping; -import org.apache.http.HttpHost; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JestElasticsearchClient implements ElasticsearchClient { - - // visible for testing - protected static final String MAPPER_PARSE_EXCEPTION - = "mapper_parse_exception"; - protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION - = "version_conflict_engine_exception"; - - private static final String INCLUDE_TYPE_NAME_PARAM = "include_type_name"; - - private static final Logger LOG = LoggerFactory.getLogger(JestElasticsearchClient.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private final JestClient client; - private final Version version; - - // visible for testing - public JestElasticsearchClient(final JestClient client) { - try { - this.client = client; - this.version = getServerVersion(); - } catch (final IOException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to connection error:", - e - ); - } - } - - // visible for testing - public JestElasticsearchClient(final String address) { - try { - final JestClientFactory factory = new JestClientFactory(); - factory.setHttpClientConfig(new HttpClientConfig.Builder(address) - .multiThreaded(true) - .build() - ); - this.client = factory.getObject(); - this.version = getServerVersion(); - } catch (final IOException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to connection error:", - e - ); - } catch (final ConfigException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to configuration error:", - e - ); - } - } - - public JestElasticsearchClient(final Map props) { - this(props, new JestClientFactory()); - } - - // visible for testing - protected JestElasticsearchClient(final Map props, final JestClientFactory factory) { - try { - final ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props); - final int connTimeout = config.getInt( - ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG); - final int readTimeout = config.getInt( - ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG); - - final String username = config.getString( - ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG); - final Password password = config.getPassword( - ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG); - - final List address = - config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG); - final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(address) - .connTimeout(connTimeout) - .readTimeout(readTimeout) - .multiThreaded(true); - if (username != null && password != null) { - builder.defaultCredentials(username, password.value()) - .preemptiveAuthTargetHosts(address.stream() - .map(addr -> HttpHost.create(addr)).collect(Collectors.toSet())); - } - final HttpClientConfig httpClientConfig = builder.build(); - factory.setHttpClientConfig(httpClientConfig); - this.client = factory.getObject(); - this.version = getServerVersion(); - } catch (final IOException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to connection error:", - e - ); - } catch (final ConfigException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to configuration error:", - e - ); - } - } - - /* - * This method uses the NodesInfo request to get the server version, which is expected to work - * with all versions of Elasticsearch. - */ - private Version getServerVersion() throws IOException { - // Default to newest version for forward compatibility - final Version defaultVersion = Version.ES_V6; - - final NodesInfo info = new NodesInfo.Builder().addCleanApiParameter("version").build(); - final JsonObject result = client.execute(info).getJsonObject(); - if (result == null) { - LOG.warn("Couldn't get Elasticsearch version, result is null"); - return defaultVersion; - } - - checkForError(result); - - final JsonObject nodesRoot = result.get("nodes").getAsJsonObject(); - if (nodesRoot == null || nodesRoot.entrySet().size() == 0) { - LOG.warn("Couldn't get Elasticsearch version, nodesRoot is null or empty"); - return defaultVersion; - } - - final JsonObject nodeRoot = nodesRoot.entrySet().iterator().next().getValue().getAsJsonObject(); - if (nodeRoot == null) { - LOG.warn("Couldn't get Elasticsearch version, nodeRoot is null"); - return defaultVersion; - } - - final String esVersion = nodeRoot.get("version").getAsString(); - if (esVersion == null) { - LOG.warn("Couldn't get Elasticsearch version, version is null"); - return defaultVersion; - } else if (esVersion.startsWith("1.")) { - return Version.ES_V1; - } else if (esVersion.startsWith("2.")) { - return Version.ES_V2; - } else if (esVersion.startsWith("5.")) { - return Version.ES_V5; - } else if (esVersion.startsWith("6.")) { - return Version.ES_V6; - } else if (esVersion.startsWith("7.")) { - return Version.ES_V7; - } - return defaultVersion; - } - - private void checkForError(final JsonObject result) { - if (result.has("error") && result.get("error").isJsonObject()) { - final JsonObject errorObject = result.get("error").getAsJsonObject(); - final String errorType = errorObject.has("type") ? errorObject.get("type").getAsString() : ""; - final String errorReason = errorObject.has("reason") ? errorObject.get("reason").getAsString() : ""; - throw new ConnectException("Couldn't connect to Elasticsearch, error: " - + errorType + ", reason: " + errorReason); - } - } - - public Version getVersion() { - return version; - } - - private boolean indexExists(final String index) { - final Action action = new IndicesExists.Builder(index).build(); - try { - final JestResult result = client.execute(action); - return result.isSucceeded(); - } catch (final IOException e) { - throw new ConnectException(e); - } - } - - public void createIndices(final Set indices) { - for (final String index : indices) { - if (!indexExists(index)) { - final CreateIndex.Builder builder = new CreateIndex.Builder(index); - if (version.equals(Version.ES_V7)) { - builder.setParameter(INCLUDE_TYPE_NAME_PARAM, true); - } - final CreateIndex createIndex = builder.build(); - try { - final JestResult result = client.execute(createIndex); - if (!result.isSucceeded()) { - // Check if index was created by another client - if (!indexExists(index)) { - final String msg = result.getErrorMessage() != null ? ": " + result.getErrorMessage() : ""; - throw new ConnectException("Could not create index '" + index + "'" + msg); - } - } - } catch (final IOException e) { - throw new ConnectException(e); - } - } - } - } - - public void createMapping(final String index, final String type, final Schema schema) throws IOException { - final ObjectNode obj = JsonNodeFactory.instance.objectNode(); - obj.set(type, Mapping.inferMapping(this, schema)); - final PutMapping.Builder builder = new PutMapping.Builder(index, type, obj.toString()); - if (version.equals(Version.ES_V7)) { - builder.setParameter(INCLUDE_TYPE_NAME_PARAM, true); - } - final PutMapping putMapping = builder.build(); - final JestResult result = client.execute(putMapping); - if (!result.isSucceeded()) { - throw new ConnectException( - "Cannot create mapping " + obj + " -- " + result.getErrorMessage() - ); - } - } - - /** - * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist. - */ - public JsonObject getMapping(final String index, final String type) throws IOException { - final GetMapping.Builder builder = new GetMapping.Builder(); - if (version.equals(Version.ES_V7)) { - builder.setParameter(INCLUDE_TYPE_NAME_PARAM, true); - } - builder.addIndex(index).addType(type); - final JestResult result = client.execute(builder.build()); - final JsonObject indexRoot = result.getJsonObject().getAsJsonObject(index); - if (indexRoot == null) { - return null; - } - final JsonObject mappingsJson = indexRoot.getAsJsonObject("mappings"); - if (mappingsJson == null) { - return null; - } - return mappingsJson.getAsJsonObject(type); - } - - public BulkRequest createBulkRequest(final List batch) { - final Bulk.Builder builder = new Bulk.Builder(); - for (final IndexableRecord record : batch) { - builder.addAction(toBulkableAction(record)); - } - return new JestBulkRequest(builder.build()); - } - - // visible for testing - protected BulkableAction toBulkableAction(final IndexableRecord record) { - // If payload is null, the record was a tombstone and we should delete from the index. - return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record); - } - - private Delete toDeleteRequest(final IndexableRecord record) { - final Delete.Builder req = new Delete.Builder(record.key.id) - .index(record.key.index) - .type(record.key.type); - - // TODO: Should version information be set here? - return req.build(); - } - - private Index toIndexRequest(final IndexableRecord record) { - final Index.Builder req = new Index.Builder(record.payload) - .index(record.key.index) - .type(record.key.type) - .id(record.key.id); - if (record.version != null) { - req.setParameter("version_type", "external").setParameter("version", record.version); - } - return req.build(); - } - - public BulkResponse executeBulk(final BulkRequest bulk) throws IOException { - final BulkResult result = client.execute(((JestBulkRequest) bulk).getBulk()); - - if (result.isSucceeded()) { - return BulkResponse.success(); - } - - boolean retriable = true; - - final List versionConflicts = new ArrayList<>(); - final List errors = new ArrayList<>(); - - for (final BulkResult.BulkResultItem item : result.getItems()) { - if (item.error != null) { - final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error); - final String errorType = parsedError.get("type").asText(""); - if ("version_conflict_engine_exception".equals(errorType)) { - versionConflicts.add(new Key(item.index, item.type, item.id)); - } else if ("mapper_parse_exception".equals(errorType)) { - retriable = false; - errors.add(item.error); - } else { - errors.add(item.error); - } - } - } - - if (!versionConflicts.isEmpty()) { - LOG.debug("Ignoring version conflicts for items: {}", versionConflicts); - if (errors.isEmpty()) { - // The only errors were version conflicts - return BulkResponse.success(); - } - } - - final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString(); - - return BulkResponse.failure(retriable, errorInfo); - } - - public JsonObject search(final String query, final String index, final String type) throws IOException { - final Search.Builder search = new Search.Builder(query); - if (index != null) { - search.addIndex(index); - } - if (type != null) { - search.addType(type); - } - - final SearchResult result = client.execute(search.build()); - - return result.getJsonObject(); - } - - public void close() throws IOException { - client.close(); - } -} diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTaskTest.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTaskTest.java index 7a9bf4a..9293d64 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTaskTest.java +++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTaskTest.java @@ -31,9 +31,9 @@ import org.apache.kafka.connect.sink.SinkRecord; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; -import org.elasticsearch.test.InternalTestCluster; import org.junit.Test; +import static org.junit.Assert.fail; @ThreadLeakScope(ThreadLeakScope.Scope.NONE) public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase { @@ -52,13 +52,15 @@ private Map createProps() { @Test public void testPutAndFlush() throws Exception { - final InternalTestCluster cluster = internalCluster(); - cluster.ensureAtLeastNumDataNodes(3); final Map props = createProps(); final ElasticsearchSinkTask task = new ElasticsearchSinkTask(); task.start(props, client); - task.open(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3))); + task.open(new HashSet<>(Arrays.asList( + getTopicPartition(PARTITION), + getTopicPartition(PARTITION2), + getTopicPartition(PARTITION3))) + ); final String key = "key"; final Schema schema = createSchema(); @@ -66,16 +68,15 @@ public void testPutAndFlush() throws Exception { final Collection records = new ArrayList<>(); - SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 0); + SinkRecord sinkRecord = new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, 0); records.add(sinkRecord); - sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1); + sinkRecord = new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1); records.add(sinkRecord); task.put(records); task.flush(null); - - refresh(); + task.refresh(getTopic()); verifySearchResults(records, true, false); } @@ -84,8 +85,6 @@ public void testPutAndFlush() throws Exception { public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() { // We should as well test that writing a record with a previously un seen record will create // an index following the required elasticsearch requirements of lowercasing. - final InternalTestCluster cluster = internalCluster(); - cluster.ensureAtLeastNumDataNodes(3); final Map props = createProps(); final ElasticsearchSinkTask task = new ElasticsearchSinkTask(); diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java index a409f91..4f04b59 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java +++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; @@ -28,52 +29,71 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; -import io.aiven.connect.elasticsearch.jest.JestElasticsearchClient; +import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchHit; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; +import org.testcontainers.elasticsearch.ElasticsearchContainer; -public class ElasticsearchSinkTestBase extends ESIntegTestCase { +import static org.junit.Assert.assertEquals; + +public class ElasticsearchSinkTestBase { protected static final String TYPE = "kafka-connect"; - protected static final String TOPIC = "topic"; + private static final String TOPIC = "topic"; protected static final int PARTITION = 12; protected static final int PARTITION2 = 13; protected static final int PARTITION3 = 14; - protected static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION); - protected static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2); - protected static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3); - protected ElasticsearchClient client; private DataConverter converter; + private static ElasticsearchContainer container; + + private final String testTopic; + + public ElasticsearchSinkTestBase() { + this.testTopic = String.format("%s-%s", TOPIC, UUID.randomUUID().toString()); + } + + @BeforeClass + public static void staticSetUp() { + container = new ElasticsearchContainer("elasticsearch:7.17.0"); + container.start(); + } + @Before public void setUp() throws Exception { - super.setUp(); - client = new JestElasticsearchClient("http://localhost:" + getPort()); + client = new AivenElasticsearchClientWrapper("http://" + container.getHttpHostAddress()); converter = new DataConverter(true, DataConverter.BehaviorOnNullValues.IGNORE); } @After public void tearDown() throws Exception { - super.tearDown(); if (client != null) { client.close(); } client = null; } - protected int getPort() { - assertTrue("There should be at least 1 HTTP endpoint exposed in the test cluster", - cluster().httpAddresses().length > 0); - return cluster().httpAddresses()[0].getPort(); + @AfterClass + public static void staticTearDown() { + if (container != null) { + container.close(); + } + container = null; + } + + public String getTopic() { + return this.testTopic; + } + + public TopicPartition getTopicPartition(final int partition) { + return new TopicPartition(this.testTopic, partition); } protected Struct createRecord(final Schema schema) { @@ -106,7 +126,7 @@ protected void verifySearchResults( final Collection records, final boolean ignoreKey, final boolean ignoreSchema) throws IOException { - verifySearchResults(records, TOPIC, ignoreKey, ignoreSchema); + verifySearchResults(records, getTopic(), ignoreKey, ignoreSchema); } protected void verifySearchResults( @@ -114,17 +134,15 @@ protected void verifySearchResults( final String index, final boolean ignoreKey, final boolean ignoreSchema) throws IOException { - final JsonObject result = client.search("", index, null); + final SearchResponse result = client.search(index); + final SearchHit[] rawHits = result.getHits().getHits(); - final JsonArray rawHits = result.getAsJsonObject("hits").getAsJsonArray("hits"); - - assertEquals(records.size(), rawHits.size()); + assertEquals(records.size(), rawHits.length); final Map hits = new HashMap<>(); - for (int i = 0; i < rawHits.size(); ++i) { - final JsonObject hitData = rawHits.get(i).getAsJsonObject(); - final String id = hitData.get("_id").getAsString(); - final String source = hitData.get("_source").getAsJsonObject().toString(); + for (int i = 0; i < rawHits.length; ++i) { + final String id = rawHits[i].getId(); + final String source = rawHits[i].getSourceAsString(); hits.put(id, source); } @@ -138,38 +156,4 @@ protected void verifySearchResults( } } } - - /* For ES 2.x */ - @Override - protected Settings nodeSettings(final int nodeOrdinal) { - return Settings.settingsBuilder() - .put(super.nodeSettings(nodeOrdinal)) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .put(Node.HTTP_ENABLED, true) - .build(); - } - - /* For ES 5.x (requires Java 8) */ - /* - @Override - protected Settings nodeSettings(int nodeOrdinal) { - int randomPort = randomIntBetween(49152, 65525); - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(NetworkModule.HTTP_ENABLED.getKey(), true) - .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), randomPort) - .put("network.host", "127.0.0.1") - .build(); - } - - @Override - protected Collection> nodePlugins() { - System.setProperty("es.set.netty.runtime.available.processors", "false"); - Collection> al = new ArrayList>(); - al.add(Netty4Plugin.class); - return al; - } - */ - } diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java index 671c007..b0bfefe 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java +++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java @@ -17,6 +17,7 @@ package io.aiven.connect.elasticsearch; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -44,6 +45,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import static org.junit.Assert.fail; + @ThreadLeakScope(ThreadLeakScope.Scope.NONE) public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { @@ -71,10 +74,10 @@ public void setUp() throws Exception { public void testWriter() throws Exception { final Collection records = prepareData(2); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); final Collection expected = Collections.singletonList( - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1) + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1) ); verifySearchResults(expected); } @@ -85,7 +88,7 @@ public void testWriterIgnoreKey() throws Exception { final Collection records = prepareData(2); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(records); } @@ -96,7 +99,7 @@ public void testWriterIgnoreSchema() throws Exception { final Collection records = prepareData(2); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(records); } @@ -112,10 +115,10 @@ public void testTopicIndexOverride() throws Exception { client, Collections.emptySet(), Collections.emptySet(), - Collections.singletonMap(TOPIC, indexOverride), + Collections.singletonMap(getTopic(), indexOverride), false, DataConverter.BehaviorOnNullValues.IGNORE); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, indexOverride); verifySearchResults(records, indexOverride); } @@ -125,7 +128,7 @@ public void testIncompatible() throws Exception { final Collection records = new ArrayList<>(); SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client); @@ -135,7 +138,7 @@ public void testIncompatible() throws Exception { records.clear(); sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1); records.add(sinkRecord); writer.write(records); @@ -151,11 +154,11 @@ public void testCompatible() throws Exception { final Collection expected = new ArrayList<>(); SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, 0); records.add(sinkRecord); expected.add(sinkRecord); sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1); records.add(sinkRecord); expected.add(sinkRecord); @@ -165,16 +168,16 @@ public void testCompatible() throws Exception { records.clear(); sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 2); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 2); records.add(sinkRecord); expected.add(sinkRecord); sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 3); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 3); records.add(sinkRecord); expected.add(sinkRecord); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(expected); } @@ -184,20 +187,20 @@ public void testSafeRedeliveryRegularKey() throws Exception { value0.put("user", "foo"); value0.put("message", "hi"); final SinkRecord sinkRecord0 = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value0, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, value0, 0); final Struct value1 = new Struct(schema); value1.put("user", "foo"); value1.put("message", "bye"); final SinkRecord sinkRecord1 = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value1, 1); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, value1, 1); final ElasticsearchWriter writer = initWriter(client); writer.write(Arrays.asList(sinkRecord0, sinkRecord1)); writer.flush(); // write the record with earlier offset again - writeDataAndRefresh(writer, Collections.singleton(sinkRecord0)); + writeDataAndRefresh(writer, Collections.singleton(sinkRecord0), getTopic()); // last write should have been ignored due to version conflict verifySearchResults(Collections.singleton(sinkRecord1)); @@ -211,13 +214,13 @@ public void testSafeRedeliveryOffsetInKey() throws Exception { value0.put("user", "foo"); value0.put("message", "hi"); final SinkRecord sinkRecord0 = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value0, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, value0, 0); final Struct value1 = new Struct(schema); value1.put("user", "foo"); value1.put("message", "bye"); final SinkRecord sinkRecord1 = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value1, 1); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, value1, 1); final List records = Arrays.asList(sinkRecord0, sinkRecord1); @@ -226,7 +229,7 @@ public void testSafeRedeliveryOffsetInKey() throws Exception { writer.flush(); // write them again - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); // last write should have been ignored due to version conflict verifySearchResults(records); @@ -247,11 +250,11 @@ public void testMap() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(records); } @@ -264,14 +267,14 @@ public void testStringKeyedMap() throws Exception { map.put("Two", 2); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, mapSchema, map, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, mapSchema, map, 0); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, Collections.singletonList(sinkRecord)); + writeDataAndRefresh(writer, Collections.singletonList(sinkRecord), getTopic()); final Collection expectedRecords = Collections.singletonList(new ObjectMapper().writeValueAsString(map)); - verifySearchResults(expectedRecords, TOPIC); + verifySearchResults(expectedRecords, getTopic()); } @Test @@ -289,11 +292,11 @@ public void testDecimal() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(records); } @@ -308,11 +311,11 @@ public void testBytes() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(records); } @@ -320,11 +323,11 @@ public void testBytes() throws Exception { public void testIgnoreNullValue() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, null, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, null, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client, DataConverter.BehaviorOnNullValues.IGNORE); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); // Send an empty list of records to the verify method, since the empty record should have been // skipped verifySearchResults(new ArrayList()); @@ -341,26 +344,26 @@ public void testDeleteOnNullValue() throws Exception { // First, write a couple of actual (non-null-valued) records final SinkRecord insertRecord1 = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key1, schema, record, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key1, schema, record, 0); records.add(insertRecord1); final SinkRecord insertRecord2 = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key2, otherSchema, otherRecord, 1); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key2, otherSchema, otherRecord, 1); records.add(insertRecord2); // Can't call writeDataAndRefresh(writer, records) since it stops the writer writer.write(records); writer.flush(); - refresh(); + client.refresh(getTopic()); // Make sure the record made it there successfully verifySearchResults(records); // Then, write a record with the same key as the first inserted record but a null value final SinkRecord deleteRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key1, schema, null, 2); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key1, schema, null, 2); // Don't want to resend the first couple of records records.clear(); records.add(deleteRecord); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); // The only remaining record should be the second inserted record records.clear(); @@ -375,11 +378,11 @@ public void testIneffectiveDelete() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, null, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, null, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client, DataConverter.BehaviorOnNullValues.DELETE); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(new ArrayList()); } @@ -387,11 +390,11 @@ public void testIneffectiveDelete() throws Exception { public void testDeleteWithNullKey() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, schema, null, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, null, schema, null, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client, DataConverter.BehaviorOnNullValues.DELETE); - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); verifySearchResults(new ArrayList()); } @@ -399,12 +402,12 @@ public void testDeleteWithNullKey() throws Exception { public void testFailOnNullValue() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, null, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, null, 0); records.add(sinkRecord); final ElasticsearchWriter writer = initWriter(client, DataConverter.BehaviorOnNullValues.FAIL); try { - writeDataAndRefresh(writer, records); + writeDataAndRefresh(writer, records, getTopic()); fail("should fail because of behavior.on.null.values=fail"); } catch (final DataException e) { // expected @@ -418,7 +421,7 @@ public void testInvalidRecordException() throws Exception { final Collection records = new ArrayList<>(); final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, null, new byte[]{42}, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, null, null, new byte[]{42}, 0); records.add(sinkRecord); final ElasticsearchWriter strictWriter = initWriter(client); @@ -443,9 +446,9 @@ public void testDropInvalidRecord() throws Exception { final SinkRecord invalidRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, structSchema, struct, 0); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, null, structSchema, struct, 0); final SinkRecord validRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 1); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 1); inputRecords.add(validRecord); inputRecords.add(invalidRecord); @@ -454,7 +457,7 @@ public void testDropInvalidRecord() throws Exception { final ElasticsearchWriter nonStrictWriter = initWriter(client, true); - writeDataAndRefresh(nonStrictWriter, inputRecords); + writeDataAndRefresh(nonStrictWriter, inputRecords, getTopic()); verifySearchResults(outputRecords, ignoreKey, ignoreSchema); } @@ -462,7 +465,7 @@ private Collection prepareData(final int numRecords) { final Collection records = new ArrayList<>(); for (int i = 0; i < numRecords; ++i) { final SinkRecord sinkRecord = - new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, i); + new SinkRecord(getTopic(), PARTITION, Schema.STRING_SCHEMA, key, schema, record, i); records.add(sinkRecord); } return records; @@ -520,16 +523,18 @@ private ElasticsearchWriter initWriter( .setBehaviorOnNullValues(behavior) .build(); writer.start(); - writer.createIndicesForTopics(Collections.singleton(TOPIC)); + writer.createIndicesForTopics(Collections.singleton(getTopic())); return writer; } - private void writeDataAndRefresh(final ElasticsearchWriter writer, final Collection records) - throws Exception { + private void writeDataAndRefresh( + final ElasticsearchWriter writer, + final Collection records, + final String index) throws IOException { writer.write(records); writer.flush(); writer.stop(); - refresh(); + client.refresh(index); } private void verifySearchResults(final Collection records) throws Exception { diff --git a/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java b/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java index 8b62325..63f0437 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java +++ b/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java @@ -17,6 +17,10 @@ package io.aiven.connect.elasticsearch; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; @@ -28,10 +32,11 @@ import com.fasterxml.jackson.databind.node.NumericNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; -import com.google.gson.JsonObject; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,16 +48,16 @@ public class MappingTest extends ElasticsearchSinkTestBase { @Test @SuppressWarnings("unchecked") public void testMapping() throws Exception { - final InternalTestCluster cluster = internalCluster(); - cluster.ensureAtLeastNumDataNodes(1); + final Set indices = new HashSet<>(); + indices.add(INDEX); + client.createIndices(indices); - createIndex(INDEX); final Schema schema = createSchema(); Mapping.createMapping(client, INDEX, TYPE, schema); - final JsonObject mapping = Mapping.getMapping(client, INDEX, TYPE); + final MappingMetadata mapping = Mapping.getMapping(client, INDEX, TYPE); assertNotNull(mapping); - verifyMapping(schema, mapping); + verifyMapping(schema, mapping.sourceAsMap()); } @Test @@ -64,7 +69,7 @@ public void testStringMappingForES6() throws Exception { final Schema schema = SchemaBuilder.struct().name("textRecord") .field("string", Schema.STRING_SCHEMA) .build(); - final ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client, schema); + final ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client.getVersion(), schema); final ObjectNode properties = mapping.with("properties"); final ObjectNode string = properties.with("string"); final TextNode stringType = (TextNode) string.get("type"); @@ -121,25 +126,25 @@ private Schema createInnerSchema() { } @SuppressWarnings("unchecked") - private void verifyMapping(final Schema schema, final JsonObject mapping) throws Exception { + private void verifyMapping(final Schema schema, final Map mapping) { final String schemaName = schema.name(); - final Object type = mapping.get("type"); if (schemaName != null) { switch (schemaName) { case Date.LOGICAL_NAME: case Time.LOGICAL_NAME: case Timestamp.LOGICAL_NAME: - assertEquals("\"" + ElasticsearchSinkConnectorConstants.DATE_TYPE + "\"", type.toString()); + assertEquals(ElasticsearchSinkConnectorConstants.DATE_TYPE, type.toString()); return; case Decimal.LOGICAL_NAME: - assertEquals("\"" + ElasticsearchSinkConnectorConstants.DOUBLE_TYPE + "\"", type.toString()); + assertEquals(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, type.toString()); return; default: } } - final DataConverter converter = new DataConverter(true, DataConverter.BehaviorOnNullValues.IGNORE); + final DataConverter converter = + new DataConverter(true, DataConverter.BehaviorOnNullValues.IGNORE); final Schema.Type schemaType = schema.type(); switch (schemaType) { case ARRAY: @@ -147,24 +152,26 @@ private void verifyMapping(final Schema schema, final JsonObject mapping) throws break; case MAP: final Schema newSchema = converter.preProcessSchema(schema); - final JsonObject mapProperties = mapping.get("properties").getAsJsonObject(); + final Map> mapMapping = + (Map>) mapping.get("properties"); verifyMapping( newSchema.keySchema(), - mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_KEY).getAsJsonObject() + mapMapping.get(ElasticsearchSinkConnectorConstants.MAP_KEY) ); verifyMapping( newSchema.valueSchema(), - mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_VALUE).getAsJsonObject() + mapMapping.get(ElasticsearchSinkConnectorConstants.MAP_VALUE) ); break; case STRUCT: - final JsonObject properties = mapping.get("properties").getAsJsonObject(); + final Map> structMapping = + (Map>) mapping.get("properties"); for (final Field field : schema.fields()) { - verifyMapping(field.schema(), properties.get(field.name()).getAsJsonObject()); + verifyMapping(field.schema(), structMapping.get(field.name())); } break; default: - assertEquals("\"" + Mapping.getElasticsearchType(client, schemaType) + "\"", type.toString()); + assertEquals(Mapping.getElasticsearchType(client.getVersion(), schemaType), type.toString()); } } } diff --git a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java new file mode 100644 index 0000000..a0ed1c5 --- /dev/null +++ b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java @@ -0,0 +1,323 @@ +/* + * Copyright 2020 Aiven Oy + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.elasticsearch.clientwrapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.connect.elasticsearch.ElasticsearchClient; +import io.aiven.connect.elasticsearch.IndexableRecord; +import io.aiven.connect.elasticsearch.Key; +import io.aiven.connect.elasticsearch.Mapping; +import io.aiven.connect.elasticsearch.bulk.BulkRequest; + +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Sets; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.client.IndicesClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.core.MainResponse; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetMappingsRequest; +import org.elasticsearch.client.indices.GetMappingsResponse; +import org.elasticsearch.client.indices.PutMappingRequest; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchHits; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AivenElasticsearchClientWrapperTest { + + private static final String INDEX = "index"; + private static final String KEY = "key"; + private static final String TYPE = "type"; + + private RestHighLevelClient elasticsearchClient; + + @Before + public void setUp() throws Exception { + elasticsearchClient = mock(RestHighLevelClient.class); + when(elasticsearchClient.info(any())).thenReturn( + new MainResponse("localhost", + new MainResponse.Version("1.0", "buildFlavor", + "buildType", "buildHash", "buildDate", false, + "luceneVersion", + "minWireCompVersion", "minIndexCompVersion"), + "clusterName", UUID.randomUUID().toString(), "tagLine" + ) + ); + } + + @Test + public void getsVersion() { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + assertThat(client.getVersion(), is(equalTo(ElasticsearchClient.Version.ES_V1))); + } + + @Test + public void createsIndices() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final IndicesClient indicesClient = mock(IndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false); + when(indicesClient.create(any(CreateIndexRequest.class), any())).thenThrow(new IOException("failure")); + + when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false); + when(indicesClient.create(argThat(isCreateIndexForTestIndex()), any())).thenReturn( + new CreateIndexResponse(true, true, INDEX)); + + final Set indices = Sets.newHashSet(); + indices.add(INDEX); + client.createIndices(indices); + final InOrder inOrder = inOrder(elasticsearchClient, indicesClient); + inOrder.verify(elasticsearchClient).info(any()); + inOrder.verify(indicesClient).exists(any(GetIndexRequest.class), any()); + inOrder.verify(indicesClient).create(argThat(isCreateIndexForTestIndex()), any()); + } + + private ArgumentMatcher isCreateIndexForTestIndex() { + return new ArgumentMatcher() { + @Override + public boolean matches(final CreateIndexRequest createIndexRequest) { + // check the URI as the equals method on CreateIndex doesn't work + return createIndexRequest.index().equals(INDEX); + } + }; + } + + @Test + public void createIndicesAndFails() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final IndicesClient indicesClient = mock(IndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false); + when(indicesClient.create(any(CreateIndexRequest.class), any())).thenThrow(new IOException("failure")); + + final Set indices = new HashSet<>(); + indices.add("test-index"); + assertThrows("Could not create index 'test-index'", ConnectException.class, () -> { + client.createIndices(indices); + }); + } + + @Test + public void createsMapping() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final IndicesClient indicesClient = mock(IndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + final ObjectNode obj = JsonNodeFactory.instance.objectNode(); + obj.set(TYPE, Mapping.inferMapping(client.getVersion(), Schema.STRING_SCHEMA)); + client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); + verify(indicesClient).putMapping(any(PutMappingRequest.class), any()); + } + + @Test(expected = ConnectException.class) + public void createsMappingAndFails() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final IndicesClient indicesClient = mock(IndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + when(indicesClient.putMapping(any(PutMappingRequest.class), any())) + .thenThrow(new ElasticsearchException("failure")); + client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); + } + + @Test + public void getsMapping() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final IndicesClient indicesClient = mock(IndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + final MappingMetadata mappingMetadata = new MappingMetadata(TYPE, new HashMap<>()); + final Map mappingsMetadata = new HashMap<>(); + mappingsMetadata.put(INDEX, mappingMetadata); + when(indicesClient.getMapping(any(GetMappingsRequest.class), any())).thenReturn( + new GetMappingsResponse(mappingsMetadata)); + assertEquals(client.getMapping(INDEX, TYPE), new MappingMetadata(TYPE, new HashMap<>())); + } + + @Test + public void executesBulk() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { + BulkItemResponse.success(1, DocWriteRequest.OpType.CREATE, + new IndexResponse(ShardId.fromString("[" + INDEX + "][1]"), TYPE, "id", 1L, 1L, 1L, true) + ) + }; + when(elasticsearchClient.bulk(any(), any())).thenReturn( + new BulkResponse(bulkItemResponses, 100) + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = client.createBulkRequest(records); + assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(true))); + } + + @Test + public void executesBulkAndFails() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { + BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( + INDEX, TYPE, "id", new IOException("failure") + )) + }; + when(elasticsearchClient.bulk(any(), any())).thenReturn( + new BulkResponse(bulkItemResponses, 100) + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), null, 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = client.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); + assertThat(bulkResponse.isRetriable(), is(equalTo(true))); + assertEquals("[java.io.IOException: failure]", bulkResponse.getErrorInfo()); + } + + @Test + public void executesBulkAndFailsWithParseError() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { + BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( + INDEX, AivenElasticsearchClientWrapper.MAPPER_PARSE_EXCEPTION, "id", + new ElasticsearchException("[type=mapper_parse_exception, reason=[key]: Mapper parse error]") + + )) + }; + when(elasticsearchClient.bulk(any(), any())).thenReturn( + new BulkResponse(bulkItemResponses, 100) + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = client.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); + assertThat(bulkResponse.isRetriable(), is(equalTo(false))); + assertEquals( + "[ElasticsearchException[[type=mapper_parse_exception, reason=[key]: Mapper parse error]]]", + bulkResponse.getErrorInfo() + ); + } + + @Test + public void executesBulkAndFailsWithSomeOtherError() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { + BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( + INDEX, "some_random_exception", "id", + new ElasticsearchException("[type=random_type_string, reason=[key]: Unknown error]") + )) + }; + when(elasticsearchClient.bulk(any(), any())).thenReturn( + new BulkResponse(bulkItemResponses, 100) + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = client.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); + assertThat(bulkResponse.isRetriable(), is(equalTo(true))); + assertEquals( + "[ElasticsearchException[[type=random_type_string, reason=[key]: Unknown error]]]", + bulkResponse.getErrorInfo() + ); + } + + @Test + public void executesBulkAndSucceedsBecauseOnlyVersionConflicts() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { + BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( + INDEX, "_doc", "id", + new ElasticsearchException( + "[type=version_conflict_engine_exception, reason=[key]: " + + "version conflict, current version [1] is higher or equal to the one provided [0]]") + )) + }; + when(elasticsearchClient.bulk(any(), any())).thenReturn( + new BulkResponse(bulkItemResponses, 100) + ); + + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = client.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(true))); + } + + @Test + public void searches() throws Exception { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + final SearchResponse response = new SearchResponse( + new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 0), + "scrollId", 1, 1, 0, 100L, null, SearchResponse.Clusters.EMPTY + ); + when(elasticsearchClient.search(any(SearchRequest.class), any())).thenReturn(response); + assertNotNull(client.search(INDEX)); + verify(elasticsearchClient).search(any(SearchRequest.class), any()); + } + + @Test + public void closes() throws IOException { + final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); + client.close(); + verify(elasticsearchClient).close(); + } +} diff --git a/src/test/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClientTest.java b/src/test/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClientTest.java deleted file mode 100644 index 4ef3436..0000000 --- a/src/test/java/io/aiven/connect/elasticsearch/jest/JestElasticsearchClientTest.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright 2020 Aiven Oy - * Copyright 2018 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.connect.elasticsearch.jest; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; - -import io.aiven.connect.elasticsearch.ElasticsearchClient; -import io.aiven.connect.elasticsearch.ElasticsearchSinkConnectorConfig; -import io.aiven.connect.elasticsearch.IndexableRecord; -import io.aiven.connect.elasticsearch.Key; -import io.aiven.connect.elasticsearch.Mapping; -import io.aiven.connect.elasticsearch.bulk.BulkRequest; - -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import io.searchbox.client.JestClient; -import io.searchbox.client.JestClientFactory; -import io.searchbox.client.JestResult; -import io.searchbox.client.config.ElasticsearchVersion; -import io.searchbox.client.config.HttpClientConfig; -import io.searchbox.cluster.NodesInfo; -import io.searchbox.core.BulkResult; -import io.searchbox.core.Search; -import io.searchbox.core.SearchResult; -import io.searchbox.indices.CreateIndex; -import io.searchbox.indices.IndicesExists; -import io.searchbox.indices.mapping.GetMapping; -import io.searchbox.indices.mapping.PutMapping; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.Credentials; -import org.apache.http.client.CredentialsProvider; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; -import org.mockito.InOrder; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class JestElasticsearchClientTest { - - private static final String INDEX = "index"; - private static final String KEY = "key"; - private static final String TYPE = "type"; - private static final String QUERY = "query"; - - private JestClient jestClient; - private JestClientFactory jestClientFactory; - private NodesInfo info; - - @Before - public void setUp() throws Exception { - jestClient = mock(JestClient.class); - jestClientFactory = mock(JestClientFactory.class); - when(jestClientFactory.getObject()).thenReturn(jestClient); - info = new NodesInfo.Builder().addCleanApiParameter("version").build(); - final JsonObject nodeRoot = new JsonObject(); - nodeRoot.addProperty("version", "1.0"); - final JsonObject nodesRoot = new JsonObject(); - nodesRoot.add("localhost", nodeRoot); - final JsonObject nodesInfo = new JsonObject(); - nodesInfo.add("nodes", nodesRoot); - final JestResult result = new JestResult(new Gson()); - result.setJsonObject(nodesInfo); - when(jestClient.execute(info)).thenReturn(result); - } - - @Test - public void connectsSecurely() { - final Map props = new HashMap<>(); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost:9200"); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG, "elastic"); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG, "elasticpw"); - props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect"); - final JestElasticsearchClient client = new JestElasticsearchClient(props, jestClientFactory); - - final ArgumentCaptor captor = ArgumentCaptor.forClass(HttpClientConfig.class); - verify(jestClientFactory).setHttpClientConfig(captor.capture()); - final HttpClientConfig httpClientConfig = captor.getValue(); - final CredentialsProvider credentialsProvider = httpClientConfig.getCredentialsProvider(); - final Credentials credentials = credentialsProvider.getCredentials(AuthScope.ANY); - final Set preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts(); - assertEquals("elastic", credentials.getUserPrincipal().getName()); - assertEquals("elasticpw", credentials.getPassword()); - assertEquals(HttpHost.create("http://localhost:9200"), preemptiveAuthTargetHosts.iterator().next()); - } - - @Test - public void connectsSecurelyWithEmptyUsernameAndPassword() { - final Map props = new HashMap<>(); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost:9200"); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG, ""); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG, ""); - props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect"); - final JestElasticsearchClient client = new JestElasticsearchClient(props, jestClientFactory); - - final ArgumentCaptor captor = ArgumentCaptor.forClass(HttpClientConfig.class); - verify(jestClientFactory).setHttpClientConfig(captor.capture()); - final HttpClientConfig httpClientConfig = captor.getValue(); - final CredentialsProvider credentialsProvider = httpClientConfig.getCredentialsProvider(); - final Credentials credentials = credentialsProvider.getCredentials(AuthScope.ANY); - final Set preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts(); - assertEquals("", credentials.getUserPrincipal().getName()); - assertEquals("", credentials.getPassword()); - assertEquals(HttpHost.create("http://localhost:9200"), preemptiveAuthTargetHosts.iterator().next()); - } - - @Test - public void getsVersion() { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - assertThat(client.getVersion(), is(equalTo(ElasticsearchClient.Version.ES_V1))); - } - - @Test - public void createsIndices() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final JestResult failure = new JestResult(new Gson()); - failure.setSucceeded(false); - final JestResult success = new JestResult(new Gson()); - success.setSucceeded(true); - final IndicesExists indicesExists = new IndicesExists.Builder(INDEX).build(); - when(jestClient.execute(indicesExists)).thenReturn(failure); - when(jestClient.execute(argThat(isCreateIndexForTestIndex()))).thenReturn(success); - - final Set indices = Sets.newHashSet(); - indices.add(INDEX); - client.createIndices(indices); - final InOrder inOrder = inOrder(jestClient); - inOrder.verify(jestClient).execute(info); - inOrder.verify(jestClient).execute(indicesExists); - inOrder.verify(jestClient).execute(argThat(isCreateIndexForTestIndex())); - } - - private ArgumentMatcher isCreateIndexForTestIndex() { - return new ArgumentMatcher() { - @Override - public boolean matches(final CreateIndex createIndex) { - // check the URI as the equals method on CreateIndex doesn't work - return createIndex.getURI(ElasticsearchVersion.V2).equals(INDEX); - } - }; - } - - @Test(expected = ConnectException.class) - public void createIndicesAndFails() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final JestResult failure = new JestResult(new Gson()); - failure.setSucceeded(false); - final IndicesExists indicesExists = new IndicesExists.Builder(INDEX).build(); - when(jestClient.execute(indicesExists)).thenReturn(failure); - when(jestClient.execute(argThat(isCreateIndexForTestIndex()))).thenReturn(failure); - - final Set indices = new HashSet<>(); - indices.add(INDEX); - client.createIndices(indices); - } - - @Test - public void createsMapping() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final JestResult success = new JestResult(new Gson()); - success.setSucceeded(true); - final ObjectNode obj = JsonNodeFactory.instance.objectNode(); - obj.set(TYPE, Mapping.inferMapping(client, Schema.STRING_SCHEMA)); - final PutMapping putMapping = new PutMapping.Builder(INDEX, TYPE, obj.toString()).build(); - when(jestClient.execute(putMapping)).thenReturn(success); - - client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); - verify(jestClient).execute(putMapping); - } - - @Test(expected = ConnectException.class) - public void createsMappingAndFails() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final JestResult failure = new JestResult(new Gson()); - failure.setSucceeded(false); - final ObjectNode obj = JsonNodeFactory.instance.objectNode(); - obj.set(TYPE, Mapping.inferMapping(client, Schema.STRING_SCHEMA)); - final PutMapping putMapping = new PutMapping.Builder(INDEX, TYPE, obj.toString()).build(); - when(jestClient.execute(putMapping)).thenReturn(failure); - - client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); - } - - @Test - public void getsMapping() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final JsonObject mapping = new JsonObject(); - final JsonObject mappings = new JsonObject(); - mappings.add(TYPE, mapping); - final JsonObject indexRoot = new JsonObject(); - indexRoot.add("mappings", mappings); - final JsonObject root = new JsonObject(); - root.add(INDEX, indexRoot); - final JestResult result = new JestResult(new Gson()); - result.setJsonObject(root); - final GetMapping getMapping = new GetMapping.Builder().addIndex(INDEX).addType(TYPE).build(); - when(jestClient.execute(getMapping)).thenReturn(result); - - assertThat(client.getMapping(INDEX, TYPE), is(equalTo(mapping))); - } - - @Test - public void executesBulk() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final BulkResult success = new BulkResult(new Gson()); - success.setSucceeded(true); - when(jestClient.execute(((JestBulkRequest) request).getBulk())).thenReturn(success); - - assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(true))); - } - - @Test - public void executesBulkAndFails() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), null, 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final BulkResult failure = new BulkResult(new Gson()); - failure.setSucceeded(false); - when(jestClient.execute(((JestBulkRequest) request).getBulk())).thenReturn(failure); - - assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(false))); - } - - @Test - public void executesBulkAndFailsWithParseError() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final BulkResult failure = createBulkResultFailure(JestElasticsearchClient.MAPPER_PARSE_EXCEPTION); - when(jestClient.execute(((JestBulkRequest) request).getBulk())).thenReturn(failure); - - assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(false))); - } - - @Test - public void executesBulkAndFailsWithSomeOtherError() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final BulkResult failure = createBulkResultFailure("some_random_exception"); - when(jestClient.execute(((JestBulkRequest) request).getBulk())).thenReturn(failure); - - assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(false))); - } - - @Test - public void executesBulkAndSucceedsBecauseOnlyVersionConflicts() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final BulkResult failure = createBulkResultFailure(JestElasticsearchClient.VERSION_CONFLICT_ENGINE_EXCEPTION); - when(jestClient.execute(((JestBulkRequest) request).getBulk())).thenReturn(failure); - - assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(true))); - } - - @Test - public void searches() throws Exception { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - final Search search = new Search.Builder(QUERY).addIndex(INDEX).addType(TYPE).build(); - final JsonObject queryResult = new JsonObject(); - final SearchResult result = new SearchResult(new Gson()); - result.setJsonObject(queryResult); - when(jestClient.execute(search)).thenReturn(result); - - assertThat(client.search(QUERY, INDEX, TYPE), is(equalTo(queryResult))); - } - - @Test - public void closes() throws IOException { - final JestElasticsearchClient client = new JestElasticsearchClient(jestClient); - client.close(); - - verify(jestClient).close(); - } - - private BulkResult createBulkResultFailure(final String exception) { - final BulkResult failure = new BulkResult(new Gson()); - failure.setSucceeded(false); - final JsonObject error = new JsonObject(); - error.addProperty("type", exception); - final JsonObject item = new JsonObject(); - item.addProperty("_index", INDEX); - item.addProperty("_type", TYPE); - item.addProperty("status", 0); - item.add("error", error); - final JsonObject index = new JsonObject(); - index.add("index", item); - final JsonArray items = new JsonArray(); - items.add(index); - final JsonObject root = new JsonObject(); - root.add("items", items); - failure.setJsonObject(root); - return failure; - } -}