Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] Bug Fix , delete OpenSearch index when DROP INDEX #2252

Merged
merged 1 commit into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client));
new FlintIndexMetadataReaderImpl(client),
client);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}
Expand Down
2 changes: 1 addition & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
createJobMetadataIndex();
return Optional.empty();
}
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId", jobId)).stream().findFirst();
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream()
.findFirst();
}

private void createJobMetadataIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
Expand All @@ -28,6 +39,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
Expand All @@ -37,6 +49,8 @@
@AllArgsConstructor
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String SCHEMA_TAG_KEY = "schema";
Expand All @@ -53,6 +67,8 @@ public class SparkQueryDispatcher {

private FlintIndexMetadataReader flintIndexMetadataReader;

private Client client;

public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
return handleSQLQuery(dispatchQueryRequest);
Expand All @@ -64,6 +80,11 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
// todo. refactor query process logic in plugin.
if (asyncQueryJobMetadata.isDropIndexQuery()) {
return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result();
}

// either empty json when the result is not available or data with status
// Fetch from Result Index
JSONObject result =
Expand Down Expand Up @@ -186,11 +207,29 @@ private DispatchQueryResponse handleDropIndexQuery(
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails);
emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), jobId);
String dropIndexDummyJobId = RandomStringUtils.randomAlphanumeric(16);
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
try {
if (indexMetadata.isAutoRefresh()) {
emrServerlessClient.cancelJobRun(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
}
} finally {
String indexName = indexDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
if (!response.isAcknowledged()) {
LOG.error("failed to delete index");
}
status = JobRunState.SUCCESS.toString();
} catch (InterruptedException | ExecutionException e) {
LOG.error("failed to delete index");
}
}
return new DispatchQueryResponse(
dropIndexDummyJobId, true, dataSourceMetadata.getResultIndex());
new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex());
}

private static Map<String, String> getDefaultTagsForJobSubmission(
Expand All @@ -200,4 +239,39 @@ private static Map<String, String> getDefaultTagsForJobSubmission(
tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
return tags;
}

@Getter
@RequiredArgsConstructor
public static class DropIndexResult {
private static final int PREFIX_LEN = 10;

private final String status;

public static DropIndexResult fromJobId(String jobId) {
String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN);
return new DropIndexResult(status);
}

public String toJobId() {
String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status;
return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8));
}

public JSONObject result() {
JSONObject result = new JSONObject();
if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) {
result.put(STATUS_FIELD, status);
// todo. refactor response handling.
JSONObject dummyData = new JSONObject();
dummyData.put("result", new JSONArray());
dummyData.put("schema", new JSONArray());
dummyData.put("applicationId", "fakeDropIndexApplicationId");
result.put(DATA_FIELD, dummyData);
} else {
result.put(STATUS_FIELD, status);
result.put(ERROR_FIELD, "failed to drop index");
}
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,38 @@ public class IndexDetails {
private Boolean autoRefresh = false;
private boolean isDropIndex;
private FlintIndexType indexType;

public String openSearchIndexName() {
FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", getIndexType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Locale;
import java.util.Map;
import lombok.Data;

@Data
public class FlintIndexMetadata {
public static final String PROPERTIES_KEY = "properties";
public static final String ENV_KEY = "env";
public static final String OPTIONS_KEY = "options";

public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID";
public static final String AUTO_REFRESH = "auto_refresh";
public static final String AUTO_REFRESH_DEFAULT = "false";

private final String jobId;
private final boolean autoRefresh;

public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY);
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);

boolean autoRefresh =
!((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT))
.toLowerCase(Locale.ROOT)
.equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
return new FlintIndexMetadata(jobId, autoRefresh);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface FlintIndexMetadataReader {
* Given Index details, get the streaming job Id.
*
* @param indexDetails indexDetails.
* @return jobId.
* @return FlintIndexMetadata.
*/
String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails);
FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,25 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
@AllArgsConstructor
public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {

protected static final String META_KEY = "_meta";
protected static final String PROPERTIES_KEY = "properties";
protected static final String ENV_KEY = "env";
protected static final String JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID";

private final Client client;

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails);
public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName);
Map<String, Object> mappingSourceMap = mappingMetadata.getSourceAsMap();
Map<String, Object> metaMap = (Map<String, Object>) mappingSourceMap.get(META_KEY);
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
return (String) envMap.get(JOB_ID_KEY);
return FlintIndexMetadata.fromMetatdata((Map<String, Object>) mappingSourceMap.get("_meta"));
} catch (NullPointerException npe) {
throw new IllegalArgumentException("Provided Index doesn't exist");
}
}

private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;

import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

public class DropIndexResultTest {
// todo, remove this UT after response refactor.
@Test
public void successRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD));
assertEquals(
"{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}",
result.get(DATA_FIELD).toString());
}

// todo, remove this UT after response refactor.
@Test
public void failedRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD));
assertEquals("failed to drop index", result.get(ERROR_FIELD));
}
}
Loading