From f1f46c4eed57c186bd98d5587baff3be880c332a Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 4 Sep 2024 14:03:29 -0700 Subject: [PATCH] Add feature flag --- .../spark/data/constants/SparkConstants.java | 2 ++ ...archAsyncQuerySchedulerConfigComposer.java | 27 +++++++++++++++++++ .../config/AsyncExecutorServiceModule.java | 1 + .../sql/common/setting/Settings.java | 2 ++ 4 files changed, 32 insertions(+) create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index e87dbba03e..8fcf18975e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -86,6 +86,8 @@ public class SparkConstants { "com.amazonaws.emr.AssumeRoleAWSCredentialsProvider"; public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/"; public static final String FLINT_JOB_QUERY = "spark.flint.job.query"; + public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED = "spark.flint.job.externalScheduler.enabled"; + public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL = "spark.flint.job.externalScheduler.interval"; public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex"; public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId"; diff --git a/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java new file mode 100644 index 0000000000..d223aa428f --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.config; + +import lombok.RequiredArgsConstructor; + +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer; +import org.opensearch.sql.spark.parameter.SparkSubmitParameters; + +@RequiredArgsConstructor +public class OpenSearchAsyncQuerySchedulerConfigComposer implements GeneralSparkParameterComposer { + private final Settings settings; + + @Override + public void compose(SparkSubmitParameters sparkSubmitParameters, DispatchQueryRequest dispatchQueryRequest, AsyncQueryRequestContext context) { + String externalSchedulerEnabled = settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED); + String externalSchedulerInterval = settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL); + sparkSubmitParameters.setConfigItem(FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled); + sparkSubmitParameters.setConfigItem(FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, externalSchedulerInterval); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 52ffda483c..730d1a47e8 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -168,6 +168,7 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider collection.register( DataSourceType.SECURITY_LAKE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); + collection.register(new OpenSearchAsyncQuerySchedulerConfigComposer(settings)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index b6643f3209..7377d7cfe5 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -51,6 +51,8 @@ public enum Key { /** Async query Settings * */ ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"), + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED("plugins.query.executionengine.async_query.external_scheduler.enabled"), + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL("plugins.query.executionengine.async_query.external_scheduler.interval"), STREAMING_JOB_HOUSEKEEPER_INTERVAL( "plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");