Skip to content

Commit

Permalink
Merge pull request #71 from joe-td/support-elasticsearch-version-8
Browse files Browse the repository at this point in the history
support elasticsearch version 8
  • Loading branch information
joe-td authored Feb 17, 2023
2 parents 7aea4f9 + 35f41f3 commit 0a233f7
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## 0.6.0 - 2023-02-14
* [maintenance] Support typeless endpoint for ES version 8.x [#71](https://github.com/embulk/embulk-output-elasticsearch/pull/71)

## 0.4.7 - 2018-12-14
* [maintenance] Show warning logs instead of throwing ConfigException for AWS ES [#49](https://github.com/embulk/embulk-output-elasticsearch/pull/49)
* [maintenance] Updated Embulk version v0.8.36 to v0.9.11 [#55](https://github.com/embulk/embulk-output-elasticsearch/pull/55)

## 0.4.6 - 2018-08-01
* [new feature] Add "connect_timeout_millis" option [#53](https://github.com/embulk/embulk-output-elasticsearch/pull/53)
* [new feature] Only build with Java8 [#52](https://github.com/embulk/embulk-output-elasticsearch/pull/52)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ $ ./gradlew bintrayUpload # release embulk-output-elasticsearch to Bintray maven
## Test

Firstly install Docker and Docker compose then `docker-compose up -d`,
so that an MongoDB server will be locally launched then you can run tests with `./gradlew test`.
so that an ES server will be locally launched then you can run tests with `./gradlew test`.

```sh
$ docker-compose up -d
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repositories {
}

group = "org.embulk"
version = "0.5.0-SNAPSHOT"
version = "0.6.0-SNAPSHOT"
description = "Elasticsearch output plugin is an Embulk plugin that loads records to Elasticsearch read by any input plugins."

sourceCompatibility = 1.8
Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@ version: '3.1'
services:
elasticsearch:
container_name: embulk-output-elasticsearch_server
image: elasticsearch:5
#image: elasticsearch:5
#image: elasticsearch:6.8.21
#image: elasticsearch:7.17.8
# For Mac M1
#image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6-arm64
image: elasticsearch:8.6.1
ports:
- 19200:9200
- 19300:9300

# use this environment for v7.x & v8.x
environment:
- discovery.type=single-node
- xpack.security.enabled=false
volumes:
- ./es-data:/usr/share/elasticsearch/data/
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public class ElasticsearchHttpClient
// public static final int MAX_INDEX_NAME_BYTES = 255;
// @see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java#L108
private final long maxIndexNameBytes = 255;
private final List<Character> inalidIndexCharaters = Arrays.asList('\\', '/', '*', '?', '"', '<', '>', '|', '#', ' ', ',');
private final List<Character> invalidIndexCharacters = Arrays.asList('\\', '/', '*', '?', '"', '<', '>', '|', '#', ' ', ',');

public static final int ES_SUPPORT_TYPELESS_API_VERSION = 8;
private static int ES_CURRENT_MAJOR_VERSION = 0;

public ElasticsearchHttpClient()
{
Expand All @@ -83,7 +86,10 @@ public void push(JsonNode records, PluginTask task)
// {"k" : "v2"}\n
// '
try {
String path = String.format("/%s/%s/_bulk", task.getIndex(), task.getType());
int esMajorVersion = this.getEsMajorVersion(task);
String path = esMajorVersion >= ES_SUPPORT_TYPELESS_API_VERSION
? String.format("/%s/_bulk", task.getIndex())
: String.format("/%s/%s/_bulk", task.getIndex(), task.getType());
int recordSize = records.size();
String idColumn = task.getId().orElse(null);
if (recordSize > 0) {
Expand Down Expand Up @@ -182,11 +188,27 @@ public String getEsVersion(PluginTask task)
return response.get("version").get("number").asText();
}

public int getEsMajorVersion(PluginTask task)
{
try {
if (ES_CURRENT_MAJOR_VERSION > 0) {
return ES_CURRENT_MAJOR_VERSION;
}

final String esVersion = getEsVersion(task);
ES_CURRENT_MAJOR_VERSION = Integer.parseInt(esVersion.substring(0, 1));
return ES_CURRENT_MAJOR_VERSION;
}
catch (Exception ex) {
throw new RuntimeException("Failed to fetch ES version");
}
}

public void validateIndexOrAliasName(String index, String type)
{
for (int i = 0; i < index.length(); i++) {
if (inalidIndexCharaters.contains(index.charAt(i))) {
throw new ConfigException(String.format("%s '%s' must not contain the invalid characters " + inalidIndexCharaters.toString(), type, index));
if (invalidIndexCharacters.contains(index.charAt(i))) {
throw new ConfigException(String.format("%s '%s' must not contain the invalid characters " + invalidIndexCharacters.toString(), type, index));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Exec;
import org.embulk.spi.Schema;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.embulk.base.restclient.record.ServiceRecord;
import org.embulk.config.TaskReport;
import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.Arrays;
import java.util.List;

import static org.junit.Assume.assumeNotNull;

public class ElasticsearchTestUtils
{
public static String ES_HOST;
Expand All @@ -46,6 +44,8 @@ public class ElasticsearchTestUtils
public static String ES_INDEX2;
public static String ES_ALIAS;

public static int ES_MIN_API_VERSION = 7;

public void initializeConstant()
{
ES_HOST = "localhost";
Expand All @@ -72,11 +72,6 @@ public void prepareBeforeTest(PluginTask task) throws Exception
Method deleteIndex = ElasticsearchHttpClient.class.getDeclaredMethod("deleteIndex", String.class, PluginTask.class);
deleteIndex.setAccessible(true);

// Delete alias
if (client.isAliasExisting(ES_ALIAS, task)) {
deleteIndex.invoke(client, ES_ALIAS, task);
}

// Delete index
if (client.isIndexExisting(ES_INDEX, task)) {
deleteIndex.invoke(client, ES_INDEX, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
import org.embulk.spi.time.Timestamp;
import org.embulk.standards.CsvParserPlugin;
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -59,7 +57,6 @@

public class TestElasticsearchOutputPlugin
{
private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ElasticsearchOutputPlugin.CONFIG_MAPPER_FACTORY;
private static final ConfigMapper CONFIG_MAPPER = ElasticsearchOutputPlugin.CONFIG_MAPPER;

@Rule
Expand Down Expand Up @@ -184,15 +181,23 @@ public List<TaskReport> run(TaskSource taskSource)

output.finish();
output.commit();
Thread.sleep(1500); // Need to wait until index done
Thread.sleep(3000); // Need to wait until index done

ElasticsearchHttpClient client = new ElasticsearchHttpClient();
Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class);
sendRequest.setAccessible(true);
String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
int esMajorVersion = client.getEsMajorVersion(task);
String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION
? String.format("/%s/_search", ES_INDEX)
: String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
String sort = "{\"sort\" : \"id\"}";
JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort);
assertThat(response.get("hits").get("total").asInt(), is(1));

int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_MIN_API_VERSION
? response.get("hits").get("total").get("value").asInt()
: response.get("hits").get("total").asInt();

assertThat(totalHits, is(1));
if (response.size() > 0) {
JsonNode record = response.get("hits").get("hits").get(0).get("_source");
assertThat(record.get("id").asInt(), is(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -148,10 +147,19 @@ public List<TaskReport> run(TaskSource taskSource)
ElasticsearchHttpClient client = new ElasticsearchHttpClient();
Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class);
sendRequest.setAccessible(true);
String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
int esMajorVersion = client.getEsMajorVersion(task);
String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION
? String.format("/%s/_search", ES_INDEX)
: String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
String sort = "{\"sort\" : \"id\"}";
JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort);
assertThat(response.get("hits").get("total").asInt(), is(1));

int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_MIN_API_VERSION
? response.get("hits").get("total").get("value").asInt()
: response.get("hits").get("total").asInt();

assertThat(totalHits, is(1));

if (response.size() > 0) {
JsonNode record = response.get("hits").get("hits").get(0).get("_source");
assertThat(record.get("id").asInt(), is(1));
Expand Down Expand Up @@ -192,10 +200,20 @@ public List<TaskReport> run(TaskSource taskSource)
ElasticsearchHttpClient client = new ElasticsearchHttpClient();
Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class);
sendRequest.setAccessible(true);
String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
int esMajorVersion = client.getEsMajorVersion(task);

String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION
? String.format("/%s/_search", ES_INDEX)
: String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
String sort = "{\"sort\" : \"id\"}";

JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort);
assertThat(response.get("hits").get("total").asInt(), is(1));

int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_MIN_API_VERSION
? response.get("hits").get("total").get("value").asInt()
: response.get("hits").get("total").asInt();

assertThat(totalHits, is(1));
if (response.size() > 0) {
JsonNode record = response.get("hits").get("hits").get(0).get("_source");
assertThat(record.get("id").asInt(), is(2));
Expand Down

0 comments on commit 0a233f7

Please sign in to comment.