Skip to content

Commit

Permalink
feat: use Elasticsearch Java Client API
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Mar 18, 2024
1 parent 3b84d52 commit 9dc3dfc
Show file tree
Hide file tree
Showing 15 changed files with 633 additions and 578 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ The project originates from Confluent [kafka-connect-elasticsearch](https://gith

# Documentation

TBD
Supported Elasticsearch versions are 7.17.0+

# Contribute

[Source Code](https://github.com/aiven/elasticsearch-connector-for-apache-kafka)
[Source Code](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka)

[Issue Tracker](https://github.com/aiven/elasticsearch-connector-for-apache-kafka)
[Issue Tracker](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka)

# License

Expand Down
53 changes: 22 additions & 31 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ java {
}

wrapper {
distributionType = 'ALL'
distributionType = "ALL"
doLast {
def sha256Sum = new String(new URL("${distributionUrl}.sha256").bytes)
propertiesFile << "distributionSha256Sum=${sha256Sum}\n"
Expand All @@ -57,18 +57,16 @@ wrapper {
}

ext {
guavaVersion = "11.0.2"
kafkaVersion = "2.2.0"
slf4jVersion = "2.0.12"
log4jVersion = "2.23.0"
elasticSearchVersion = "7.4.0"
elasticClientVersion = "7.17.0"
elasticJavaClientVersion = "7.17.18"
testContainersElasticVersion = "1.19.6"
carrotsearchVersion = "2.8.1"
}

compileJava {
options.compilerArgs = ['-Xlint:all', '-Werror']
options.compilerArgs = ["-Xlint:all", "-Werror"]
}

checkstyle {
Expand All @@ -81,28 +79,23 @@ jacoco {
}

dependencies {
compileOnly "org.apache.kafka:connect-api:$kafkaVersion"
compileOnly "org.apache.kafka:connect-json:$kafkaVersion"
compileOnly("org.apache.kafka:connect-api:$kafkaVersion")
compileOnly("org.apache.kafka:connect-json:$kafkaVersion")

implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:$elasticClientVersion"
implementation("com.google.guava:guava:$guavaVersion")

implementation "org.apache.logging.log4j:log4j-api:$log4jVersion"
implementation "org.apache.logging.log4j:log4j-core:$log4jVersion"
implementation("org.slf4j:slf4j-api:$slf4jVersion")
implementation("co.elastic.clients:elasticsearch-java:$elasticJavaClientVersion")

testImplementation("junit:junit:4.13.2") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
exclude(group: "org.hamcrest", module: "hamcrest-core")
}
testImplementation "org.hamcrest:hamcrest-all:1.3"
testImplementation "org.mockito:mockito-core:5.4.0"
testImplementation "org.mockito:mockito-all:1.10.19"

testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
testImplementation "org.apache.kafka:connect-json:$kafkaVersion"
testImplementation "org.testcontainers:elasticsearch:$testContainersElasticVersion"
implementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
testImplementation("org.hamcrest:hamcrest-all:1.3")
testImplementation("org.mockito:mockito-core:5.4.0")
testImplementation("org.mockito:mockito-all:1.10.19")

testImplementation("org.apache.kafka:connect-json:$kafkaVersion")
testImplementation("org.testcontainers:elasticsearch:$testContainersElasticVersion")
testImplementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
testRuntimeOnly("org.slf4j:slf4j-log4j12:$slf4jVersion")
}

distributions {
Expand Down Expand Up @@ -156,7 +149,7 @@ publishing {
}

processResources {
filesMatching('elasticsearch-connector-for-apache-kafka-version.properties') {
filesMatching("elasticsearch-connector-for-apache-kafka-version.properties") {
expand(version: version)
}
}
Expand All @@ -169,12 +162,10 @@ jar {
}
}

test {

//we do not need to check classpath hell for testing
systemProperty "tests.jarhell.check", "false"

//tests.security.manager is true all ElasticsearchSinkTestBase aware test hang
systemProperty "tests.security.manager", "false"
def elasticsearch7Test = tasks.register("elasticsearch7Test", Test) {
environment("ELASTIC_TEST_CONTAINER_VERSION", "7.17.0")
}

tasks.named("check") {
dependsOn elasticsearch7Test
}
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/>

<suppress
checks="(ClassFanOutComplexity)" files="(AivenElasticsearchClientWrapper|AivenElasticsearchClientWrapperTest|BulkProcessor|ElasticsearchSinkTask|DataConverter).java"
checks="(ClassFanOutComplexity)" files="(ElasticsearchClientWrapper|ElasticsearchClientWrapperTest|BulkProcessor|ElasticsearchSinkTask|DataConverter).java"
/>

</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import com.fasterxml.jackson.databind.JsonNode;

public interface ElasticsearchClient extends AutoCloseable {

Expand Down Expand Up @@ -66,7 +67,7 @@ enum Version {
* @param type the type
* @throws IOException if the client cannot execute the request
*/
MappingMetadata getMapping(String index, String type) throws IOException;
Property getMapping(String index, String type) throws IOException;

/**
* Creates a bulk request for the list of {@link IndexableRecord} records.
Expand All @@ -92,14 +93,14 @@ enum Version {
* @return the search result
* @throws IOException if the client cannot execute the request
*/
SearchResponse search(String index) throws IOException;
SearchResponse<JsonNode> search(String index) throws IOException;

/**
* Executes a search.
* Refreshes the index.
*
* @param index the index to refresh
*/
void refresh(String index) throws IOException;
void refreshIndex(String index) throws IOException;

/**
* Shuts down the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.connect.sink.SinkTask;

import io.aiven.connect.elasticsearch.bulk.BulkProcessor;
import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper;
import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,7 +55,7 @@ public void start(final Map<String, String> props) {
start(props, null);
}

@SuppressWarnings("deprecation")
@SuppressWarnings("deprecation") //TOPIC_INDEX_MAP_CONFIG
// public for testing
public void start(final Map<String, String> props, final ElasticsearchClient client) {
try {
Expand Down Expand Up @@ -121,7 +121,7 @@ public void start(final Map<String, String> props, final ElasticsearchClient cli
if (client != null) {
this.client = client;
} else {
this.client = new AivenElasticsearchClientWrapper(config);
this.client = new ElasticsearchClientWrapper(config);
}

final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
Expand Down Expand Up @@ -179,7 +179,7 @@ public void close(final Collection<TopicPartition> partitions) {
}

public void refresh(final String index) throws IOException {
client.refresh(index);
client.refreshIndex(index);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -31,7 +32,6 @@

import io.aiven.connect.elasticsearch.bulk.BulkProcessor;

import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,7 +103,7 @@ public class ElasticsearchWriter {
behaviorOnMalformedDoc
);

existingMappings = Sets.newHashSet();
existingMappings = new HashSet<>();
}

public static class Builder {
Expand Down Expand Up @@ -349,7 +349,7 @@ public void createIndicesForTopics(final Set<String> assignedTopics) {
}

private Set<String> indicesForTopics(final Set<String> assignedTopics) {
final Set<String> indices = Sets.newHashSet();
final Set<String> indices = new HashSet<>();
for (final String topic : assignedTopics) {
indices.add(convertTopicToIndexName(topic));
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/aiven/connect/elasticsearch/Mapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;

import co.elastic.clients.elasticsearch._types.mapping.Property;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.elasticsearch.cluster.metadata.MappingMetadata;

public class Mapping {

Expand All @@ -57,7 +57,7 @@ public static void createMapping(
/**
* Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
*/
public static MappingMetadata getMapping(final ElasticsearchClient client, final String index, final String type)
public static Property getMapping(final ElasticsearchClient client, final String index, final String type)
throws IOException {
return client.getMapping(index, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

import io.aiven.connect.elasticsearch.bulk.BulkRequest;

public class BulkRequestImpl implements BulkRequest {
private final org.elasticsearch.action.bulk.BulkRequest bulkRequest;
public class ElasticsearchBulkRequest implements BulkRequest {
private final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest;

public BulkRequestImpl(final org.elasticsearch.action.bulk.BulkRequest bulkRequest) {
public ElasticsearchBulkRequest(final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest;
}

public org.elasticsearch.action.bulk.BulkRequest getBulkRequest() {
public co.elastic.clients.elasticsearch.core.BulkRequest getBulkRequest() {
return bulkRequest;
}
}
Loading

0 comments on commit 9dc3dfc

Please sign in to comment.