Skip to content

Commit

Permalink
feat: add ElasticSearch 8 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Mar 4, 2024
1 parent 44f6048 commit 1008ddd
Show file tree
Hide file tree
Showing 18 changed files with 885 additions and 925 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/create_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Setup Java SDK
uses: actions/setup-java@v1.4.3
uses: actions/setup-java@4
with:
distribution: "temurin"
java-version: 11

- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.commit_hash }}

Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/main_push_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up JDK 11
uses: actions/setup-java@v1
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: 11
- name: Build with Gradle
run: ./gradlew build
run: ./gradlew check
7 changes: 4 additions & 3 deletions .github/workflows/pull_request_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Set up JDK 11
uses: actions/setup-java@v1
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: 11
- name: Build with Gradle
run: ./gradlew build
run: ./gradlew check --info
2 changes: 1 addition & 1 deletion .github/workflows/release_pr_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
fi
- name: Checkout main
uses: actions/checkout@v2
uses: actions/checkout@v4
with:
ref: main
fetch-depth: 0
Expand Down
26 changes: 14 additions & 12 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,14 @@ publishing {
}

ext {
guavaVersion = "11.0.2"
kafkaVersion = "2.2.0"
slf4jVersion = "1.7.36"
elasticSearchVersion = "2.4.1"
luceneVersion = "5.5.2"
jestVersion = "6.3.1"
slf4jVersion = "2.0.12"
log4jVersion = "2.23.0"
elasticSearchVersion = "7.4.0"
elasticClientVersion = "7.17.0"
testContainersElasticVersion = "1.19.6"
carrotsearchVersion = "2.8.1"
}

processResources {
Expand All @@ -141,7 +144,11 @@ dependencies {
compileOnly "org.apache.kafka:connect-json:$kafkaVersion"

implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "io.searchbox:jest:$jestVersion"
implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:$elasticClientVersion"
implementation("com.google.guava:guava:$guavaVersion")

implementation "org.apache.logging.log4j:log4j-api:$log4jVersion"
implementation "org.apache.logging.log4j:log4j-core:$log4jVersion"

testImplementation("junit:junit:4.13.2") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
Expand All @@ -152,13 +159,8 @@ dependencies {

testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
testImplementation "org.apache.kafka:connect-json:$kafkaVersion"
testImplementation "org.apache.lucene:lucene-test-framework:$luceneVersion"
testImplementation "com.fasterxml.jackson.core:jackson-core:2.15.2"
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.15.2"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.15.2"
testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion:tests"
testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion"
testImplementation "org.apache.lucene:lucene-expressions:$luceneVersion"
testImplementation "org.testcontainers:elasticsearch:$testContainersElasticVersion"
implementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/>

<suppress
checks="(ClassFanOutComplexity)" files="(JestElasticsearchClient|JestElasticsearchClientTest).java"
checks="(ClassFanOutComplexity)" files="(AivenElasticsearchClientWrapper|AivenElasticsearchClientWrapperTest|BulkProcessor|ElasticsearchSinkTask|DataConverter).java"
/>

</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;

import com.google.gson.JsonObject;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.MappingMetadata;

public interface ElasticsearchClient extends AutoCloseable {

enum Version {
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7, ES_V8
}

/**
Expand Down Expand Up @@ -65,7 +66,7 @@ enum Version {
* @param type the type
* @throws IOException if the client cannot execute the request
*/
JsonObject getMapping(String index, String type) throws IOException;
MappingMetadata getMapping(String index, String type) throws IOException;

/**
* Creates a bulk request for the list of {@link IndexableRecord} records.
Expand All @@ -87,13 +88,18 @@ enum Version {
/**
* Executes a search.
*
* @param query the search query
* @param index the index to search
* @param type the type to search
* @return the search result
* @throws IOException if the client cannot execute the request
*/
JsonObject search(String query, String index, String type) throws IOException;
SearchResponse search(String index) throws IOException;

/**
* Executes a search.
*
* @param index the index to refresh
*/
void refresh(String index) throws IOException;

/**
* Shuts down the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.connect.sink.SinkTask;

import io.aiven.connect.elasticsearch.bulk.BulkProcessor;
import io.aiven.connect.elasticsearch.jest.JestElasticsearchClient;
import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,7 +121,7 @@ public void start(final Map<String, String> props, final ElasticsearchClient cli
if (client != null) {
this.client = client;
} else {
this.client = new JestElasticsearchClient(props);
this.client = new AivenElasticsearchClientWrapper(config);
}

final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
Expand Down Expand Up @@ -178,6 +178,10 @@ public void close(final Collection<TopicPartition> partitions) {
log.debug("Closing the task for topic partitions: {}", partitions);
}

public void refresh(final String index) throws IOException {
client.refresh(index);
}

@Override
public void stop() throws ConnectException {
log.info("Stopping ElasticsearchSinkTask.");
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/io/aiven/connect/elasticsearch/Mapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonObject;
import org.elasticsearch.cluster.metadata.MappingMetadata;

public class Mapping {

Expand All @@ -57,7 +57,7 @@ public static void createMapping(
/**
* Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
*/
public static JsonObject getMapping(final ElasticsearchClient client, final String index, final String type)
public static MappingMetadata getMapping(final ElasticsearchClient client, final String index, final String type)
throws IOException {
return client.getMapping(index, type);
}
Expand All @@ -67,7 +67,7 @@ public static JsonObject getMapping(final ElasticsearchClient client, final Stri
*
* @param schema The schema used to infer mapping.
*/
public static JsonNode inferMapping(final ElasticsearchClient client, final Schema schema) {
public static JsonNode inferMapping(final ElasticsearchClient.Version version, final Schema schema) {
if (schema == null) {
throw new DataException("Cannot infer mapping without schema.");
}
Expand All @@ -83,26 +83,26 @@ public static JsonNode inferMapping(final ElasticsearchClient client, final Sche
final ObjectNode fields = JsonNodeFactory.instance.objectNode();
switch (schemaType) {
case ARRAY:
return inferMapping(client, schema.valueSchema());
return inferMapping(version, schema.valueSchema());
case MAP:
properties.set("properties", fields);
fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(client, schema.keySchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(client, schema.valueSchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(version, schema.keySchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(version, schema.valueSchema()));
return properties;
case STRUCT:
properties.set("properties", fields);
for (final Field field : schema.fields()) {
fields.set(field.name(), inferMapping(client, field.schema()));
fields.set(field.name(), inferMapping(version, field.schema()));
}
return properties;
default:
final String esType = getElasticsearchType(client, schemaType);
final String esType = getElasticsearchType(version, schemaType);
return inferPrimitive(esType, schema.defaultValue());
}
}

// visible for testing
protected static String getElasticsearchType(final ElasticsearchClient client,
protected static String getElasticsearchType(final ElasticsearchClient.Version version,
final Schema.Type schemaType) {
switch (schemaType) {
case BOOLEAN:
Expand All @@ -120,7 +120,7 @@ protected static String getElasticsearchType(final ElasticsearchClient client,
case FLOAT64:
return ElasticsearchSinkConnectorConstants.DOUBLE_TYPE;
case STRING:
switch (client.getVersion()) {
switch (version) {
case ES_V1:
case ES_V2:
return ElasticsearchSinkConnectorConstants.STRING_TYPE;
Expand Down
Loading

0 comments on commit 1008ddd

Please sign in to comment.