Skip to content

Commit

Permalink
feat: add ElasticSearch 8 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Apr 10, 2024
1 parent 2104e7f commit f1d3715
Show file tree
Hide file tree
Showing 15 changed files with 904 additions and 923 deletions.
27 changes: 14 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ wrapper {
}

ext {
guavaVersion = "11.0.2"
kafkaVersion = "2.2.0"
slf4jVersion = "1.7.36"
elasticSearchVersion = "2.4.1"
luceneVersion = "5.5.2"
jestVersion = "6.3.1"
slf4jVersion = "2.0.12"
log4jVersion = "2.23.0"
elasticSearchVersion = "7.4.0"
elasticClientVersion = "7.17.0"
testContainersElasticVersion = "1.19.6"
carrotsearchVersion = "2.8.1"
}

compileJava {
Expand All @@ -82,7 +85,11 @@ dependencies {
compileOnly "org.apache.kafka:connect-json:$kafkaVersion"

implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "io.searchbox:jest:$jestVersion"
implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:$elasticClientVersion"
implementation("com.google.guava:guava:$guavaVersion")

implementation "org.apache.logging.log4j:log4j-api:$log4jVersion"
implementation "org.apache.logging.log4j:log4j-core:$log4jVersion"

testImplementation("junit:junit:4.13.2") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
Expand All @@ -93,13 +100,8 @@ dependencies {

testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
testImplementation "org.apache.kafka:connect-json:$kafkaVersion"
testImplementation "org.apache.lucene:lucene-test-framework:$luceneVersion"
testImplementation "com.fasterxml.jackson.core:jackson-core:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.17.0"
testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion:tests"
testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion"
testImplementation "org.apache.lucene:lucene-expressions:$luceneVersion"
testImplementation "org.testcontainers:elasticsearch:$testContainersElasticVersion"
implementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

Expand Down Expand Up @@ -167,7 +169,6 @@ jar {
}
}


test {

//we do not need to check classpath hell for testing
Expand Down
6 changes: 3 additions & 3 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
<!-- switch statements on types exceed maximum complexity -->
<suppress
checks="(CyclomaticComplexity)"
files="(JestElasticsearchClient|Mapping|JestElasticsearchClient).java"
files="(Mapping).java"
/>

<!-- TODO: Undecided if this is too much -->
<suppress
checks="(ClassDataAbstractionCoupling)"
files="(BulkProcessor|JestElasticsearchClient).java"
files="(BulkProcessor).java"
/>

<!-- TODO: Pass some parameters in common config object? -->
Expand All @@ -25,7 +25,7 @@
/>

<suppress
checks="(ClassFanOutComplexity)" files="(JestElasticsearchClient|JestElasticsearchClientTest).java"
checks="(ClassFanOutComplexity)" files="(AivenElasticsearchClientWrapper|AivenElasticsearchClientWrapperTest|BulkProcessor|ElasticsearchSinkTask|DataConverter).java"
/>

</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;

import com.google.gson.JsonObject;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.MappingMetadata;

public interface ElasticsearchClient extends AutoCloseable {

enum Version {
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7, ES_V8
}

/**
Expand Down Expand Up @@ -65,7 +66,7 @@ enum Version {
* @param type the type
* @throws IOException if the client cannot execute the request
*/
JsonObject getMapping(String index, String type) throws IOException;
MappingMetadata getMapping(String index, String type) throws IOException;

/**
* Creates a bulk request for the list of {@link IndexableRecord} records.
Expand All @@ -87,13 +88,18 @@ enum Version {
/**
* Executes a search.
*
* @param query the search query
* @param index the index to search
* @param type the type to search
* @return the search result
* @throws IOException if the client cannot execute the request
*/
JsonObject search(String query, String index, String type) throws IOException;
SearchResponse search(String index) throws IOException;

/**
* Executes a search.
*
* @param index the index to refresh
*/
void refresh(String index) throws IOException;

/**
* Shuts down the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.connect.sink.SinkTask;

import io.aiven.connect.elasticsearch.bulk.BulkProcessor;
import io.aiven.connect.elasticsearch.jest.JestElasticsearchClient;
import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,7 +121,7 @@ public void start(final Map<String, String> props, final ElasticsearchClient cli
if (client != null) {
this.client = client;
} else {
this.client = new JestElasticsearchClient(props);
this.client = new AivenElasticsearchClientWrapper(config);
}

final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
Expand Down Expand Up @@ -178,6 +178,10 @@ public void close(final Collection<TopicPartition> partitions) {
log.debug("Closing the task for topic partitions: {}", partitions);
}

public void refresh(final String index) throws IOException {
client.refresh(index);
}

@Override
public void stop() throws ConnectException {
log.info("Stopping ElasticsearchSinkTask.");
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/aiven/connect/elasticsearch/Mapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonObject;
import org.elasticsearch.cluster.metadata.MappingMetadata;

public class Mapping {

Expand All @@ -43,7 +43,7 @@ public class Mapping {
* @param index The index to write to Elasticsearch.
* @param type The type to create mapping for.
* @param schema The schema used to infer mapping.
* @throws IOException from underlying JestClient
* @throws IOException from underlying client
*/
public static void createMapping(
final ElasticsearchClient client,
Expand All @@ -57,7 +57,7 @@ public static void createMapping(
/**
* Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
*/
public static JsonObject getMapping(final ElasticsearchClient client, final String index, final String type)
public static MappingMetadata getMapping(final ElasticsearchClient client, final String index, final String type)
throws IOException {
return client.getMapping(index, type);
}
Expand All @@ -67,7 +67,7 @@ public static JsonObject getMapping(final ElasticsearchClient client, final Stri
*
* @param schema The schema used to infer mapping.
*/
public static JsonNode inferMapping(final ElasticsearchClient client, final Schema schema) {
public static JsonNode inferMapping(final ElasticsearchClient.Version version, final Schema schema) {
if (schema == null) {
throw new DataException("Cannot infer mapping without schema.");
}
Expand All @@ -83,26 +83,26 @@ public static JsonNode inferMapping(final ElasticsearchClient client, final Sche
final ObjectNode fields = JsonNodeFactory.instance.objectNode();
switch (schemaType) {
case ARRAY:
return inferMapping(client, schema.valueSchema());
return inferMapping(version, schema.valueSchema());
case MAP:
properties.set("properties", fields);
fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(client, schema.keySchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(client, schema.valueSchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(version, schema.keySchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(version, schema.valueSchema()));
return properties;
case STRUCT:
properties.set("properties", fields);
for (final Field field : schema.fields()) {
fields.set(field.name(), inferMapping(client, field.schema()));
fields.set(field.name(), inferMapping(version, field.schema()));
}
return properties;
default:
final String esType = getElasticsearchType(client, schemaType);
final String esType = getElasticsearchType(version, schemaType);
return inferPrimitive(esType, schema.defaultValue());
}
}

// visible for testing
protected static String getElasticsearchType(final ElasticsearchClient client,
protected static String getElasticsearchType(final ElasticsearchClient.Version version,
final Schema.Type schemaType) {
switch (schemaType) {
case BOOLEAN:
Expand All @@ -120,7 +120,7 @@ protected static String getElasticsearchType(final ElasticsearchClient client,
case FLOAT64:
return ElasticsearchSinkConnectorConstants.DOUBLE_TYPE;
case STRING:
switch (client.getVersion()) {
switch (version) {
case ES_V1:
case ES_V2:
return ElasticsearchSinkConnectorConstants.STRING_TYPE;
Expand Down
Loading

0 comments on commit f1d3715

Please sign in to comment.