diff --git a/tests/itests-elasticsearch/pom.xml b/tests/itests-elasticsearch/pom.xml
new file mode 100644
index 0000000000..0f6be679bd
--- /dev/null
+++ b/tests/itests-elasticsearch/pom.xml
@@ -0,0 +1,73 @@
+ org.apache.camel.kafkaconnector
+ itests-parent
+ 4.0.0-SNAPSHOT
+ ../itests-parent/pom.xml
+ 4.0.0
+ itests-elasticsearch
+ Camel-Kafka-Connector :: Tests :: Elasticsearch
+ org.apache.camel.kafkaconnector
+ itests-common
+ ${project.version}
+ test-jar
+ test
+ org.apache.camel
+ camel-test-infra-common
+ ${camel.version}
+ test-jar
+ test
+ org.apache.camel
+ camel-test-infra-elasticsearch
+ ${camel.version}
+ test-jar
+ test
+ co.elastic.clients
+ elasticsearch-java
+ ${elasticsearch-java-client-version}
+ org.apache.camel
+ camel-elasticsearch
+ org.apache.camel
+ camel-bean
+ org.apache.camel
+ camel-jackson
\ No newline at end of file
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/clients/ElasticSearchClient.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/clients/ElasticSearchClient.java
new file mode 100644
index 0000000000..9a8d0eb8c8
--- /dev/null
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/clients/ElasticSearchClient.java
@@ -0,0 +1,146 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.elasticsearch.clients;
+import java.io.IOException;
+import java.util.List;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
+import org.apache.camel.test.infra.common.TestUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class ElasticSearchClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClient.class);
+ private final ElasticsearchClient client;
+ private final String index;
+ public ElasticSearchClient(String host, int port, String index) {
+ final CredentialsProvider credentialsProvider =
+ new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(ElasticSearchCommon.USERNAME, ElasticSearchCommon.PASSWORD));
+ RestClientBuilder builder = RestClient.builder(
+ new HttpHost(host, port, "http"))
+ .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
+ @Override
+ public HttpAsyncClientBuilder customizeHttpClient(
+ HttpAsyncClientBuilder httpClientBuilder) {
+ return httpClientBuilder
+ .setDefaultCredentialsProvider(credentialsProvider);
+ }
+ });
+ RestClient httpClient = builder.build();
+ ElasticsearchTransport transport = new RestClientTransport(
+ httpClient,
+ new JacksonJsonpMapper()
+ );
+ client = new ElasticsearchClient(transport);
+ this.index = index;
+ }
+ public boolean indexExists() {
+ try {
+ ExistsRequest indexRequest = new ExistsRequest.Builder().index(index).build();
+ return client.indices().exists(indexRequest).value();
+ } catch (IOException e) {
+ /*
+ It may return if failed to parse the response, on timeout or no response from the ES instance.
+ Assuming it is more likely to timeout or provide no reply either the during the start up or
+ on overloaded CI environments, we log the I/O error and try again
+ */
+ LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
+ }
+ return false;
+ }
+ public List> getData() {
+ try {
+ SearchResponse response = client.search(s ->
+ s.index(index)
+ .query(QueryBuilders.matchAll().build()._toQuery()),
+ ObjectNode.class);
+ return response.hits().hits();
+ } catch (IOException e) {
+ /*
+ It may return if failed to parse the response, on timeout or no response from the ES instance.
+ Assuming it is more likely to timeout or provide no reply either the during the start up or
+ on overloaded CI environments, we log the I/O error and try again
+ */
+ LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
+ } catch (Throwable e) {
+ LOG.error("Unhandled error trying to query for index existence: {}", e.getMessage(), e);
+ }
+ return null;
+ }
+ private boolean hasData(int expect) {
+ List> searchHits = getData();
+ if (searchHits == null) {
+ LOG.debug("There are not search hit to return");
+ return false;
+ }
+ int count = searchHits.size();
+ if (count != expect) {
+ LOG.debug("Not enough records: {} available, but {} expected", count, expect);
+ return false;
+ }
+ return true;
+ }
+ public void waitForIndex() {
+ TestUtils.waitFor(this::indexExists);
+ }
+ public void waitForData(int expect) {
+ TestUtils.waitFor(this::hasData, expect);
+ }
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchCommon.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchCommon.java
new file mode 100644
index 0000000000..d12a1b01f2
--- /dev/null
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchCommon.java
@@ -0,0 +1,45 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.elasticsearch.common;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+public final class ElasticSearchCommon {
+ /**
+ * The default ElasticSearch cluster name for usage during the tests
+ */
+ public static final String DEFAULT_ELASTICSEARCH_CLUSTER = "docker-cluster";
+ /**
+ * The default ElasticSearch index for usage during the tests
+ */
+ public static final String DEFAULT_ELASTICSEARCH_INDEX = "ckc-index";
+ /**
+ * The default ElasticSearch container username
+ */
+ public static final String USERNAME = "elastic";
+ /**
+ * The default ElasticSearch container password
+ */
+ public static final String PASSWORD = ElasticsearchContainer.ELASTICSEARCH_DEFAULT_PASSWORD;
+ private ElasticSearchCommon() {
+ }
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchIndexMessageProducer.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchIndexMessageProducer.java
new file mode 100644
index 0000000000..41c8e55e58
--- /dev/null
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchIndexMessageProducer.java
@@ -0,0 +1,52 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.elasticsearch.common;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer;
+public class ElasticSearchIndexMessageProducer extends AbstractTestMessageProducer {
+ public ElasticSearchIndexMessageProducer(String bootStrapServer, String topicName, int count) {
+ super(bootStrapServer, topicName, count);
+ }
+ public ElasticSearchIndexMessageProducer(KafkaClient kafkaClient, String topicName, int count) {
+ super(kafkaClient, topicName, count);
+ }
+ @Override
+ public Map messageHeaders(String text, int current) {
+ return Collections.singletonMap(CamelSinkTask.HEADER_CAMEL_PREFIX + "indexId", String.valueOf(current));
+ }
+ @Override
+ public String testMessageContent(int current) {
+ return "{\n"
+ + " \"tags\": [\n"
+ + " \"opster\",\n"
+ + " \"elasticsearch\"\n"
+ + " ],\n"
+ + " \"date\": \"01-01-2020\",\n"
+ + " \"counter\": \"" + current + "\"\n"
+ + "}";
+ }
\ No newline at end of file
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchLocalContainerServiceHack.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchLocalContainerServiceHack.java
new file mode 100644
index 0000000000..52568694b0
--- /dev/null
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchLocalContainerServiceHack.java
@@ -0,0 +1,60 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.elasticsearch.common;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchLocalContainerService;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+public class ElasticSearchLocalContainerServiceHack extends ElasticSearchLocalContainerService {
+ public ElasticSearchLocalContainerServiceHack() {
+ super();
+ }
+ public ElasticSearchLocalContainerServiceHack(String imageName) {
+ super(imageName);
+ }
+ public ElasticSearchLocalContainerServiceHack(ElasticsearchContainer container) {
+ super(container);
+ }
+ @Override
+ public void registerProperties() {
+ System.setProperty("elasticsearch.host", this.getElasticSearchHost());
+ System.setProperty("elasticsearch.port", String.valueOf(this.getPort()));
+ this.getContainer().caCertAsBytes().ifPresent(content -> {
+ try {
+ Field certPath = getClass().getSuperclass().getDeclaredField("certPath");
+ certPath.setAccessible(true); // enables access to private variables
+ certPath.set(this, Files.createTempFile("http_ca", ".crt"));
+ Files.write((Path) certPath.get(this), content, new OpenOption[0]);
+ Field sslContext = getClass().getSuperclass().getDeclaredField("sslContext");
+ sslContext.setAccessible(true); // enables access to private variables
+ sslContext.set(this, this.getContainer().createSslContextFromCa());
+ } catch (IOException | NoSuchFieldException | IllegalAccessException var3) {
+ throw new RuntimeException(var3);
+ }
+ });
+ }
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelElasticSearchPropertyFactory.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelElasticSearchPropertyFactory.java
new file mode 100644
index 0000000000..46f3fac937
--- /dev/null
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelElasticSearchPropertyFactory.java
@@ -0,0 +1,55 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.elasticsearch.sink;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
+final class CamelElasticSearchPropertyFactory extends SinkConnectorPropertyFactory {
+ private CamelElasticSearchPropertyFactory() {
+ }
+ public CamelElasticSearchPropertyFactory withClusterName(String clusterName) {
+ return setProperty("camel.kamelet.elasticsearch-index-sink.clusterName", clusterName);
+ }
+ public CamelElasticSearchPropertyFactory withHostAddress(String hostAddress) {
+ return setProperty("camel.kamelet.elasticsearch-index-sink.hostAddresses", hostAddress);
+ }
+ public CamelElasticSearchPropertyFactory withIndexName(String indexName) {
+ return setProperty("camel.kamelet.elasticsearch-index-sink.indexName", indexName);
+ }
+ public static CamelElasticSearchPropertyFactory basic() {
+ return new CamelElasticSearchPropertyFactory()
+ .withName("CamelElasticSearchSinkConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.elasticsearchindexsink.CamelElasticsearchindexsinkSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .setProperty("camel.kamelet.elasticsearch-index-sink.user", ElasticSearchCommon.USERNAME)
+ .setProperty("camel.kamelet.elasticsearch-index-sink.password", ElasticSearchCommon.PASSWORD)
+ .setProperty("camel.kamelet.elasticsearch-index-sink.enableSSL", "false")
+ .setProperty("camel.component.kamelet.location", "kamelets")
+ .setProperty("camel.component.properties.environment-variable-mode", "1");
+ }
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
new file mode 100644
index 0000000000..082389c944
--- /dev/null
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -0,0 +1,137 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.elasticsearch.sink;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.elasticsearch.clients.ElasticSearchClient;
+import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
+import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchIndexMessageProducer;
+import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchLocalContainerServiceHack;
+import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchService;
+import org.apache.camel.test.infra.elasticsearch.services.RemoteElasticSearchService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import static org.apache.camel.test.infra.elasticsearch.services.ElasticSearchServiceFactory.builder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
+ @RegisterExtension
+ public static ElasticSearchService elasticSearch = builder()
+ .addLocalMapping(new Supplier() {
+ @Override
+ public ElasticSearchService get() {
+ ElasticsearchContainer container =
+ new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.5.2");
+ container.addEnv("xpack.security.enabled", "true");
+ //XXX: revert back to the normal lasticSearchLocalContainerService when https://issues.apache.org/jira/browse/CAMEL-19834 is fixed
+ return new ElasticSearchLocalContainerServiceHack(container);
+ }
+ }
+ ).addRemoteMapping(RemoteElasticSearchService::new).build();
+ private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);
+ private ElasticSearchClient client;
+ private String topicName;
+ private final int expect = 10;
+ private int received;
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-elasticsearch-index-sink-kafka-connector"};
+ }
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
+ client = new ElasticSearchClient(elasticSearch.getElasticSearchHost(), elasticSearch.getPort(),
+ received = 0;
+ }
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ client.waitForIndex();
+ LOG.debug("Waiting for data");
+ client.waitForData(expect);
+ } finally {
+ latch.countDown();
+ }
+ }
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ List> hits = client.getData();
+ assertNotNull(hits);
+ hits.forEach(this::verifyHit);
+ assertEquals(expect, received,
+ "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+ private void verifyHit(Hit searchHit) {
+ ObjectNode source = searchHit.source();
+ LOG.debug("Search hit: {} ", source);
+ assertNotNull(source);
+ assertFalse(source.isEmpty());
+ assertEquals(String.valueOf(received), source.at("/counter").asText());
+ received++;
+ }
+ @Test
+ @Timeout(90)
+ public void testIndexOperation() throws Exception {
+ ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+ .withHostAddress(elasticSearch.getHttpHostAddress())
+ .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX);
+ runTest(propertyFactory, new ElasticSearchIndexMessageProducer(getKafkaService().getBootstrapServers(), topicName, expect));
+ }
diff --git a/tests/pom.xml b/tests/pom.xml
index 0790bd958b..62880b5404 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -45,6 +45,7 @@
+ itests-elasticsearch