diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 4fdd8335e1..d2c8c6ebb7 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -306,7 +306,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() { this.dataSourceService, new DataSourceUserAuthorizationHelperImpl(client), jobExecutionResponseReader, - new FlintIndexMetadataReaderImpl(client)); + new FlintIndexMetadataReaderImpl(client), + client); return new AsyncQueryExecutorServiceImpl( asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings); } diff --git a/spark/build.gradle b/spark/build.gradle index eca0a3ad24..c06b5b6ecf 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -62,7 +62,7 @@ dependencies { test { useJUnitPlatform() testLogging { - events "passed", "skipped", "failed" + events "failed" exceptionFormat "full" } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryJobMetadataStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryJobMetadataStorageService.java index cee38d10f8..a95a6ffe45 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryJobMetadataStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryJobMetadataStorageService.java @@ -97,7 +97,8 @@ public Optional getJobMetadata(String jobId) { createJobMetadataIndex(); return Optional.empty(); } - return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId", jobId)).stream().findFirst(); + return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream() + .findFirst(); } private void createJobMetadataIndex() { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index aca4c86e0e..347e154885 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -12,11 +12,22 @@ import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRunState; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONArray; import org.json.JSONObject; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; @@ -28,6 +39,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -37,6 +49,8 @@ @AllArgsConstructor public class SparkQueryDispatcher { + private static final Logger LOG = LogManager.getLogger(); + public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; public static final String SCHEMA_TAG_KEY = "schema"; @@ -53,6 +67,8 @@ public class SparkQueryDispatcher { private FlintIndexMetadataReader flintIndexMetadataReader; + private Client client; + public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) { if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) { return handleSQLQuery(dispatchQueryRequest); @@ -64,6 +80,11 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) } public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { + // todo. refactor query process logic in plugin. + if (asyncQueryJobMetadata.isDropIndexQuery()) { + return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result(); + } + // either empty json when the result is not available or data with status // Fetch from Result Index JSONObject result = @@ -186,11 +207,29 @@ private DispatchQueryResponse handleDropIndexQuery( DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); - String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails); - emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), jobId); - String dropIndexDummyJobId = RandomStringUtils.randomAlphanumeric(16); + FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); + // if index is created without auto refresh. there is no job to cancel. + String status = JobRunState.FAILED.toString(); + try { + if (indexMetadata.isAutoRefresh()) { + emrServerlessClient.cancelJobRun( + dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId()); + } + } finally { + String indexName = indexDetails.openSearchIndexName(); + try { + AcknowledgedResponse response = + client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); + if (!response.isAcknowledged()) { + LOG.error("failed to delete index"); + } + status = JobRunState.SUCCESS.toString(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("failed to delete index"); + } + } return new DispatchQueryResponse( - dropIndexDummyJobId, true, dataSourceMetadata.getResultIndex()); + new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex()); } private static Map getDefaultTagsForJobSubmission( @@ -200,4 +239,39 @@ private static Map getDefaultTagsForJobSubmission( tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource()); return tags; } + + @Getter + @RequiredArgsConstructor + public static class DropIndexResult { + private static final int PREFIX_LEN = 10; + + private final String status; + + public static DropIndexResult fromJobId(String jobId) { + String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN); + return new DropIndexResult(status); + } + + public String toJobId() { + String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status; + return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8)); + } + + public JSONObject result() { + JSONObject result = new JSONObject(); + if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) { + result.put(STATUS_FIELD, status); + // todo. refactor response handling. + JSONObject dummyData = new JSONObject(); + dummyData.put("result", new JSONArray()); + dummyData.put("schema", new JSONArray()); + dummyData.put("applicationId", "fakeDropIndexApplicationId"); + result.put(DATA_FIELD, dummyData); + } else { + result.put(STATUS_FIELD, status); + result.put(ERROR_FIELD, "failed to drop index"); + } + return result; + } + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java index 2034535848..1cc66da9fc 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java @@ -23,4 +23,38 @@ public class IndexDetails { private Boolean autoRefresh = false; private boolean isDropIndex; private FlintIndexType indexType; + + public String openSearchIndexName() { + FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName(); + if (FlintIndexType.SKIPPING.equals(getIndexType())) { + String indexName = + "flint" + + "_" + + fullyQualifiedTableName.getDatasourceName() + + "_" + + fullyQualifiedTableName.getSchemaName() + + "_" + + fullyQualifiedTableName.getTableName() + + "_" + + getIndexType().getSuffix(); + return indexName.toLowerCase(); + } else if (FlintIndexType.COVERING.equals(getIndexType())) { + String indexName = + "flint" + + "_" + + fullyQualifiedTableName.getDatasourceName() + + "_" + + fullyQualifiedTableName.getSchemaName() + + "_" + + fullyQualifiedTableName.getTableName() + + "_" + + getIndexName() + + "_" + + getIndexType().getSuffix(); + return indexName.toLowerCase(); + } else { + throw new UnsupportedOperationException( + String.format("Unsupported Index Type : %s", getIndexType())); + } + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java new file mode 100644 index 0000000000..81b7fa1693 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import java.util.Locale; +import java.util.Map; +import lombok.Data; + +@Data +public class FlintIndexMetadata { + public static final String PROPERTIES_KEY = "properties"; + public static final String ENV_KEY = "env"; + public static final String OPTIONS_KEY = "options"; + + public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID"; + public static final String AUTO_REFRESH = "auto_refresh"; + public static final String AUTO_REFRESH_DEFAULT = "false"; + + private final String jobId; + private final boolean autoRefresh; + + public static FlintIndexMetadata fromMetatdata(Map metaMap) { + Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); + Map envMap = (Map) propertiesMap.get(ENV_KEY); + Map options = (Map) metaMap.get(OPTIONS_KEY); + String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); + + boolean autoRefresh = + !((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT)) + .toLowerCase(Locale.ROOT) + .equalsIgnoreCase(AUTO_REFRESH_DEFAULT); + return new FlintIndexMetadata(jobId, autoRefresh); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java index 7cb2e6a7c8..e4a5e92035 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java @@ -9,7 +9,7 @@ public interface FlintIndexMetadataReader { * Given Index details, get the streaming job Id. * * @param indexDetails indexDetails. - * @return jobId. + * @return FlintIndexMetadata. */ - String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails); + FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index d56c57a627..5f712e65cd 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -5,68 +5,25 @@ import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; /** Implementation of {@link FlintIndexMetadataReader} */ @AllArgsConstructor public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { - protected static final String META_KEY = "_meta"; - protected static final String PROPERTIES_KEY = "properties"; - protected static final String ENV_KEY = "env"; - protected static final String JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID"; - private final Client client; @Override - public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) { - String indexName = getIndexName(indexDetails); + public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) { + String indexName = indexDetails.openSearchIndexName(); GetMappingsResponse mappingsResponse = client.admin().indices().prepareGetMappings(indexName).get(); try { MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName); Map mappingSourceMap = mappingMetadata.getSourceAsMap(); - Map metaMap = (Map) mappingSourceMap.get(META_KEY); - Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); - Map envMap = (Map) propertiesMap.get(ENV_KEY); - return (String) envMap.get(JOB_ID_KEY); + return FlintIndexMetadata.fromMetatdata((Map) mappingSourceMap.get("_meta")); } catch (NullPointerException npe) { throw new IllegalArgumentException("Provided Index doesn't exist"); } } - - private String getIndexName(IndexDetails indexDetails) { - FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); - if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) { - String indexName = - "flint" - + "_" - + fullyQualifiedTableName.getDatasourceName() - + "_" - + fullyQualifiedTableName.getSchemaName() - + "_" - + fullyQualifiedTableName.getTableName() - + "_" - + indexDetails.getIndexType().getSuffix(); - return indexName.toLowerCase(); - } else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) { - String indexName = - "flint" - + "_" - + fullyQualifiedTableName.getDatasourceName() - + "_" - + fullyQualifiedTableName.getSchemaName() - + "_" - + fullyQualifiedTableName.getTableName() - + "_" - + indexDetails.getIndexName() - + "_" - + indexDetails.getIndexType().getSuffix(); - return indexName.toLowerCase(); - } else { - throw new UnsupportedOperationException( - String.format("Unsupported Index Type : %s", indexDetails.getIndexType())); - } - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java new file mode 100644 index 0000000000..d1c26f52e0 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; + +import com.amazonaws.services.emrserverless.model.JobRunState; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +public class DropIndexResultTest { + // todo, remove this UT after response refactor. + @Test + public void successRespEncodeDecode() { + // encode jobId + String jobId = + new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); + + // decode jobId + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); + + JSONObject result = dropIndexResult.result(); + assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD)); + assertEquals( + "{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}", + result.get(DATA_FIELD).toString()); + } + + // todo, remove this UT after response refactor. + @Test + public void failedRespEncodeDecode() { + // encode jobId + String jobId = + new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId(); + + // decode jobId + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); + + JSONObject result = dropIndexResult.result(); + assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD)); + assertEquals("failed to drop index", result.get(ERROR_FIELD)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 7d97cc6c50..ab9761da36 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -6,8 +6,10 @@ package org.opensearch.sql.spark.dispatcher; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -32,14 +34,17 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import org.apache.commons.lang3.StringUtils; +import java.util.concurrent.ExecutionException; import org.json.JSONObject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -51,6 +56,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -65,6 +71,11 @@ public class SparkQueryDispatcherTest { @Mock private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; @Mock private FlintIndexMetadataReader flintIndexMetadataReader; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client openSearchClient; + + @Mock private FlintIndexMetadata flintIndexMetadata; + private SparkQueryDispatcher sparkQueryDispatcher; @BeforeEach @@ -75,7 +86,8 @@ void setUp() { dataSourceService, dataSourceUserAuthorizationHelper, jobExecutionResponseReader, - flintIndexMetadataReader); + flintIndexMetadataReader, + openSearchClient); } @Test @@ -573,7 +585,8 @@ void testGetQueryResponseWithSuccess() { dataSourceService, dataSourceUserAuthorizationHelper, jobExecutionResponseReader, - flintIndexMetadataReader); + flintIndexMetadataReader, + openSearchClient); JSONObject queryResult = new JSONObject(); Map resultMap = new HashMap<>(); resultMap.put(STATUS_FIELD, "SUCCESS"); @@ -600,17 +613,44 @@ void testGetQueryResponseWithSuccess() { verifyNoInteractions(emrServerlessClient); } + // todo. refactor query process logic in plugin. @Test - void testDropIndexQuery() { + void testGetQueryResponseOfDropIndex() { + SparkQueryDispatcher sparkQueryDispatcher = + new SparkQueryDispatcher( + emrServerlessClient, + dataSourceService, + dataSourceUserAuthorizationHelper, + jobExecutionResponseReader, + flintIndexMetadataReader, + openSearchClient); + + String jobId = + new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); + + JSONObject result = + sparkQueryDispatcher.getQueryResponse( + new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, jobId, true, null)); + verify(jobExecutionResponseReader, times(0)) + .getResultFromOpensearchIndex(anyString(), anyString()); + Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD)); + } + + @Test + void testDropIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP INDEX size_year ON my_glue.default.http_logs"; - when(flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + when(flintIndexMetadataReader.getFlintIndexMetadata( new IndexDetails( "size_year", new FullyQualifiedTableName("my_glue.default.http_logs"), false, true, FlintIndexType.COVERING))) - .thenReturn(EMR_JOB_ID); + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); + // auto_refresh == true + when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) .thenReturn( new CancelJobRunResult() @@ -619,6 +659,10 @@ void testDropIndexQuery() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + when(acknowledgedResponse.isAcknowledged()).thenReturn(true); DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -631,30 +675,34 @@ void testDropIndexQuery() { verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); verify(flintIndexMetadataReader, times(1)) - .getJobIdFromFlintIndexMetadata( + .getFlintIndexMetadata( new IndexDetails( "size_year", new FullyQualifiedTableName("my_glue.default.http_logs"), false, true, FlintIndexType.COVERING)); - Assertions.assertNotEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertTrue(StringUtils.isAlphanumeric(dispatchQueryResponse.getJobId())); - Assertions.assertEquals(16, dispatchQueryResponse.getJobId().length()); + + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); } @Test - void testDropSkippingIndexQuery() { + void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - when(flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + when(flintIndexMetadataReader.getFlintIndexMetadata( new IndexDetails( null, new FullyQualifiedTableName("my_glue.default.http_logs"), false, true, FlintIndexType.SKIPPING))) - .thenReturn(EMR_JOB_ID); + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); + when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) .thenReturn( new CancelJobRunResult() @@ -663,6 +711,9 @@ void testDropSkippingIndexQuery() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -675,16 +726,110 @@ void testDropSkippingIndexQuery() { verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); verify(flintIndexMetadataReader, times(1)) - .getJobIdFromFlintIndexMetadata( + .getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + + @Test + void testDropSkippingIndexQueryAutoRefreshFalse() + throws ExecutionException, InterruptedException { + String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; + when(flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING))) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); + + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)) + .getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + + @Test + void testDropSkippingIndexQueryDeleteIndexException() + throws ExecutionException, InterruptedException { + String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; + when(flintIndexMetadataReader.getFlintIndexMetadata( + new IndexDetails( + null, + new FullyQualifiedTableName("my_glue.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING))) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); + + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + + when(openSearchClient.admin().indices().delete(any()).get()) + .thenThrow(ExecutionException.class); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)) + .getFlintIndexMetadata( new IndexDetails( null, new FullyQualifiedTableName("my_glue.default.http_logs"), false, true, FlintIndexType.SKIPPING)); - Assertions.assertNotEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertTrue(StringUtils.isAlphanumeric(dispatchQueryResponse.getJobId())); - Assertions.assertEquals(16, dispatchQueryResponse.getJobId().length()); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus()); + Assertions.assertEquals( + "{\"error\":\"failed to drop index\",\"status\":\"FAILED\"}", + dropIndexResult.result().toString()); Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index 61fabe142a..b0c8491b0b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -42,15 +42,15 @@ void testGetJobIdFromFlintSkippingIndexMetadata() { String indexName = "flint_mys3_default_http_logs_skipping_index"; mockNodeClientIndicesMappings(indexName, mappings); FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - String jobId = - flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + FlintIndexMetadata indexMetadata = + flintIndexMetadataReader.getFlintIndexMetadata( new IndexDetails( null, new FullyQualifiedTableName("mys3.default.http_logs"), false, true, FlintIndexType.SKIPPING)); - Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId); + Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); } @SneakyThrows @@ -62,15 +62,15 @@ void testGetJobIdFromFlintCoveringIndexMetadata() { String indexName = "flint_mys3_default_http_logs_cv1_index"; mockNodeClientIndicesMappings(indexName, mappings); FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - String jobId = - flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + FlintIndexMetadata indexMetadata = + flintIndexMetadataReader.getFlintIndexMetadata( new IndexDetails( "cv1", new FullyQualifiedTableName("mys3.default.http_logs"), false, true, FlintIndexType.COVERING)); - Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId); + Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); } @SneakyThrows @@ -85,7 +85,7 @@ void testGetJobIDWithNPEException() { Assertions.assertThrows( IllegalArgumentException.class, () -> - flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + flintIndexMetadataReader.getFlintIndexMetadata( new IndexDetails( "cv1", new FullyQualifiedTableName("mys3.default.http_logs"), @@ -103,7 +103,7 @@ void testGetJobIdFromUnsupportedIndex() { Assertions.assertThrows( UnsupportedOperationException.class, () -> - flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + flintIndexMetadataReader.getFlintIndexMetadata( new IndexDetails( "cv1", new FullyQualifiedTableName("mys3.default.http_logs"), diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java new file mode 100644 index 0000000000..808b80766e --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.AUTO_REFRESH; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS_KEY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES_KEY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class FlintIndexMetadataTest { + + @Test + public void testAutoRefreshSetToTrue() { + FlintIndexMetadata indexMetadata = + FlintIndexMetadata.fromMetatdata( + new Metadata() + .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) + .addOptions(AUTO_REFRESH, "true") + .metadata()); + assertTrue(indexMetadata.isAutoRefresh()); + } + + @Test + public void testAutoRefreshSetToFalse() { + FlintIndexMetadata indexMetadata = + FlintIndexMetadata.fromMetatdata( + new Metadata() + .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) + .addOptions(AUTO_REFRESH, "false") + .metadata()); + assertFalse(indexMetadata.isAutoRefresh()); + } + + @Test + public void testWithOutAutoRefresh() { + FlintIndexMetadata indexMetadata = + FlintIndexMetadata.fromMetatdata( + new Metadata() + .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) + .addOptions(AUTO_REFRESH, "false") + .metadata()); + assertFalse(indexMetadata.isAutoRefresh()); + } + + static class Metadata { + private final Map properties; + private final Map env; + private final Map options; + + private Metadata() { + properties = new HashMap<>(); + env = new HashMap<>(); + options = new HashMap<>(); + } + + public Metadata addEnv(String key, String value) { + env.put(key, value); + return this; + } + + public Metadata addOptions(String key, String value) { + options.put(key, value); + return this; + } + + public Map metadata() { + Map result = new HashMap<>(); + properties.put(ENV_KEY, env); + result.put(OPTIONS_KEY, options); + result.put(PROPERTIES_KEY, properties); + return result; + } + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java new file mode 100644 index 0000000000..46fa4f7dbe --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; +import org.opensearch.sql.spark.dispatcher.model.IndexDetails; + +public class IndexDetailsTest { + @Test + public void skippingIndexName() { + assertEquals( + "flint_mys3_default_http_logs_skipping_index", + new IndexDetails( + "invalid", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING) + .openSearchIndexName()); + } +}