Commit b58e24c

[POC] Streaming query scheduler
noCharger committed Jun 24, 2024
1 parent b2403ca commit b58e24c
Showing 3 changed files with 62 additions and 21 deletions.
@@ -10,6 +10,9 @@
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import java.util.Map;
import lombok.RequiredArgsConstructor;
@@ -33,6 +36,8 @@
*/
@RequiredArgsConstructor
public class BatchQueryHandler extends AsyncQueryHandler {
private static final Logger LOG = LogManager.getLogger();

protected final EMRServerlessClient emrServerlessClient;
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;
@@ -73,6 +78,12 @@ public DispatchQueryResponse submit(
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();

    // TODO: Hacky string-based query rewrite; replace with a proper query visitor.
    String query = dispatchQueryRequest.getQuery();
    LOG.info("BatchQueryHandler - original query: {}", query);
    String newQuery = query.replace("auto_refresh = true", "auto_refresh = false");
    LOG.info("BatchQueryHandler - rewritten query: {}", newQuery);

tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
Expand All @@ -83,7 +94,7 @@ public DispatchQueryResponse submit(
SparkSubmitParameters.builder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.query(dispatchQueryRequest.getQuery())
.query(newQuery)
.build()
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.toString(),
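
The String.replace call in BatchQueryHandler above matches only the exact text auto_refresh = true, so spacing or casing variants such as auto_refresh=TRUE would slip through unchanged. Until the query-visitor rewrite from the TODO lands, a slightly more tolerant interim helper could look like the sketch below; AutoRefreshRewriter and disableAutoRefresh are illustrative names, not part of this commit.

    import java.util.regex.Pattern;

    final class AutoRefreshRewriter {
      // Matches "auto_refresh = true" with arbitrary spacing and casing, e.g.
      // "WITH (auto_refresh=TRUE)". Group 1 keeps the "auto_refresh =" prefix
      // intact so only the value is rewritten.
      private static final Pattern AUTO_REFRESH_TRUE =
          Pattern.compile("(auto_refresh\\s*=\\s*)true", Pattern.CASE_INSENSITIVE);

      private AutoRefreshRewriter() {}

      static String disableAutoRefresh(String query) {
        return AUTO_REFRESH_TRUE.matcher(query).replaceAll("$1false");
      }
    }

The handler would then call AutoRefreshRewriter.disableAutoRefresh(dispatchQueryRequest.getQuery()) in place of the literal replace.
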
@@ -5,6 +5,8 @@

package org.opensearch.sql.spark.dispatcher;

import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;

import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
@@ -77,13 +79,46 @@ private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery(
IndexQueryDetails indexQueryDetails) {
if (isEligibleForIndexDMLHandling(indexQueryDetails)) {
return queryHandlerFactory.getIndexDMLHandler();
} else if (isEligibleForStreamingQuery(indexQueryDetails)) {
return queryHandlerFactory.getStreamingQueryHandler();
} else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) {
// Create should be handled by the batch handler. This is to avoid a DROP index
// incorrectly cancelling an interactive job.
return queryHandlerFactory.getBatchQueryHandler();
} else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
}

// CREATE ... WITH (auto_refresh = true): build the index as a batch job and
// hand the periodic refresh off to the query scheduler.
boolean isCreate =
    IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType());
if (isCreate && indexQueryDetails.getFlintIndexOptions().autoRefresh()) {
  // Step 1: Force auto_refresh to false so the query runs as a batch job.
  indexQueryDetails.getFlintIndexOptions().setOption(AUTO_REFRESH, "false");
  // Step 2: TODO - register the refresh job with the query scheduler.
}

// ALTER ... WITH (...):
//   (auto_refresh = false) - route to the DML handler and, eventually, remove
//                            the scheduled refresh job.
//   (auto_refresh = true)  - force auto_refresh to false, register the refresh
//                            job with the scheduler, then route to the DML handler.
//   ()                     - reject, since auto_refresh must be provided.
boolean isAlterQuery =
    IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType());
if (isAlterQuery) {
  boolean hasAutoRefresh =
      indexQueryDetails.getFlintIndexOptions().getProvidedOptions().containsKey("auto_refresh");
  if (!hasAutoRefresh) {
    throw new IllegalArgumentException("Alter query must provide the auto_refresh option");
  }
  if (indexQueryDetails.getFlintIndexOptions().autoRefresh()) {
    // Force auto_refresh to false so the query runs as a batch job.
    indexQueryDetails.getFlintIndexOptions().setOption(AUTO_REFRESH, "false");
    // TODO: Register the refresh job with the query scheduler.
  } else {
    // TODO: Remove the refresh job from the query scheduler.
  }
  return queryHandlerFactory.getIndexDMLHandler();
}

if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
// Manual refresh should be handled by batch handler
return queryHandlerFactory.getRefreshQueryHandler();
} else {
@@ -117,13 +152,7 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)

private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
|| (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
&& (indexQueryDetails
.getFlintIndexOptions()
.getProvidedOptions()
.containsKey("auto_refresh")
&& !indexQueryDetails.getFlintIndexOptions().autoRefresh()));
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType());
}

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
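
The TODOs in SparkQueryDispatcher above leave the scheduler integration open. A minimal sketch of the contract they point toward is below; RefreshJobScheduler and its methods do not exist in this commit, and every name here is an assumption.

    // Hypothetical scheduler contract for the TODOs above. A CREATE or ALTER
    // with auto_refresh = true would register a refresh job after the option
    // is forced to false; an ALTER with auto_refresh = false would remove it.
    interface RefreshJobScheduler {

      // Register a periodic refresh job for the given Flint index.
      void register(String indexName, String refreshQuery);

      // Remove a previously registered refresh job, if one exists.
      void unregister(String indexName);
    }
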
@@ -588,31 +588,32 @@ void testDispatchMaterializedViewQuery() {
when(emrServerlessClientFactory.getClient()).thenReturn(emrServerlessClient);
HashMap<String, String> tags = new HashMap<>();
tags.put(DATASOURCE_TAG_KEY, MY_GLUE);
tags.put(INDEX_TAG_KEY, "flint_mv_1");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query =
"CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
+ " (auto_refresh = true)";
String rewrittenQuery =
"CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
+ " (auto_refresh = false)";
String sparkSubmitParameters =
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
},
query));
rewrittenQuery);
StartJobRequest expected =
new StartJobRequest(
"TEST_CLUSTER:streaming:flint_mv_1",
"TEST_CLUSTER:batch",
null,
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
sparkSubmitParameters,
tags,
true,
false,
"query_execution_result_my_glue");
when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
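
The updated expectations above follow from the batch rewrite: the same literal replacement applied in BatchQueryHandler converts the streaming materialized-view submission into a batch one, which is why the job type tag, cluster name, and auto_refresh value all change. A standalone check of just the string rewrite, independent of the test fixture, would be:

    String query =
        "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
            + " (auto_refresh = true)";
    String rewritten = query.replace("auto_refresh = true", "auto_refresh = false");
    // The rewritten query now requests a one-off batch build instead of streaming.
    assert rewritten.endsWith("(auto_refresh = false)");
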
