Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug Fix , delete OpenSearch index when DROP INDEX #2250

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client));
new FlintIndexMetadataReaderImpl(client),
client);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}
Expand Down
2 changes: 1 addition & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
createJobMetadataIndex();
return Optional.empty();
}
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId", jobId)).stream().findFirst();
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream()
.findFirst();
}

private void createJobMetadataIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
Expand All @@ -28,6 +39,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
Expand All @@ -37,6 +49,8 @@
@AllArgsConstructor
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String SCHEMA_TAG_KEY = "schema";
Expand All @@ -53,6 +67,8 @@ public class SparkQueryDispatcher {

private FlintIndexMetadataReader flintIndexMetadataReader;

private Client client;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Not a hard requirement if possible]
Instead of having client in SparkQueryDispatcher: can we move all the Flint Index logic to One class.

For example: Change FlintIndexMetadataReader -> FlintIndexHanlder and handle deletion logic in that class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree.
do it later. fix bug first.


public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
return handleSQLQuery(dispatchQueryRequest);
Expand All @@ -64,6 +80,11 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
// todo. refactor query process logic in plugin.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using if ... else.. not a good solution, we need to add abstraction. e.g. QueryTracker.

if (asyncQueryJobMetadata.isDropIndexQuery()) {
return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result();
}

// either empty json when the result is not available or data with status
// Fetch from Result Index
JSONObject result =
Expand Down Expand Up @@ -186,11 +207,29 @@ private DispatchQueryResponse handleDropIndexQuery(
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails);
emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), jobId);
String dropIndexDummyJobId = RandomStringUtils.randomAlphanumeric(16);
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this. I missed a lot of cases.

String status = JobRunState.FAILED.toString();
try {
if (indexMetadata.isAutoRefresh()) {
emrServerlessClient.cancelJobRun(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
Copy link
Collaborator

@dai-chen dai-chen Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if this AppId is different from what's in metadata, we either fail here or assume job is already migrated to this App ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for open source, there is only one appid.
we should handle multiple appId cases in future.

}
} finally {
String indexName = indexDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
if (!response.isAcknowledged()) {
LOG.error("failed to delete index");
}
status = JobRunState.SUCCESS.toString();
} catch (InterruptedException | ExecutionException e) {
LOG.error("failed to delete index");
}
}
return new DispatchQueryResponse(
dropIndexDummyJobId, true, dataSourceMetadata.getResultIndex());
new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex());
}

private static Map<String, String> getDefaultTagsForJobSubmission(
Expand All @@ -200,4 +239,39 @@ private static Map<String, String> getDefaultTagsForJobSubmission(
tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
return tags;
}

@Getter
@RequiredArgsConstructor
public static class DropIndexResult {
private static final int PREFIX_LEN = 10;

private final String status;

public static DropIndexResult fromJobId(String jobId) {
String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN);
return new DropIndexResult(status);
}

public String toJobId() {
String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status;
return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8));
Copy link
Member

@vmmusings vmmusings Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the usual length of this result?
If it is more or less similar to emrJobId it would be good.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SXlRbFhVczI1bEZBSUxFRA== similar

}

public JSONObject result() {
JSONObject result = new JSONObject();
if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) {
result.put(STATUS_FIELD, status);
// todo. refactor response handling.
JSONObject dummyData = new JSONObject();
dummyData.put("result", new JSONArray());
dummyData.put("schema", new JSONArray());
dummyData.put("applicationId", "fakeDropIndexApplicationId");
result.put(DATA_FIELD, dummyData);
} else {
result.put(STATUS_FIELD, status);
result.put(ERROR_FIELD, "failed to drop index");
}
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,38 @@ public class IndexDetails {
private Boolean autoRefresh = false;
private boolean isDropIndex;
private FlintIndexType indexType;

public String openSearchIndexName() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍

FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", getIndexType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Locale;
import java.util.Map;
import lombok.Data;

@Data
public class FlintIndexMetadata {
public static final String PROPERTIES_KEY = "properties";
public static final String ENV_KEY = "env";
public static final String OPTIONS_KEY = "options";

public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID";
public static final String AUTO_REFRESH = "auto_refresh";
public static final String AUTO_REFRESH_DEFAULT = "false";

private final String jobId;
private final boolean autoRefresh;

public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY);
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);

boolean autoRefresh =
!((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT))
.toLowerCase(Locale.ROOT)
.equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
return new FlintIndexMetadata(jobId, autoRefresh);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface FlintIndexMetadataReader {
* Given Index details, get the streaming job Id.
*
* @param indexDetails indexDetails.
* @return jobId.
* @return FlintIndexMetadata.
*/
String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails);
FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,25 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
@AllArgsConstructor
public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {

protected static final String META_KEY = "_meta";
protected static final String PROPERTIES_KEY = "properties";
protected static final String ENV_KEY = "env";
protected static final String JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID";

private final Client client;

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails);
public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName);
Map<String, Object> mappingSourceMap = mappingMetadata.getSourceAsMap();
Map<String, Object> metaMap = (Map<String, Object>) mappingSourceMap.get(META_KEY);
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
return (String) envMap.get(JOB_ID_KEY);
return FlintIndexMetadata.fromMetatdata((Map<String, Object>) mappingSourceMap.get("_meta"));
} catch (NullPointerException npe) {
throw new IllegalArgumentException("Provided Index doesn't exist");
}
}

private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;

import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

public class DropIndexResultTest {
// todo, remove this UT after response refactor.
@Test
public void successRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD));
assertEquals(
"{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}",
result.get(DATA_FIELD).toString());
}

// todo, remove this UT after response refactor.
@Test
public void failedRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD));
assertEquals("failed to drop index", result.get(ERROR_FIELD));
}
}
Loading
Loading