-
Notifications
You must be signed in to change notification settings - Fork 102
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
569 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
~ Licensed to the Apache Software Foundation (ASF) under one or more | ||
~ contributor license agreements. See the NOTICE file distributed with | ||
~ this work for additional information regarding copyright ownership. | ||
~ The ASF licenses this file to You under the Apache License, Version 2.0 | ||
~ (the "License"); you may not use this file except in compliance with | ||
~ the License. You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software | ||
~ distributed under the License is distributed on an "AS IS" BASIS, | ||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
~ See the License for the specific language governing permissions and | ||
~ limitations under the License. | ||
--> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<groupId>org.apache.camel.kafkaconnector</groupId> | ||
<artifactId>itests-parent</artifactId> | ||
<version>4.0.0-SNAPSHOT</version> | ||
<relativePath>../itests-parent/pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>itests-elasticsearch</artifactId> | ||
<name>Camel-Kafka-Connector :: Tests :: Elasticsearch</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.camel.kafkaconnector</groupId> | ||
<artifactId>itests-common</artifactId> | ||
<version>${project.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<!-- test infra --> | ||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-test-infra-common</artifactId> | ||
<version>${camel.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-test-infra-elasticsearch</artifactId> | ||
<version>${camel.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>co.elastic.clients</groupId> | ||
<artifactId>elasticsearch-java</artifactId> | ||
<version>${elasticsearch-java-client-version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-elasticsearch</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-bean</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-jackson</artifactId> | ||
</dependency> | ||
</dependencies> | ||
</project> |
146 changes: 146 additions & 0 deletions
146
.../test/java/org/apache/camel/kafkaconnector/elasticsearch/clients/ElasticSearchClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.camel.kafkaconnector.elasticsearch.clients; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
import co.elastic.clients.elasticsearch.ElasticsearchClient; | ||
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders; | ||
import co.elastic.clients.elasticsearch.core.SearchResponse; | ||
import co.elastic.clients.elasticsearch.core.search.Hit; | ||
import co.elastic.clients.elasticsearch.indices.ExistsRequest; | ||
import co.elastic.clients.json.jackson.JacksonJsonpMapper; | ||
import co.elastic.clients.transport.ElasticsearchTransport; | ||
import co.elastic.clients.transport.rest_client.RestClientTransport; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon; | ||
import org.apache.camel.test.infra.common.TestUtils; | ||
import org.apache.http.HttpHost; | ||
import org.apache.http.auth.AuthScope; | ||
import org.apache.http.auth.UsernamePasswordCredentials; | ||
import org.apache.http.client.CredentialsProvider; | ||
import org.apache.http.impl.client.BasicCredentialsProvider; | ||
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; | ||
import org.elasticsearch.client.RestClient; | ||
import org.elasticsearch.client.RestClientBuilder; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class ElasticSearchClient { | ||
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClient.class); | ||
|
||
private final ElasticsearchClient client; | ||
private final String index; | ||
|
||
public ElasticSearchClient(String host, int port, String index) { | ||
|
||
final CredentialsProvider credentialsProvider = | ||
new BasicCredentialsProvider(); | ||
credentialsProvider.setCredentials(AuthScope.ANY, | ||
new UsernamePasswordCredentials(ElasticSearchCommon.USERNAME, ElasticSearchCommon.PASSWORD)); | ||
|
||
RestClientBuilder builder = RestClient.builder( | ||
new HttpHost(host, port, "http")) | ||
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { | ||
@Override | ||
public HttpAsyncClientBuilder customizeHttpClient( | ||
HttpAsyncClientBuilder httpClientBuilder) { | ||
return httpClientBuilder | ||
.setDefaultCredentialsProvider(credentialsProvider); | ||
} | ||
}); | ||
|
||
RestClient httpClient = builder.build(); | ||
ElasticsearchTransport transport = new RestClientTransport( | ||
httpClient, | ||
new JacksonJsonpMapper() | ||
); | ||
|
||
client = new ElasticsearchClient(transport); | ||
this.index = index; | ||
} | ||
|
||
public boolean indexExists() { | ||
try { | ||
ExistsRequest indexRequest = new ExistsRequest.Builder().index(index).build(); | ||
|
||
return client.indices().exists(indexRequest).value(); | ||
} catch (IOException e) { | ||
/* | ||
It may return if failed to parse the response, on timeout or no response from the ES instance. | ||
Assuming it is more likely to timeout or provide no reply either the during the start up or | ||
on overloaded CI environments, we log the I/O error and try again | ||
*/ | ||
LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e); | ||
} | ||
|
||
return false; | ||
} | ||
|
||
public List<Hit<ObjectNode>> getData() { | ||
try { | ||
SearchResponse<ObjectNode> response = client.search(s -> | ||
s.index(index) | ||
.query(QueryBuilders.matchAll().build()._toQuery()), | ||
ObjectNode.class); | ||
|
||
return response.hits().hits(); | ||
} catch (IOException e) { | ||
/* | ||
It may return if failed to parse the response, on timeout or no response from the ES instance. | ||
Assuming it is more likely to timeout or provide no reply either the during the start up or | ||
on overloaded CI environments, we log the I/O error and try again | ||
*/ | ||
LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e); | ||
} catch (Throwable e) { | ||
LOG.error("Unhandled error trying to query for index existence: {}", e.getMessage(), e); | ||
} | ||
|
||
return null; | ||
} | ||
|
||
private boolean hasData(int expect) { | ||
List<Hit<ObjectNode>> searchHits = getData(); | ||
|
||
if (searchHits == null) { | ||
LOG.debug("There are not search hit to return"); | ||
|
||
return false; | ||
} | ||
|
||
int count = searchHits.size(); | ||
|
||
if (count != expect) { | ||
LOG.debug("Not enough records: {} available, but {} expected", count, expect); | ||
|
||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
public void waitForIndex() { | ||
TestUtils.waitFor(this::indexExists); | ||
} | ||
|
||
public void waitForData(int expect) { | ||
TestUtils.waitFor(this::hasData, expect); | ||
} | ||
} |
45 changes: 45 additions & 0 deletions
45
...c/test/java/org/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchCommon.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.kafkaconnector.elasticsearch.common; | ||
|
||
import org.testcontainers.elasticsearch.ElasticsearchContainer; | ||
|
||
public final class ElasticSearchCommon { | ||
/** | ||
* The default ElasticSearch cluster name for usage during the tests | ||
*/ | ||
public static final String DEFAULT_ELASTICSEARCH_CLUSTER = "docker-cluster"; | ||
|
||
/** | ||
* The default ElasticSearch index for usage during the tests | ||
*/ | ||
public static final String DEFAULT_ELASTICSEARCH_INDEX = "ckc-index"; | ||
|
||
/** | ||
* The default ElasticSearch container username | ||
*/ | ||
public static final String USERNAME = "elastic"; | ||
|
||
/** | ||
* The default ElasticSearch container password | ||
*/ | ||
public static final String PASSWORD = ElasticsearchContainer.ELASTICSEARCH_DEFAULT_PASSWORD; | ||
|
||
private ElasticSearchCommon() { | ||
|
||
} | ||
} |
52 changes: 52 additions & 0 deletions
52
...g/apache/camel/kafkaconnector/elasticsearch/common/ElasticSearchIndexMessageProducer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.kafkaconnector.elasticsearch.common; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
import org.apache.camel.kafkaconnector.CamelSinkTask; | ||
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; | ||
import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer; | ||
|
||
public class ElasticSearchIndexMessageProducer extends AbstractTestMessageProducer<String> { | ||
|
||
public ElasticSearchIndexMessageProducer(String bootStrapServer, String topicName, int count) { | ||
super(bootStrapServer, topicName, count); | ||
} | ||
|
||
public ElasticSearchIndexMessageProducer(KafkaClient<String, String> kafkaClient, String topicName, int count) { | ||
super(kafkaClient, topicName, count); | ||
} | ||
|
||
@Override | ||
public Map<String, String> messageHeaders(String text, int current) { | ||
return Collections.singletonMap(CamelSinkTask.HEADER_CAMEL_PREFIX + "indexId", String.valueOf(current)); | ||
} | ||
|
||
@Override | ||
public String testMessageContent(int current) { | ||
return "{\n" | ||
+ " \"tags\": [\n" | ||
+ " \"opster\",\n" | ||
+ " \"elasticsearch\"\n" | ||
+ " ],\n" | ||
+ " \"date\": \"01-01-2020\",\n" | ||
+ " \"counter\": \"" + current + "\"\n" | ||
+ "}"; | ||
} | ||
} |
60 changes: 60 additions & 0 deletions
60
...che/camel/kafkaconnector/elasticsearch/common/ElasticSearchLocalContainerServiceHack.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.kafkaconnector.elasticsearch.common; | ||
|
||
import java.io.IOException; | ||
import java.lang.reflect.Field; | ||
import java.nio.file.Files; | ||
import java.nio.file.OpenOption; | ||
import java.nio.file.Path; | ||
|
||
import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchLocalContainerService; | ||
import org.testcontainers.elasticsearch.ElasticsearchContainer; | ||
|
||
public class ElasticSearchLocalContainerServiceHack extends ElasticSearchLocalContainerService { | ||
public ElasticSearchLocalContainerServiceHack() { | ||
super(); | ||
} | ||
|
||
public ElasticSearchLocalContainerServiceHack(String imageName) { | ||
super(imageName); | ||
} | ||
|
||
public ElasticSearchLocalContainerServiceHack(ElasticsearchContainer container) { | ||
super(container); | ||
} | ||
|
||
@Override | ||
public void registerProperties() { | ||
System.setProperty("elasticsearch.host", this.getElasticSearchHost()); | ||
System.setProperty("elasticsearch.port", String.valueOf(this.getPort())); | ||
this.getContainer().caCertAsBytes().ifPresent(content -> { | ||
try { | ||
Field certPath = getClass().getSuperclass().getDeclaredField("certPath"); | ||
certPath.setAccessible(true); // enables access to private variables | ||
certPath.set(this, Files.createTempFile("http_ca", ".crt")); | ||
Files.write((Path) certPath.get(this), content, new OpenOption[0]); | ||
|
||
Field sslContext = getClass().getSuperclass().getDeclaredField("sslContext"); | ||
sslContext.setAccessible(true); // enables access to private variables | ||
sslContext.set(this, this.getContainer().createSslContextFromCa()); | ||
} catch (IOException | NoSuchFieldException | IllegalAccessException var3) { | ||
throw new RuntimeException(var3); | ||
} | ||
}); | ||
} | ||
} |
Oops, something went wrong.