-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug Fix , delete OpenSearch index when DROP INDEX #2250
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,11 +12,22 @@ | |
import com.amazonaws.services.emrserverless.model.CancelJobRunResult; | ||
import com.amazonaws.services.emrserverless.model.GetJobRunResult; | ||
import com.amazonaws.services.emrserverless.model.JobRunState; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Base64; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.ExecutionException; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.RequiredArgsConstructor; | ||
import org.apache.commons.lang3.RandomStringUtils; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.json.JSONArray; | ||
import org.json.JSONObject; | ||
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.sql.datasource.DataSourceService; | ||
import org.opensearch.sql.datasource.model.DataSourceMetadata; | ||
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; | ||
|
@@ -28,6 +39,7 @@ | |
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; | ||
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; | ||
import org.opensearch.sql.spark.dispatcher.model.IndexDetails; | ||
import org.opensearch.sql.spark.flint.FlintIndexMetadata; | ||
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; | ||
import org.opensearch.sql.spark.response.JobExecutionResponseReader; | ||
import org.opensearch.sql.spark.rest.model.LangType; | ||
|
@@ -37,6 +49,8 @@ | |
@AllArgsConstructor | ||
public class SparkQueryDispatcher { | ||
|
||
private static final Logger LOG = LogManager.getLogger(); | ||
|
||
public static final String INDEX_TAG_KEY = "index"; | ||
public static final String DATASOURCE_TAG_KEY = "datasource"; | ||
public static final String SCHEMA_TAG_KEY = "schema"; | ||
|
@@ -53,6 +67,8 @@ public class SparkQueryDispatcher { | |
|
||
private FlintIndexMetadataReader flintIndexMetadataReader; | ||
|
||
private Client client; | ||
|
||
public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) { | ||
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) { | ||
return handleSQLQuery(dispatchQueryRequest); | ||
|
@@ -64,6 +80,11 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) | |
} | ||
|
||
public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { | ||
// todo. refactor query process logic in plugin. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mean? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using if...else here is not a good solution; we need to add an abstraction, e.g. a QueryTracker. |
||
if (asyncQueryJobMetadata.isDropIndexQuery()) { | ||
return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result(); | ||
} | ||
|
||
// either empty json when the result is not available or data with status | ||
// Fetch from Result Index | ||
JSONObject result = | ||
|
@@ -186,11 +207,29 @@ private DispatchQueryResponse handleDropIndexQuery( | |
DataSourceMetadata dataSourceMetadata = | ||
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); | ||
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); | ||
String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails); | ||
emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), jobId); | ||
String dropIndexDummyJobId = RandomStringUtils.randomAlphanumeric(16); | ||
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); | ||
// if index is created without auto refresh. there is no job to cancel. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for fixing this. I missed a lot of cases. |
||
String status = JobRunState.FAILED.toString(); | ||
try { | ||
if (indexMetadata.isAutoRefresh()) { | ||
emrServerlessClient.cancelJobRun( | ||
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so if this AppId is different from what's in metadata, we either fail here or assume job is already migrated to this App ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for open source, there is only one appid. |
||
} | ||
} finally { | ||
String indexName = indexDetails.openSearchIndexName(); | ||
try { | ||
AcknowledgedResponse response = | ||
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); | ||
if (!response.isAcknowledged()) { | ||
LOG.error("failed to delete index"); | ||
} | ||
status = JobRunState.SUCCESS.toString(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
LOG.error("failed to delete index"); | ||
} | ||
} | ||
return new DispatchQueryResponse( | ||
dropIndexDummyJobId, true, dataSourceMetadata.getResultIndex()); | ||
new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex()); | ||
} | ||
|
||
private static Map<String, String> getDefaultTagsForJobSubmission( | ||
|
@@ -200,4 +239,39 @@ private static Map<String, String> getDefaultTagsForJobSubmission( | |
tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource()); | ||
return tags; | ||
} | ||
|
||
@Getter | ||
@RequiredArgsConstructor | ||
public static class DropIndexResult { | ||
private static final int PREFIX_LEN = 10; | ||
|
||
private final String status; | ||
|
||
public static DropIndexResult fromJobId(String jobId) { | ||
String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN); | ||
return new DropIndexResult(status); | ||
} | ||
|
||
public String toJobId() { | ||
String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status; | ||
return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the usual length of this result? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
public JSONObject result() { | ||
JSONObject result = new JSONObject(); | ||
if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) { | ||
result.put(STATUS_FIELD, status); | ||
// todo. refactor response handling. | ||
JSONObject dummyData = new JSONObject(); | ||
dummyData.put("result", new JSONArray()); | ||
dummyData.put("schema", new JSONArray()); | ||
dummyData.put("applicationId", "fakeDropIndexApplicationId"); | ||
result.put(DATA_FIELD, dummyData); | ||
} else { | ||
result.put(STATUS_FIELD, status); | ||
result.put(ERROR_FIELD, "failed to drop index"); | ||
} | ||
return result; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,4 +23,38 @@ public class IndexDetails { | |
private Boolean autoRefresh = false; | ||
private boolean isDropIndex; | ||
private FlintIndexType indexType; | ||
|
||
public String openSearchIndexName() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense 👍 |
||
FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName(); | ||
if (FlintIndexType.SKIPPING.equals(getIndexType())) { | ||
String indexName = | ||
"flint" | ||
+ "_" | ||
+ fullyQualifiedTableName.getDatasourceName() | ||
+ "_" | ||
+ fullyQualifiedTableName.getSchemaName() | ||
+ "_" | ||
+ fullyQualifiedTableName.getTableName() | ||
+ "_" | ||
+ getIndexType().getSuffix(); | ||
return indexName.toLowerCase(); | ||
} else if (FlintIndexType.COVERING.equals(getIndexType())) { | ||
String indexName = | ||
"flint" | ||
+ "_" | ||
+ fullyQualifiedTableName.getDatasourceName() | ||
+ "_" | ||
+ fullyQualifiedTableName.getSchemaName() | ||
+ "_" | ||
+ fullyQualifiedTableName.getTableName() | ||
+ "_" | ||
+ getIndexName() | ||
+ "_" | ||
+ getIndexType().getSuffix(); | ||
return indexName.toLowerCase(); | ||
} else { | ||
throw new UnsupportedOperationException( | ||
String.format("Unsupported Index Type : %s", getIndexType())); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.flint; | ||
|
||
import java.util.Locale; | ||
import java.util.Map; | ||
import lombok.Data; | ||
|
||
@Data | ||
public class FlintIndexMetadata { | ||
public static final String PROPERTIES_KEY = "properties"; | ||
public static final String ENV_KEY = "env"; | ||
public static final String OPTIONS_KEY = "options"; | ||
|
||
public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID"; | ||
public static final String AUTO_REFRESH = "auto_refresh"; | ||
public static final String AUTO_REFRESH_DEFAULT = "false"; | ||
|
||
private final String jobId; | ||
private final boolean autoRefresh; | ||
|
||
public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) { | ||
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY); | ||
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY); | ||
Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY); | ||
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); | ||
|
||
boolean autoRefresh = | ||
!((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT)) | ||
.toLowerCase(Locale.ROOT) | ||
.equalsIgnoreCase(AUTO_REFRESH_DEFAULT); | ||
return new FlintIndexMetadata(jobId, autoRefresh); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.dispatcher; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; | ||
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; | ||
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; | ||
|
||
import com.amazonaws.services.emrserverless.model.JobRunState; | ||
import org.json.JSONObject; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class DropIndexResultTest { | ||
// todo, remove this UT after response refactor. | ||
@Test | ||
public void successRespEncodeDecode() { | ||
// encode jobId | ||
String jobId = | ||
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); | ||
|
||
// decode jobId | ||
SparkQueryDispatcher.DropIndexResult dropIndexResult = | ||
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); | ||
|
||
JSONObject result = dropIndexResult.result(); | ||
assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD)); | ||
assertEquals( | ||
"{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}", | ||
result.get(DATA_FIELD).toString()); | ||
} | ||
|
||
// todo, remove this UT after response refactor. | ||
@Test | ||
public void failedRespEncodeDecode() { | ||
// encode jobId | ||
String jobId = | ||
new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId(); | ||
|
||
// decode jobId | ||
SparkQueryDispatcher.DropIndexResult dropIndexResult = | ||
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); | ||
|
||
JSONObject result = dropIndexResult.result(); | ||
assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD)); | ||
assertEquals("failed to drop index", result.get(ERROR_FIELD)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Not a hard requirement, if possible]
Instead of having the client in SparkQueryDispatcher: can we move all the Flint index logic into one class?
For example: change FlintIndexMetadataReader -> FlintIndexHandler and handle the deletion logic in that class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.
We'll do it later — let's fix the bug first.