Skip to content

Commit

Permalink
Bug fix: delete the OpenSearch index when handling DROP INDEX
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Oct 6, 2023
1 parent 492982c commit 32a8759
Show file tree
Hide file tree
Showing 13 changed files with 489 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client));
new FlintIndexMetadataReaderImpl(client),
client);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}
Expand Down
2 changes: 1 addition & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
createJobMetadataIndex();
return Optional.empty();
}
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId", jobId)).stream().findFirst();
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream()
.findFirst();
}

private void createJobMetadataIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
Expand All @@ -28,6 +39,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
Expand All @@ -37,6 +49,8 @@
@AllArgsConstructor
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String SCHEMA_TAG_KEY = "schema";
Expand All @@ -53,6 +67,8 @@ public class SparkQueryDispatcher {

private FlintIndexMetadataReader flintIndexMetadataReader;

private Client client;

public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
return handleSQLQuery(dispatchQueryRequest);
Expand All @@ -64,6 +80,11 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
// todo. refactor query process logic in plugin.
if (asyncQueryJobMetadata.isDropIndexQuery()) {
return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result();
}

// either empty json when the result is not available or data with status
// Fetch from Result Index
JSONObject result =
Expand Down Expand Up @@ -186,11 +207,33 @@ private DispatchQueryResponse handleDropIndexQuery(
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails);
emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), jobId);
String dropIndexDummyJobId = RandomStringUtils.randomAlphanumeric(16);
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
String errorMsg = "E";
try {
if (indexMetadata.isAutoRefresh()) {
emrServerlessClient.cancelJobRun(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
}
} finally {
String indexName = indexDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
if (!response.isAcknowledged()) {
errorMsg = String.format("failed to delete index %s", indexName);
LOG.error(errorMsg);
}
status = JobRunState.SUCCESS.toString();
} catch (InterruptedException | ExecutionException e) {
errorMsg = String.format("failed to delete index %s", indexName);
LOG.error(errorMsg);
}
}
return new DispatchQueryResponse(
dropIndexDummyJobId, true, dataSourceMetadata.getResultIndex());
new DropIndexResult(status, errorMsg).toJobId(), true, dataSourceMetadata.getResultIndex());
}

private static Map<String, String> getDefaultTagsForJobSubmission(
Expand All @@ -200,4 +243,44 @@ private static Map<String, String> getDefaultTagsForJobSubmission(
tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
return tags;
}

@Getter
@RequiredArgsConstructor
public static class DropIndexResult {
  // A drop-index "job id" is PREFIX_LEN random characters followed by
  // "status;errorMsg", Base64-encoded so it can travel through the async-query
  // jobId field until the response handling is refactored.
  private static final int PREFIX_LEN = 4;
  private static final String DELIMITER = ";";

  private final String status;
  private final String errorMsg;

  /**
   * Decodes a synthetic drop-index jobId produced by {@link #toJobId()}.
   *
   * @param jobId Base64 string of: PREFIX_LEN random chars + status + ";" + errorMsg.
   * @return the decoded {@link DropIndexResult}.
   */
  public static DropIndexResult fromJobId(String jobId) {
    // Decode with UTF-8 explicitly to mirror toJobId(); relying on the platform
    // default charset could corrupt the payload on non-UTF-8 JVMs.
    String decoded = new String(Base64.getDecoder().decode(jobId), StandardCharsets.UTF_8);
    // Limit the split to 2 so an errorMsg containing the delimiter survives intact,
    // and guard against a missing second part (an empty errorMsg would otherwise
    // throw ArrayIndexOutOfBoundsException because split() drops trailing empties).
    String[] parts = decoded.substring(PREFIX_LEN).split(DELIMITER, 2);
    return new DropIndexResult(parts[0], parts.length > 1 ? parts[1] : "");
  }

  /**
   * Encodes this result into a synthetic jobId understood by {@link #fromJobId(String)}.
   *
   * @return Base64-encoded "randomPrefix + status + ';' + errorMsg".
   */
  public String toJobId() {
    String queryId =
        RandomStringUtils.randomAlphanumeric(PREFIX_LEN)
            + String.join(DELIMITER, status, errorMsg);
    return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Renders this drop-index outcome in the same JSON shape as a regular query response.
   *
   * @return JSON with the status plus either empty result data (success) or an error field.
   */
  public JSONObject result() {
    JSONObject result = new JSONObject();
    if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) {
      result.put(STATUS_FIELD, status);
      // todo. refactor response handling.
      JSONObject dummyData = new JSONObject();
      dummyData.put("result", new JSONArray());
      dummyData.put("schema", new JSONArray());
      dummyData.put("applicationId", "fakeDropId");
      result.put(DATA_FIELD, dummyData);
    } else {
      result.put(STATUS_FIELD, status);
      result.put(ERROR_FIELD, errorMsg);
    }
    return result;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,38 @@ public class IndexDetails {
private Boolean autoRefresh = false;
private boolean isDropIndex;
private FlintIndexType indexType;

/**
 * Derives the physical OpenSearch index name backing this Flint index.
 *
 * <p>Naming convention:
 * skipping: flint_&lt;datasource&gt;_&lt;schema&gt;_&lt;table&gt;_&lt;suffix&gt;;
 * covering: flint_&lt;datasource&gt;_&lt;schema&gt;_&lt;table&gt;_&lt;indexName&gt;_&lt;suffix&gt;.
 *
 * @return the lowercased OpenSearch index name.
 * @throws UnsupportedOperationException for index types other than SKIPPING or COVERING.
 */
public String openSearchIndexName() {
  FullyQualifiedTableName fqtn = getFullyQualifiedTableName();
  FlintIndexType type = getIndexType();
  String name;
  if (FlintIndexType.SKIPPING.equals(type)) {
    name =
        String.join(
            "_",
            "flint",
            fqtn.getDatasourceName(),
            fqtn.getSchemaName(),
            fqtn.getTableName(),
            type.getSuffix());
  } else if (FlintIndexType.COVERING.equals(type)) {
    name =
        String.join(
            "_",
            "flint",
            fqtn.getDatasourceName(),
            fqtn.getSchemaName(),
            fqtn.getTableName(),
            getIndexName(),
            type.getSuffix());
  } else {
    throw new UnsupportedOperationException(
        String.format("Unsupported Index Type : %s", type));
  }
  // NOTE(review): default-locale lowercasing, as in the original — confirm
  // whether Locale.ROOT is needed for locale-independent index names.
  return name.toLowerCase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Locale;
import java.util.Map;
import lombok.Data;

@Data
public class FlintIndexMetadata {
  public static final String PROPERTIES_KEY = "properties";
  public static final String ENV_KEY = "env";
  public static final String OPTIONS_KEY = "options";

  public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID";
  public static final String AUTO_REFRESH = "auto_refresh";
  public static final String AUTO_REFRESH_DEFAULT = "false";

  // EMR Serverless job id owning this Flint index (from the index mapping's env).
  private final String jobId;
  // Whether the index was created with auto refresh enabled.
  private final boolean autoRefresh;

  /**
   * Builds metadata from the "_meta" mapping section of a Flint OpenSearch index.
   *
   * <p>Method name keeps the existing (typo'd) spelling for API compatibility.
   * A malformed mapping propagates a NullPointerException to the caller.
   *
   * @param metaMap the "_meta" section of the index mapping.
   * @return parsed {@link FlintIndexMetadata}.
   */
  @SuppressWarnings("unchecked")
  public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
    Map<String, Object> properties = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
    Map<String, Object> env = (Map<String, Object>) properties.get(ENV_KEY);
    String emrJobId = (String) env.get(SERVERLESS_EMR_JOB_ID);

    Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY);
    String autoRefreshOption =
        (String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT);
    // Auto refresh is considered enabled whenever the option is anything but "false".
    boolean refreshEnabled =
        !autoRefreshOption.toLowerCase(Locale.ROOT).equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
    return new FlintIndexMetadata(emrJobId, refreshEnabled);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ public interface FlintIndexMetadataReader {
* @param indexDetails indexDetails.
* @return Flint index metadata (including the EMR Serverless job id).
*/
String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails);
FlintIndexMetadata getJobIdFromFlintIndexMetadata(IndexDetails indexDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
Expand All @@ -15,53 +14,16 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {
private final Client client;

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails);
public FlintIndexMetadata getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName);
Map<String, Object> mappingSourceMap = mappingMetadata.getSourceAsMap();
Map<String, Object> metaMap = (Map<String, Object>) mappingSourceMap.get("_meta");
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get("properties");
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get("env");
return (String) envMap.get("SERVERLESS_EMR_JOB_ID");
return FlintIndexMetadata.fromMetatdata((Map<String, Object>) mappingSourceMap.get("_meta"));
} catch (NullPointerException npe) {
throw new IllegalArgumentException("Index doesn't exist");
}
}

private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;

import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

public class DropIndexResultTest {
// todo, remove this UT after response refactor.
@Test
public void successRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString(), "E").toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD));
assertEquals(
"{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropId\"}",
result.get(DATA_FIELD).toString());
}

// todo, remove this UT after response refactor.
@Test
public void failedRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString(), "error").toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD));
assertEquals("error", result.get(ERROR_FIELD));
}
}
Loading

0 comments on commit 32a8759

Please sign in to comment.