From b58e24ceec989dec0ac7097af6dc26c45c53a500 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Mon, 24 Jun 2024 16:40:46 -0700
Subject: [PATCH] [POC] Streaming query scheduler

---
Review notes (text between "---" and the diffstat is not part of the commit message):

* BatchQueryHandler.submit replaces the literal text "auto_refresh = true" with
  "auto_refresh = false" before building the Spark submit parameters. This is an exact
  String.replace: other spellings (different case or spacing) pass through unchanged.
* In getQueryHandlerForFlintExtensionQuery the explicit streaming branch and the
  CREATE -> batch branch are removed. CREATE now only forces the auto_refresh option to
  "false" and falls through to the REFRESH/else chain (the else branch is outside this hunk).
  ALTER returns the index DML handler when auto_refresh is provided and throws
  IllegalArgumentException otherwise.
* Registering/removing the refresh job with the query scheduler is still TODO.
* The log4j imports are added ahead of the existing import group; reorder them if the
  project's formatter requires it.
* The "index" blob ids below are the ones from the original patch and were not recomputed.

 .../spark/dispatcher/BatchQueryHandler.java   | 13 ++++-
 .../dispatcher/SparkQueryDispatcher.java      | 57 ++++++++++++++-----
 .../dispatcher/SparkQueryDispatcherTest.java  | 13 +++--
 3 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
index 8014cf935f..a6c08eb3bc 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
@@ -10,6 +10,9 @@
 import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
 import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.amazonaws.services.emrserverless.model.GetJobRunResult;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
@@ -33,6 +36,8 @@
  */
 @RequiredArgsConstructor
 public class BatchQueryHandler extends AsyncQueryHandler {
+  private static final Logger LOG = LogManager.getLogger();
+
   protected final EMRServerlessClient emrServerlessClient;
   protected final JobExecutionResponseReader jobExecutionResponseReader;
   protected final LeaseManager leaseManager;
@@ -73,6 +78,12 @@ public DispatchQueryResponse submit(
     Map tags = context.getTags();
     DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
 
+    // TODO: Hacky query rewrite (exact text match only), should use query visitor
+    String query = dispatchQueryRequest.getQuery();
+    LOG.info("BatchQueryHandler - original query: {}", query);
+    String newQuery = query.replace("auto_refresh = true", "auto_refresh = false");
+    LOG.info("BatchQueryHandler - newQuery: {}", newQuery);
+
     tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
     StartJobRequest startJobRequest =
         new StartJobRequest(
@@ -83,7 +94,7 @@ public DispatchQueryResponse submit(
             SparkSubmitParameters.builder()
                 .clusterName(clusterName)
                 .dataSource(context.getDataSourceMetadata())
-                .query(dispatchQueryRequest.getQuery())
+                .query(newQuery)
                 .build()
                 .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
                 .toString(),
diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
index 5facdee567..6ae2e22e0f 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
@@ -5,6 +5,8 @@
 
 package org.opensearch.sql.spark.dispatcher;
 
+import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
+
 import java.util.HashMap;
 import java.util.Map;
 import lombok.AllArgsConstructor;
@@ -77,13 +79,46 @@ private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery(
       IndexQueryDetails indexQueryDetails) {
     if (isEligibleForIndexDMLHandling(indexQueryDetails)) {
       return queryHandlerFactory.getIndexDMLHandler();
-    } else if (isEligibleForStreamingQuery(indexQueryDetails)) {
-      return queryHandlerFactory.getStreamingQueryHandler();
-    } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) {
-      // Create should be handled by batch handler. This is to avoid DROP index incorrectly cancel
-      // an interactive job.
-      return queryHandlerFactory.getBatchQueryHandler();
-    } else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
+    }
+
+    // Create with (auto_refresh = true): force the option to false, then fall through
+    boolean isCreate =
+        IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType());
+    if (isCreate) {
+      if (indexQueryDetails.getFlintIndexOptions().autoRefresh()) {
+        // Step 1: Set auto_refresh to false
+        indexQueryDetails.getFlintIndexOptions().setOption(AUTO_REFRESH, "false");
+        // Step 2: Register refresh job as query scheduler
+        // TODO: Register refresh job as query scheduler
+      }
+    }
+
+    // Alter with:
+    //   (auto_refresh = false) - handled as index DML
+    //   (auto_refresh = true)  - option is forced to false, then handled as index DML
+    //   ()                     - rejected: auto_refresh must be provided
+    // Query scheduler registration/removal is still TODO in both auto_refresh cases.
+    boolean isAlterQuery =
+        IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType());
+    if (isAlterQuery) {
+      boolean hasAutoRefresh =
+          indexQueryDetails.getFlintIndexOptions().getProvidedOptions().containsKey("auto_refresh");
+      if (hasAutoRefresh) {
+        if (indexQueryDetails.getFlintIndexOptions().autoRefresh()) {
+          // Step 1: Set auto_refresh to false
+          indexQueryDetails.getFlintIndexOptions().setOption(AUTO_REFRESH, "false");
+          // Step 2: Register refresh job as query scheduler
+          // TODO: Register refresh job as query scheduler
+        } else {
+          // TODO: remove query scheduler
+        }
+        return queryHandlerFactory.getIndexDMLHandler();
+      } else {
+        throw new IllegalArgumentException("Alter should have auto_refresh");
+      }
+    }
+
+    if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
       // Manual refresh should be handled by batch handler
       return queryHandlerFactory.getRefreshQueryHandler();
     } else {
@@ -117,13 +152,7 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)
 
   private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
     return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
-        || IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
-        || (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
-            && (indexQueryDetails
-                    .getFlintIndexOptions()
-                    .getProvidedOptions()
-                    .containsKey("auto_refresh")
-                && !indexQueryDetails.getFlintIndexOptions().autoRefresh()));
+        || IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType());
   }
 
   public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
index d57284b9ca..00800dde0f 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
@@ -588,14 +588,15 @@ void testDispatchMaterializedViewQuery() {
     when(emrServerlessClientFactory.getClient()).thenReturn(emrServerlessClient);
     HashMap tags = new HashMap<>();
     tags.put(DATASOURCE_TAG_KEY, MY_GLUE);
-    tags.put(INDEX_TAG_KEY, "flint_mv_1");
     tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
-    tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
+    tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
     String query =
         "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
             + " (auto_refresh = true)";
+    String rewrittenQuery =
+        "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
+            + " (auto_refresh = false)";
     String sparkSubmitParameters =
-        withStructuredStreaming(
             constructExpectedSparkSubmitParameterString(
                 "sigv4",
                 new HashMap<>() {
@@ -603,16 +604,16 @@ void testDispatchMaterializedViewQuery() {
                     put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
                   }
                 },
-                query));
+                rewrittenQuery);
     StartJobRequest expected =
         new StartJobRequest(
-            "TEST_CLUSTER:streaming:flint_mv_1",
+            "TEST_CLUSTER:batch",
             null,
             EMRS_APPLICATION_ID,
             EMRS_EXECUTION_ROLE,
             sparkSubmitParameters,
             tags,
-            true,
+            false,
             "query_execution_result_my_glue");
     when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID);
     DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();