From 10d23386935df5f0d99ac94aa3b4836b78753062 Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Mon, 13 Feb 2023 08:16:50 +0700 Subject: [PATCH 1/7] support elasticsearch version 8 --- README.md | 2 +- docker-compose.yml | 12 ++++- .../ElasticsearchHttpClient.java | 52 +++++++++++++++++-- .../elasticsearch/ElasticsearchTestUtils.java | 12 ++++- .../TestElasticsearchOutputPlugin.java | 14 +++-- .../TestElasticsearchOutputPluginJSON.java | 27 ++++++++-- 6 files changed, 105 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 2ae0c55..12936d7 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ $ ./gradlew bintrayUpload # release embulk-output-elasticsearch to Bintray maven ## Test Firstly install Docker and Docker compose then `docker-compose up -d`, -so that an MongoDB server will be locally launched then you can run tests with `./gradlew test`. +so that an ES server will be locally launched then you can run tests with `./gradlew test`. ```sh $ docker-compose up -d diff --git a/docker-compose.yml b/docker-compose.yml index 1d094e6..a0af747 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,9 +2,19 @@ version: '3.1' services: elasticsearch: container_name: embulk-output-elasticsearch_server - image: elasticsearch:5 + #image: elasticsearch:5 + #image: elasticsearch:6.8.21 + #image: elasticsearch:7.17.8 + # For Mac M1 + #image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6-arm64 + image: elasticsearch:8.6.1 ports: - 19200:9200 - 19300:9300 + + # use this environment for v7.x & v8.x + #environment: + # - discovery.type=single-node + # - xpack.security.enabled=false volumes: - ./es-data:/usr/share/elasticsearch/data/ diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java index eb786ae..7e0109e 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java @@ -65,7 +65,10 @@ public class ElasticsearchHttpClient // public static final int MAX_INDEX_NAME_BYTES = 255; // @see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java#L108 private final long maxIndexNameBytes = 255; - private final List inalidIndexCharaters = Arrays.asList('\\', '/', '*', '?', '"', '<', '>', '|', '#', ' ', ','); + private final List invalidIndexCharacters = Arrays.asList('\\', '/', '*', '?', '"', '<', '>', '|', '#', ' ', ','); + + public static final int ES_SUPPORT_TYPELESS_API_VERSION = 7; + public static final int ES_SUPPORT_MIN_VERSION = 5; public ElasticsearchHttpClient() { @@ -83,7 +86,10 @@ public void push(JsonNode records, PluginTask task) // {"k" : "v2"}\n // ' try { - String path = String.format("/%s/%s/_bulk", task.getIndex(), task.getType()); + int esMajorVersion = this.getEsMajorVersion(task); + String path = esMajorVersion >= ES_SUPPORT_TYPELESS_API_VERSION + ? String.format("/%s/_bulk", task.getIndex()) + : String.format("/%s/%s/_bulk", task.getIndex(), task.getType()); int recordSize = records.size(); String idColumn = task.getId().orElse(null); if (recordSize > 0) { @@ -182,11 +188,21 @@ public String getEsVersion(PluginTask task) return response.get("version").get("number").asText(); } + public int getEsMajorVersion(PluginTask task) + { + try { + String esVersion = getEsVersion(task); + return Integer.parseInt(esVersion.substring(0, 1)); + } catch (Exception ex) { + return ES_SUPPORT_MIN_VERSION; + } + } + public void validateIndexOrAliasName(String index, String type) { for (int i = 0; i < index.length(); i++) { - if (inalidIndexCharaters.contains(index.charAt(i))) { - throw new ConfigException(String.format("%s '%s' must not contain the invalid characters " + inalidIndexCharaters.toString(), type, index)); + if (invalidIndexCharacters.contains(index.charAt(i))) { + throw new ConfigException(String.format("%s '%s' must not contain the invalid characters " + invalidIndexCharacters.toString(), type, index)); } } @@ -289,6 +305,34 @@ private void deleteIndex(String indexName, PluginTask task) } } + private void deleteAlias(String indexName, String aliasName, PluginTask task) + { + try { + if (isIndexExisting(indexName, task)) { + if (isAliasExisting(aliasName, task)) { + Map alias = new HashMap<>(); + alias.put("index", indexName); + alias.put("alias", aliasName); + + Map remove = new HashMap<>(); + remove.put("remove", alias); + + List> actions = new ArrayList<>(); + actions.add(remove); + Map rootTree = new HashMap<>(); + rootTree.put("actions", actions); + + String content = jsonMapper.writeValueAsString(rootTree); + sendRequest("/_aliases", HttpMethod.POST, task, content); + log.info("Remove alias [{}] to index[{}]", aliasName, indexName); + } + } + } + catch (JsonProcessingException ex) { + throw new ConfigException(String.format("Failed to remove alias[%s] to index[%s]", aliasName, indexName)); + } + } + private void waitSnapshot(PluginTask task) { int maxSnapshotWaitingMills = task.getMaxSnapshotWaitingSecs() * 1000; diff --git a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java index f4d4700..c665138 100644 --- a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java +++ b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java @@ -72,9 +72,19 @@ public void prepareBeforeTest(PluginTask task) throws Exception Method deleteIndex = ElasticsearchHttpClient.class.getDeclaredMethod("deleteIndex", String.class, PluginTask.class); deleteIndex.setAccessible(true); + Method deleteAlias = ElasticsearchHttpClient.class.getDeclaredMethod("deleteAlias", String.class, String.class, PluginTask.class); + deleteAlias.setAccessible(true); + + int esMajorVersion = client.getEsMajorVersion(task); + // Delete alias if (client.isAliasExisting(ES_ALIAS, task)) { - deleteIndex.invoke(client, ES_ALIAS, task); + if (esMajorVersion <= ElasticsearchHttpClient.ES_SUPPORT_MIN_VERSION) { + deleteIndex.invoke(client, ES_ALIAS, task); + } + else { + deleteAlias.invoke(client, ES_INDEX, ES_ALIAS, task); + } } // Delete index diff --git a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java index ea78803..9959b4f 100644 --- a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java +++ b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java @@ -184,15 +184,23 @@ public List run(TaskSource taskSource) output.finish(); output.commit(); - Thread.sleep(1500); // Need to wait until index done + Thread.sleep(2500); // Need to wait until index done ElasticsearchHttpClient client = new ElasticsearchHttpClient(); Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class); sendRequest.setAccessible(true); - String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); + int esMajorVersion = client.getEsMajorVersion(task); + String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + ? String.format("/%s/_search", ES_INDEX) + : String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - assertThat(response.get("hits").get("total").asInt(), is(1)); + + int totalHits = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + ? response.get("hits").get("total").get("value").asInt() + : response.get("hits").get("total").asInt(); + + assertThat(totalHits, is(1)); if (response.size() > 0) { JsonNode record = response.get("hits").get("hits").get(0).get("_source"); assertThat(record.get("id").asInt(), is(1)); diff --git a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java index 451a170..e146023 100644 --- a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java +++ b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java @@ -148,10 +148,19 @@ public List run(TaskSource taskSource) ElasticsearchHttpClient client = new ElasticsearchHttpClient(); Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class); sendRequest.setAccessible(true); - String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); + int esMajorVersion = client.getEsMajorVersion(task); + String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + ? String.format("/%s/_search", ES_INDEX) + : String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - assertThat(response.get("hits").get("total").asInt(), is(1)); + + int totalHits = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + ? response.get("hits").get("total").get("value").asInt() + : response.get("hits").get("total").asInt(); + + assertThat(totalHits, is(1)); + if (response.size() > 0) { JsonNode record = response.get("hits").get("hits").get(0).get("_source"); assertThat(record.get("id").asInt(), is(1)); @@ -192,10 +201,20 @@ public List run(TaskSource taskSource) ElasticsearchHttpClient client = new ElasticsearchHttpClient(); Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class); sendRequest.setAccessible(true); - String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); + int esMajorVersion = client.getEsMajorVersion(task); + + String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_MIN_VERSION + ? String.format("/%s/_search", ES_INDEX) + : String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); String sort = "{\"sort\" : \"id\"}"; + JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - assertThat(response.get("hits").get("total").asInt(), is(1)); + + int totalHits = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + ? response.get("hits").get("total").get("value").asInt() + : response.get("hits").get("total").asInt(); + + assertThat(totalHits, is(1)); if (response.size() > 0) { JsonNode record = response.get("hits").get("hits").get(0).get("_source"); assertThat(record.get("id").asInt(), is(2)); From 53c6f40633e6f619e60e821f78881caae4e20720 Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Tue, 14 Feb 2023 15:50:20 +0700 Subject: [PATCH 2/7] bump version to 0.6.0 and update CHANGELOG.md --- CHANGELOG.md | 4 ++++ build.gradle | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a95f97c..a25f4e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ +## 0.6.0 - 2023-02-14 +* [maintenance] Support typeless endpoint for ES version 7.x & 8.x [#71](https://github.com/embulk/embulk-output-elasticsearch/pull/71) + ## 0.4.7 - 2018-12-14 * [maintenance] Show warning logs instead of throwing ConfigException for AWS ES [#49](https://github.com/embulk/embulk-output-elasticsearch/pull/49) * [maintenance] Updated Embulk version v0.8.36 to v0.9.11 [#55](https://github.com/embulk/embulk-output-elasticsearch/pull/55) + ## 0.4.6 - 2018-08-01 * [new feature] Add "connect_timeout_millis" option [#53](https://github.com/embulk/embulk-output-elasticsearch/pull/53) * [new feature] Only build with Java8 [#52](https://github.com/embulk/embulk-output-elasticsearch/pull/52) diff --git a/build.gradle b/build.gradle index bf910d3..faf1a79 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ repositories { } group = "org.embulk" -version = "0.5.0-SNAPSHOT" +version = "0.6.0-SNAPSHOT" description = "Elasticsearch output plugin is an Embulk plugin that loads records to Elasticsearch read by any input plugins." sourceCompatibility = 1.8 From 9f2dffe80eff1b2a723cf324e44a18f31cc73199 Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Wed, 15 Feb 2023 11:12:40 +0700 Subject: [PATCH 3/7] resolve reviews --- CHANGELOG.md | 2 +- docker-compose.yml | 6 +++--- .../elasticsearch/ElasticsearchHttpClient.java | 16 +++++++++++++--- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a25f4e3..421b870 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## 0.6.0 - 2023-02-14 -* [maintenance] Support typeless endpoint for ES version 7.x & 8.x [#71](https://github.com/embulk/embulk-output-elasticsearch/pull/71) +* [maintenance] Support typeless endpoint for ES version 8.x [#71](https://github.com/embulk/embulk-output-elasticsearch/pull/71) ## 0.4.7 - 2018-12-14 * [maintenance] Show warning logs instead of throwing ConfigException for AWS ES [#49](https://github.com/embulk/embulk-output-elasticsearch/pull/49) diff --git a/docker-compose.yml b/docker-compose.yml index a0af747..c499a47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,8 +13,8 @@ services: - 19300:9300 # use this environment for v7.x & v8.x - #environment: - # - discovery.type=single-node - # - xpack.security.enabled=false + environment: + - discovery.type=single-node + - xpack.security.enabled=false volumes: - ./es-data:/usr/share/elasticsearch/data/ diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java index 7e0109e..6d29b83 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -67,9 +68,11 @@ public class ElasticsearchHttpClient private final long maxIndexNameBytes = 255; private final List invalidIndexCharacters = Arrays.asList('\\', '/', '*', '?', '"', '<', '>', '|', '#', ' ', ','); - public static final int ES_SUPPORT_TYPELESS_API_VERSION = 7; + public static final int ES_SUPPORT_TYPELESS_API_VERSION = 8; public static final int ES_SUPPORT_MIN_VERSION = 5; + private static Integer ES_CURRENT_MAJOR_VERSION; + public ElasticsearchHttpClient() { this.log = LoggerFactory.getLogger(getClass()); @@ -191,8 +194,15 @@ public String getEsVersion(PluginTask task) public int getEsMajorVersion(PluginTask task) { try { - String esVersion = getEsVersion(task); - return Integer.parseInt(esVersion.substring(0, 1)); + if (Objects.nonNull(ES_CURRENT_MAJOR_VERSION)) { + return ES_CURRENT_MAJOR_VERSION.intValue(); + } + else { + final String esVersion = getEsVersion(task); + final int esMajorVersion = Integer.parseInt(esVersion.substring(0, 1)); + ES_CURRENT_MAJOR_VERSION = Integer.valueOf(esMajorVersion); + return esMajorVersion; + } } catch (Exception ex) { return ES_SUPPORT_MIN_VERSION; } From 619b8f8af9f103d645e82d9f8ab8168eb3ff38d9 Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Thu, 16 Feb 2023 12:06:42 +0700 Subject: [PATCH 4/7] clean alias when remove indices --- .../ElasticsearchHttpClient.java | 25 ++++++++----------- .../ElasticsearchOutputPluginDelegate.java | 1 - .../ElasticsearchRecordBuffer.java | 1 - .../elasticsearch/ElasticsearchTestUtils.java | 13 +++------- .../TestElasticsearchOutputPlugin.java | 7 ++---- .../TestElasticsearchOutputPluginJSON.java | 7 +++--- 6 files changed, 19 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java index 6d29b83..4512f49 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java @@ -48,7 +48,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -69,9 +68,7 @@ public class ElasticsearchHttpClient private final List invalidIndexCharacters = Arrays.asList('\\', '/', '*', '?', '"', '<', '>', '|', '#', ' ', ','); public static final int ES_SUPPORT_TYPELESS_API_VERSION = 8; - public static final int ES_SUPPORT_MIN_VERSION = 5; - - private static Integer ES_CURRENT_MAJOR_VERSION; + private static int ES_CURRENT_MAJOR_VERSION = 0; public ElasticsearchHttpClient() { @@ -180,6 +177,7 @@ public void reassignAlias(String aliasName, String newIndexName, PluginTask task assignAlias(newIndexName, aliasName, task); for (String index : oldIndices) { deleteIndex(index, task); + deleteAlias(index, aliasName, task); } } } @@ -194,17 +192,16 @@ public String getEsVersion(PluginTask task) public int getEsMajorVersion(PluginTask task) { try { - if (Objects.nonNull(ES_CURRENT_MAJOR_VERSION)) { - return ES_CURRENT_MAJOR_VERSION.intValue(); - } - else { - final String esVersion = getEsVersion(task); - final int esMajorVersion = Integer.parseInt(esVersion.substring(0, 1)); - ES_CURRENT_MAJOR_VERSION = Integer.valueOf(esMajorVersion); - return esMajorVersion; + if (ES_CURRENT_MAJOR_VERSION > 0) { + return ES_CURRENT_MAJOR_VERSION; } - } catch (Exception ex) { - return ES_SUPPORT_MIN_VERSION; + + final String esVersion = getEsVersion(task); + ES_CURRENT_MAJOR_VERSION = Integer.parseInt(esVersion.substring(0, 1)); + return ES_CURRENT_MAJOR_VERSION; + } + catch (Exception ex) { + throw new RuntimeException("Failed to fetch ES version"); } } diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPluginDelegate.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPluginDelegate.java index 874da74..cfb0679 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPluginDelegate.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPluginDelegate.java @@ -27,7 +27,6 @@ import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; -import org.embulk.spi.Exec; import org.embulk.spi.Schema; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java index 28f4125..f67cbf6 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java @@ -26,7 +26,6 @@ import org.embulk.base.restclient.record.ServiceRecord; import org.embulk.config.TaskReport; import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask; -import org.embulk.spi.Exec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java index c665138..3f5913a 100644 --- a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java +++ b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java @@ -28,8 +28,6 @@ import java.util.Arrays; import java.util.List; -import static org.junit.Assume.assumeNotNull; - public class ElasticsearchTestUtils { public static String ES_HOST; @@ -46,6 +44,8 @@ public class ElasticsearchTestUtils public static String ES_INDEX2; public static String ES_ALIAS; + public static int ES_SUPPORT_TYPELESS_API_VERSION = 7; + public void initializeConstant() { ES_HOST = "localhost"; @@ -75,16 +75,9 @@ public void prepareBeforeTest(PluginTask task) throws Exception Method deleteAlias = ElasticsearchHttpClient.class.getDeclaredMethod("deleteAlias", String.class, String.class, PluginTask.class); deleteAlias.setAccessible(true); - int esMajorVersion = client.getEsMajorVersion(task); - // Delete alias if (client.isAliasExisting(ES_ALIAS, task)) { - if (esMajorVersion <= ElasticsearchHttpClient.ES_SUPPORT_MIN_VERSION) { - deleteIndex.invoke(client, ES_ALIAS, task); - } - else { - deleteAlias.invoke(client, ES_INDEX, ES_ALIAS, task); - } + deleteAlias.invoke(client, ES_INDEX, ES_ALIAS, task); } // Delete index diff --git a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java index 9959b4f..4557856 100644 --- a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java +++ b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java @@ -36,9 +36,7 @@ import org.embulk.spi.time.Timestamp; import org.embulk.standards.CsvParserPlugin; import org.embulk.util.config.ConfigMapper; -import org.embulk.util.config.ConfigMapperFactory; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -59,7 +57,6 @@ public class TestElasticsearchOutputPlugin { - private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ElasticsearchOutputPlugin.CONFIG_MAPPER_FACTORY; private static final ConfigMapper CONFIG_MAPPER = ElasticsearchOutputPlugin.CONFIG_MAPPER; @Rule @@ -184,7 +181,7 @@ public List run(TaskSource taskSource) output.finish(); output.commit(); - Thread.sleep(2500); // Need to wait until index done + Thread.sleep(3000); // Need to wait until index done ElasticsearchHttpClient client = new ElasticsearchHttpClient(); Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, String.class); @@ -196,7 +193,7 @@ public List run(TaskSource taskSource) String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - int totalHits = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_SUPPORT_TYPELESS_API_VERSION ? response.get("hits").get("total").get("value").asInt() : response.get("hits").get("total").asInt(); diff --git a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java index e146023..4ee32f5 100644 --- a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java +++ b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java @@ -36,7 +36,6 @@ import org.embulk.util.config.ConfigMapper; import org.embulk.util.config.ConfigMapperFactory; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -155,7 +154,7 @@ public List run(TaskSource taskSource) String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - int totalHits = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_SUPPORT_TYPELESS_API_VERSION ? response.get("hits").get("total").get("value").asInt() : response.get("hits").get("total").asInt(); @@ -203,14 +202,14 @@ public List run(TaskSource taskSource) sendRequest.setAccessible(true); int esMajorVersion = client.getEsMajorVersion(task); - String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_MIN_VERSION + String path = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION ? String.format("/%s/_search", ES_INDEX) : String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE); String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - int totalHits = esMajorVersion >= ElasticsearchHttpClient.ES_SUPPORT_TYPELESS_API_VERSION + int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_SUPPORT_TYPELESS_API_VERSION ? response.get("hits").get("total").get("value").asInt() : response.get("hits").get("total").asInt(); From 3e12d0b768c266c552bacf1b399f558b79db1bcb Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Fri, 17 Feb 2023 09:54:01 +0700 Subject: [PATCH 5/7] rename variable to avoid confusion --- .../embulk/output/elasticsearch/ElasticsearchTestUtils.java | 2 +- .../output/elasticsearch/TestElasticsearchOutputPlugin.java | 2 +- .../elasticsearch/TestElasticsearchOutputPluginJSON.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java index 3f5913a..1b8252b 100644 --- a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java +++ b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java @@ -44,7 +44,7 @@ public class ElasticsearchTestUtils public static String ES_INDEX2; public static String ES_ALIAS; - public static int ES_SUPPORT_TYPELESS_API_VERSION = 7; + public static int ES_MIN_API_VERSION = 7; public void initializeConstant() { diff --git a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java index 4557856..4a54715 100644 --- a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java +++ b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java @@ -193,7 +193,7 @@ public List run(TaskSource taskSource) String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_SUPPORT_TYPELESS_API_VERSION + int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_MIN_API_VERSION ? response.get("hits").get("total").get("value").asInt() : response.get("hits").get("total").asInt(); diff --git a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java index 4ee32f5..63292e9 100644 --- a/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java +++ b/src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPluginJSON.java @@ -154,7 +154,7 @@ public List run(TaskSource taskSource) String sort = "{\"sort\" : \"id\"}"; JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_SUPPORT_TYPELESS_API_VERSION + int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_MIN_API_VERSION ? response.get("hits").get("total").get("value").asInt() : response.get("hits").get("total").asInt(); @@ -209,7 +209,7 @@ public List run(TaskSource taskSource) JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, sort); - int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_SUPPORT_TYPELESS_API_VERSION + int totalHits = esMajorVersion >= ElasticsearchTestUtils.ES_MIN_API_VERSION ? response.get("hits").get("total").get("value").asInt() : response.get("hits").get("total").asInt(); From 803767f5f9810f03aede1b7105bd11181b4c0347 Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Fri, 17 Feb 2023 10:41:25 +0700 Subject: [PATCH 6/7] re-order of remove alias --- .../embulk/output/elasticsearch/ElasticsearchHttpClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java index 4512f49..a87a3a5 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java @@ -176,8 +176,8 @@ public void reassignAlias(String aliasName, String newIndexName, PluginTask task List oldIndices = getIndexByAlias(aliasName, task); assignAlias(newIndexName, aliasName, task); for (String index : oldIndices) { - deleteIndex(index, task); deleteAlias(index, aliasName, task); + deleteIndex(index, task); } } } From 35f41f32607c8857aa1757f08b87c57de5a82d36 Mon Sep 17 00:00:00 2001 From: Joe Vu Date: Fri, 17 Feb 2023 11:24:28 +0700 Subject: [PATCH 7/7] remove delete alias method --- .../ElasticsearchHttpClient.java | 29 ------------------- .../elasticsearch/ElasticsearchTestUtils.java | 8 ----- 2 files changed, 37 deletions(-) diff --git a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java index a87a3a5..91cfcfd 100644 --- a/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java +++ b/src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java @@ -176,7 +176,6 @@ public void reassignAlias(String aliasName, String newIndexName, PluginTask task List oldIndices = getIndexByAlias(aliasName, task); assignAlias(newIndexName, aliasName, task); for (String index : oldIndices) { - deleteAlias(index, aliasName, task); deleteIndex(index, task); } } @@ -312,34 +311,6 @@ private void deleteIndex(String indexName, PluginTask task) } } - private void deleteAlias(String indexName, String aliasName, PluginTask task) - { - try { - if (isIndexExisting(indexName, task)) { - if (isAliasExisting(aliasName, task)) { - Map alias = new HashMap<>(); - alias.put("index", indexName); - alias.put("alias", aliasName); - - Map remove = new HashMap<>(); - remove.put("remove", alias); - - List> actions = new ArrayList<>(); - actions.add(remove); - Map rootTree = new HashMap<>(); - rootTree.put("actions", actions); - - String content = jsonMapper.writeValueAsString(rootTree); - sendRequest("/_aliases", HttpMethod.POST, task, content); - log.info("Remove alias [{}] to index[{}]", aliasName, indexName); - } - } - } - catch (JsonProcessingException ex) { - throw new ConfigException(String.format("Failed to remove alias[%s] to index[%s]", aliasName, indexName)); - } - } - private void waitSnapshot(PluginTask task) { int maxSnapshotWaitingMills = task.getMaxSnapshotWaitingSecs() * 1000; diff --git a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java index 1b8252b..877cd1a 100644 --- a/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java +++ b/src/test/java/org/embulk/output/elasticsearch/ElasticsearchTestUtils.java @@ -72,14 +72,6 @@ public void prepareBeforeTest(PluginTask task) throws Exception Method deleteIndex = ElasticsearchHttpClient.class.getDeclaredMethod("deleteIndex", String.class, PluginTask.class); deleteIndex.setAccessible(true); - Method deleteAlias = ElasticsearchHttpClient.class.getDeclaredMethod("deleteAlias", String.class, String.class, PluginTask.class); - deleteAlias.setAccessible(true); - - // Delete alias - if (client.isAliasExisting(ES_ALIAS, task)) { - deleteAlias.invoke(client, ES_INDEX, ES_ALIAS, task); - } - // Delete index if (client.isIndexExisting(ES_INDEX, task)) { deleteIndex.invoke(client, ES_INDEX, task);