diff --git a/spark/build.gradle b/spark/build.gradle index 7de2a82162..c06b5b6ecf 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -92,7 +92,6 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.asyncquery.exceptions.*', 'org.opensearch.sql.spark.dispatcher.model.*', 'org.opensearch.sql.spark.flint.FlintIndexType', - 'org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index 681e49c10a..cfbe44e1ff 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -23,7 +23,7 @@ public FlintIndexMetadata getJobIdFromFlintIndexMetadata(IndexDetails indexDetai Map mappingSourceMap = mappingMetadata.getSourceAsMap(); return FlintIndexMetadata.fromMetatdata((Map) mappingSourceMap.get("_meta")); } catch (NullPointerException npe) { - throw new IllegalArgumentException("Index doesn't exist"); + throw new IllegalArgumentException("Provided Index doesn't exist"); } } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java index 432370a62f..8922f638e0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java @@ -9,7 +9,7 @@ public enum FlintIndexType { SKIPPING("skipping_index"), COVERING("index"), - MATERIALIZED("materialized_view"); + MATERIALIZED_VIEW("materialized_view"); private final String suffix; diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index 350bd0afd3..957a218866 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -12,6 +12,7 @@ import java.util.Map; import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -31,26 +32,86 @@ public class FlintIndexMetadataReaderImplTest { @Mock(answer = RETURNS_DEEP_STUBS) private Client client; - // TODO FIX this @SneakyThrows - // @Test - void testGetJobIdFromFlintIndexMetadata() { + @Test + void testGetJobIdFromFlintSkippingIndexMetadata() { URL url = Resources.getResource( - "flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json"); + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); String mappings = Resources.toString(url, Charsets.UTF_8); - String indexName = "flint_my_glue_default_http_logs_size_year_covering_index"; + String indexName = "flint_mys3_default_http_logs_skipping_index"; mockNodeClientIndicesMappings(indexName, mappings); FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( new IndexDetails( - "size_year", - new FullyQualifiedTableName("my_glue.default.http_logs"), + null, + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); + Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintCoveringIndexMetadata() { + URL url = + Resources.getResource("flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + String jobId = + flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), false, true, FlintIndexType.COVERING)); - Assertions.assertEquals("00fdlum58g9g1g0q", jobId); + Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId); + } + + @SneakyThrows + @Test + void testGetJobIDWithNPEException() { + URL url = Resources.getResource("flint-index-mappings/npe_mapping.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.COVERING))); + Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); + } + + @SneakyThrows + @Test + void testGetJobIdFromUnsupportedIndex() { + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.MATERIALIZED_VIEW))); + Assertions.assertEquals( + "Unsupported Index Type : MATERIALIZED_VIEW", unsupportedOperationException.getMessage()); } @SneakyThrows diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json deleted file mode 100644 index 201aa539bb..0000000000 --- a/spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "mappings": { - "_meta": { - "kind": "skipping", - "indexedColumns": [ - { - "columnType": "int", - "kind": "VALUE_SET", - "columnName": "status" - } - ], - "name": "flint_mys3_default_http_logs_skipping_index", - "options": {}, - "source": "mys3.default.http_logs", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" - } - } - }, - "properties": { - "file_path": { - "type": "keyword" - }, - "status": { - "type": "integer" - } - } - } -} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json new file mode 100644 index 0000000000..e7ca1ff440 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json @@ -0,0 +1,41 @@ +{ + "flint_mys3_default_http_logs_cv1_index": { + "mappings": { + "_doc": { + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_cv1_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1", + "aliases_version": "1" + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json new file mode 100644 index 0000000000..24e14c12ba --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json @@ -0,0 +1,41 @@ +{ + "flint_mys3_default_http_logs_skipping_index": { + "mappings": { + "_doc": { + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_skipping_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1", + "aliases_version": "1" + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/npe_mapping.json b/spark/src/test/resources/flint-index-mappings/npe_mapping.json new file mode 100644 index 0000000000..ff1d19f99f --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/npe_mapping.json @@ -0,0 +1,35 @@ +{ + "flint_mys3_default_http_logs_cv1_index": { + "mappings": { + "_doc": { + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_cv1_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0" + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1", + "aliases_version": "1" + } +} \ No newline at end of file