Skip to content

Commit

Permalink
Added back elasticsearch tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Sep 5, 2023
1 parent 9016fb7 commit 405eb3c
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 0 deletions.
73 changes: 73 additions & 0 deletions tests/itests-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>itests-parent</artifactId>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../itests-parent/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>itests-elasticsearch</artifactId>
<name>Camel-Kafka-Connector :: Tests :: Elasticsearch</name>

<dependencies>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>itests-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- test infra -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-infra-common</artifactId>
<version>${camel.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-infra-elasticsearch</artifactId>
<version>${camel.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch-java-client-version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-bean</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jackson</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.elasticsearch.clients;

import java.io.IOException;
import java.util.List;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ElasticSearchClient {
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClient.class);

private final ElasticsearchClient client;
private final String index;

public ElasticSearchClient(String host, int port, String index) {

final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ElasticSearchCommon.USERNAME, ElasticSearchCommon.PASSWORD));

RestClientBuilder builder = RestClient.builder(
new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider);
}
});

RestClient httpClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(
httpClient,
new JacksonJsonpMapper()
);

client = new ElasticsearchClient(transport);
this.index = index;
}

public boolean indexExists() {
try {
ExistsRequest indexRequest = new ExistsRequest.Builder().index(index).build();

return client.indices().exists(indexRequest).value();
} catch (IOException e) {
/*
It may return if failed to parse the response, on timeout or no response from the ES instance.
Assuming it is more likely to timeout or provide no reply either the during the start up or
on overloaded CI environments, we log the I/O error and try again
*/
LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
}

return false;
}

public List<Hit<ObjectNode>> getData() {
try {
SearchResponse<ObjectNode> response = client.search(s ->
s.index(index)
.query(QueryBuilders.matchAll().build()._toQuery()),
ObjectNode.class);

return response.hits().hits();
} catch (IOException e) {
/*
It may return if failed to parse the response, on timeout or no response from the ES instance.
Assuming it is more likely to timeout or provide no reply either the during the start up or
on overloaded CI environments, we log the I/O error and try again
*/
LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e);
} catch (Throwable e) {
LOG.error("Unhandled error trying to query for index existence: {}", e.getMessage(), e);
}

return null;
}

private boolean hasData(int expect) {
List<Hit<ObjectNode>> searchHits = getData();

if (searchHits == null) {
LOG.debug("There are not search hit to return");

return false;
}

int count = searchHits.size();

if (count != expect) {
LOG.debug("Not enough records: {} available, but {} expected", count, expect);

return false;
}

return true;
}

public void waitForIndex() {
TestUtils.waitFor(this::indexExists);
}

public void waitForData(int expect) {
TestUtils.waitFor(this::hasData, expect);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.elasticsearch.common;

import org.testcontainers.elasticsearch.ElasticsearchContainer;

public final class ElasticSearchCommon {
/**
* The default ElasticSearch cluster name for usage during the tests
*/
public static final String DEFAULT_ELASTICSEARCH_CLUSTER = "docker-cluster";

/**
* The default ElasticSearch index for usage during the tests
*/
public static final String DEFAULT_ELASTICSEARCH_INDEX = "ckc-index";

/**
* The default ElasticSearch container username
*/
public static final String USERNAME = "elastic";

/**
* The default ElasticSearch container password
*/
public static final String PASSWORD = ElasticsearchContainer.ELASTICSEARCH_DEFAULT_PASSWORD;

private ElasticSearchCommon() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.elasticsearch.common;

import java.util.Collections;
import java.util.Map;

import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer;

public class ElasticSearchIndexMessageProducer extends AbstractTestMessageProducer<String> {

public ElasticSearchIndexMessageProducer(String bootStrapServer, String topicName, int count) {
super(bootStrapServer, topicName, count);
}

public ElasticSearchIndexMessageProducer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
super(kafkaClient, topicName, count);
}

@Override
public Map<String, String> messageHeaders(String text, int current) {
return Collections.singletonMap(CamelSinkTask.HEADER_CAMEL_PREFIX + "indexId", String.valueOf(current));
}

@Override
public String testMessageContent(int current) {
return "{\n"
+ " \"tags\": [\n"
+ " \"opster\",\n"
+ " \"elasticsearch\"\n"
+ " ],\n"
+ " \"date\": \"01-01-2020\",\n"
+ " \"counter\": \"" + current + "\"\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.elasticsearch.common;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;

import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchLocalContainerService;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearchLocalContainerServiceHack extends ElasticSearchLocalContainerService {
public ElasticSearchLocalContainerServiceHack() {
super();
}

public ElasticSearchLocalContainerServiceHack(String imageName) {
super(imageName);
}

public ElasticSearchLocalContainerServiceHack(ElasticsearchContainer container) {
super(container);
}

@Override
public void registerProperties() {
System.setProperty("elasticsearch.host", this.getElasticSearchHost());
System.setProperty("elasticsearch.port", String.valueOf(this.getPort()));
this.getContainer().caCertAsBytes().ifPresent(content -> {
try {
Field certPath = getClass().getSuperclass().getDeclaredField("certPath");
certPath.setAccessible(true); // enables access to private variables
certPath.set(this, Files.createTempFile("http_ca", ".crt"));
Files.write((Path) certPath.get(this), content, new OpenOption[0]);

Field sslContext = getClass().getSuperclass().getDeclaredField("sslContext");
sslContext.setAccessible(true); // enables access to private variables
sslContext.set(this, this.getContainer().createSslContextFromCa());
} catch (IOException | NoSuchFieldException | IllegalAccessException var3) {
throw new RuntimeException(var3);
}
});
}
}
Loading

0 comments on commit 405eb3c

Please sign in to comment.