Skip to content

Commit

Permalink
Merge pull request #553 from Hurence/feature/LOGISLAND-532
Browse files Browse the repository at this point in the history
Feature/logisland 532
  • Loading branch information
oalam authored Apr 9, 2020
2 parents a54d7e5 + aaae6be commit 5b9c504
Show file tree
Hide file tree
Showing 46 changed files with 7,545 additions and 445 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ script:
# Build
# use travis_wait so it does not time_out after 10 minutes without output (unfortunately that seems to not work)
# use -q so there's not too much output for travis (4Mb max)
- travis_wait mvn clean install -Pintegration-tests -q
- travis_wait 30 mvn clean install -Pintegration-tests -q
# build assembly (there is currently missing jars in assembly when using mvn clean install...)
- mvn clean package -DskipTests -q
# Integrations tests
Expand Down
2 changes: 2 additions & 0 deletions logisland-assembly/src/assembly/shared-dependencies.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
<!-- Now, select which projects to include in this module-set. -->
<includes>
<!-- ENGINES -->
<!-- <include>com.hurence.logisland:logisland-engine-spark_2_4_kafka_2_4</include>-->
<include>com.hurence.logisland:logisland-engine-spark_2_1</include>
<include>com.hurence.logisland:logisland-engine-spark_2_3</include>
<include>com.hurence.logisland:logisland-engine-spark_2_4</include>
<include>com.hurence.logisland:logisland-engine-spark_1_6</include>
<include>com.hurence.logisland:logisland-engine-vanilla</include>
</includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.3</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.11.0</version>
<version>2.13.1</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.hurence.logisland.service.elasticsearch;


import com.hurence.logisland.annotation.documentation.CapabilityDescription;
import com.hurence.logisland.annotation.documentation.Tags;
import com.hurence.logisland.component.AllowableValue;
Expand All @@ -31,7 +30,6 @@
import java.util.Map;
import java.util.Optional;


@Tags({"elasticsearch", "client"})
@CapabilityDescription("A controller service for accessing an elasticsearch client.")
public interface ElasticsearchClientService extends DatastoreClientService {
Expand Down Expand Up @@ -154,6 +152,16 @@ public ValidationResult validate(final String subject, final String input) {
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();

PropertyDescriptor ENABLE_SSL = new PropertyDescriptor.Builder()
.name("enable.ssl")
.description("Whether to enable (true) TLS/SSL connections or not (false). This can for instance be used" +
" with opendistro. Defaults to false. Note that the current implementation does try to validate" +
" the server certificate.")
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();

PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("username")
.description("Username to access the Elasticsearch cluster")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ESRule implements TestRule {
* The internal-transport client that talks to the local node.
*/
private RestHighLevelClient client;
private ElasticsearchContainer container;

/**
* Return a closure which starts an embedded ES docker container, executes the unit-test, then shuts down the
Expand All @@ -46,7 +47,7 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.6.2");
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.6.2");
container.start();
client = new RestHighLevelClient(RestClient.builder(HttpHost.create(container.getHttpHostAddress())));

Expand All @@ -60,6 +61,10 @@ public void evaluate() throws Throwable {
};
}

public String getHostPortString() {
return container.getHttpHostAddress();
}

/**
* Return the object through which operations can be performed on the ES cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@

import java.io.IOException;
import java.util.*;
import java.util.function.BiConsumer;

import static com.hurence.logisland.service.elasticsearch.ElasticsearchClientService.HOSTS;

public class Elasticsearch_6_6_2_ClientServiceIT {

Expand All @@ -74,84 +75,13 @@ public void clean() throws IOException {
}
}

private class MockElasticsearchClientService extends Elasticsearch_6_6_2_ClientService {

@Override
protected void createElasticsearchClient(ControllerServiceInitializationContext context) throws ProcessException {
if (esClient != null) {
return;
}
esClient = esRule.getClient();
}

@Override
protected void createBulkProcessor(ControllerServiceInitializationContext context) {

if (bulkProcessor != null) {
return;
}

// create the bulk processor

BulkProcessor.Listener listener =
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
getLogger().debug("Going to execute bulk [id:{}] composed of {} actions", new Object[]{l, bulkRequest.numberOfActions()});
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
getLogger().debug("Executed bulk [id:{}] composed of {} actions", new Object[]{l, bulkRequest.numberOfActions()});
if (bulkResponse.hasFailures()) {
getLogger().warn("There was failures while executing bulk [id:{}]," +
" done bulk request in {} ms with failure = {}",
new Object[]{l, bulkResponse.getTook().getMillis(), bulkResponse.buildFailureMessage()});
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
errors.put(item.getId(), item.getFailureMessage());
}
}
}
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
getLogger().error("something went wrong while bulk loading events to es : {}", new Object[]{throwable.getMessage()});
}

};

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(1000)
.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(1))
.setConcurrentRequests(2)
//.setBackoffPolicy(getBackOffPolicy(context))
.build();

}

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {

List<PropertyDescriptor> props = new ArrayList<>();

return Collections.unmodifiableList(props);
}

}

private ElasticsearchClientService configureElasticsearchClientService(final TestRunner runner) throws InitializationException {
final MockElasticsearchClientService elasticsearchClientService = new MockElasticsearchClientService();
final Elasticsearch_6_6_2_ClientService elasticsearchClientService = new Elasticsearch_6_6_2_ClientService();

runner.addControllerService("elasticsearchClient", elasticsearchClientService);

runner.enableControllerService(elasticsearchClientService);
runner.setProperty(TestProcessor.ELASTICSEARCH_CLIENT_SERVICE, "elasticsearchClient");
runner.assertValid(elasticsearchClientService);
runner.setProperty(elasticsearchClientService, HOSTS, esRule.getHostPortString());
runner.enableControllerService(elasticsearchClientService);

// TODO : is this necessary ?
final ElasticsearchClientService service = PluginProxy.unwrap(runner.getProcessContext().getPropertyValue(TestProcessor.ELASTICSEARCH_CLIENT_SERVICE).asControllerService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<properties>
<!-- Versions -->
<elasticsearch.version>7.1.1</elasticsearch.version>
<testcontainers.version>1.12.5</testcontainers.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -70,14 +71,14 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.10.7</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.10.7</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Copyright (C) 2020 Hurence ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hurence.logisland.service.elasticsearch;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;

/**
* A JUnit rule which starts an embedded opendsitro elasticsearch docker container to test security features
*/
public class ESOpenDistroRule implements TestRule {

/**
* The internal-transport client that talks to the local node.
*/
private RestHighLevelClient client;
private ElasticsearchOpenDistroContainer container;
private String opendistroUsername;
private String opendistroPassword;

private static Logger logger = LoggerFactory.getLogger(ESOpenDistroRule.class);

public ESOpenDistroRule(String opendistroUsername, String opendistroPassword) {
this.opendistroUsername = opendistroUsername;
this.opendistroPassword = opendistroPassword;
}

/**
* Return a closure which starts an embedded ES OpenDistro docker container, executes the unit-test, then shuts down the
* ES instance.
*/
@Override
public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
container = new ElasticsearchOpenDistroContainer("amazon/opendistro-for-elasticsearch:1.4.0",
opendistroUsername, opendistroPassword);
container.start();

// TODO: if testcontainers support no SSL server validation one can use the wait strategy
// in ElasticsearchOpenDistroContainer instead. See inside ElasticsearchOpenDistroContainer.
long wait = 10000L;
logger.info("Waiting for ES open distro container to start for " + wait/1000 + " seconds");
Thread.sleep(wait);

/**
* Inspired from https://github.com/opendistro-for-elasticsearch/community/issues/64
*/

RestClientBuilder builder = RestClient.builder(
new HttpHost(container.getHostAddress(), container.getPort(), "https"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

// Set user/password basic auth credentials
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(opendistroUsername, opendistroPassword));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

// Set SSL trust manager and context
// Create and use a trust manager accepting all server certificates
TrustManager[] acceptAllTrustManager = new TrustManager[] { new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}

public void checkServerTrusted(X509Certificate[] certs, String authType) {
}
} };

SSLContext sslContext = null;
try {
sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, acceptAllTrustManager, new java.security.SecureRandom());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
httpClientBuilder.setSSLContext(sslContext);

return httpClientBuilder;
}
});
client = new RestHighLevelClient(builder);

try {
base.evaluate(); // execute the unit test
} finally {
client.close();
container.stop();
}
}
};
}

public String getHostPortString() {
return container.getHostPortString();
}

public String getHostAddress() {
return container.getHostAddress();
}

public int getPort() {
return container.getPort();
}

/**
* Return the object through which operations can be performed on the ES cluster.
*/
public RestHighLevelClient getClient() {
return client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ESRule implements TestRule {
* The internal-transport client that talks to the local node.
*/
private RestHighLevelClient client;
private ElasticsearchContainer container;

/**
* Return a closure which starts an embedded ES docker container, executes the unit-test, then shuts down the
Expand All @@ -46,7 +47,7 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.1");
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.1");
container.start();
client = new RestHighLevelClient(RestClient.builder(HttpHost.create(container.getHttpHostAddress())));

Expand All @@ -60,6 +61,10 @@ public void evaluate() throws Throwable {
};
}

public String getHostPortString() {
return container.getHttpHostAddress();
}

/**
* Return the object through which operations can be performed on the ES cluster.
*/
Expand Down
Loading

0 comments on commit 5b9c504

Please sign in to comment.