diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 9b82022d8f..43815a9904 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -87,6 +87,10 @@ public class SparkConstants { public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/"; public static final String FLINT_JOB_QUERY = "spark.flint.job.query"; public static final String FLINT_JOB_QUERY_ID = "spark.flint.job.queryId"; + public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED = + "spark.flint.job.externalScheduler.enabled"; + public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL = + "spark.flint.job.externalScheduler.interval"; public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex"; public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId"; diff --git a/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java new file mode 100644 index 0000000000..6dce09a406 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.config; + +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer; +import org.opensearch.sql.spark.parameter.SparkSubmitParameters; + +@RequiredArgsConstructor +public class OpenSearchAsyncQuerySchedulerConfigComposer implements GeneralSparkParameterComposer { + private final Settings settings; + + @Override + public void compose( + SparkSubmitParameters sparkSubmitParameters, + DispatchQueryRequest dispatchQueryRequest, + AsyncQueryRequestContext context) { + String externalSchedulerEnabled = + settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED); + String externalSchedulerInterval = + settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL); + sparkSubmitParameters.setConfigItem( + FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled); + sparkSubmitParameters.setConfigItem( + FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, externalSchedulerInterval); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 9cc69b2fb7..cf39fe0ded 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -24,6 +24,7 @@ import org.opensearch.sql.spark.asyncquery.OpenSearchAsyncQueryJobMetadataStorageService; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.client.EMRServerlessClientFactoryImpl; +import org.opensearch.sql.spark.config.OpenSearchAsyncQuerySchedulerConfigComposer; import org.opensearch.sql.spark.config.OpenSearchExtraParameterComposer; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; @@ -164,6 +165,7 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider collection.register( DataSourceType.SECURITY_LAKE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); + collection.register(new OpenSearchAsyncQuerySchedulerConfigComposer(settings)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java new file mode 100644 index 0000000000..7836c63b7a --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java @@ -0,0 +1,68 @@ +package org.opensearch.sql.spark.config; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.parameter.SparkSubmitParameters; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchAsyncQuerySchedulerConfigComposerTest { + + @Mock private Settings settings; + @Mock private SparkSubmitParameters sparkSubmitParameters; + @Mock private DispatchQueryRequest dispatchQueryRequest; + @Mock private AsyncQueryRequestContext context; + + private OpenSearchAsyncQuerySchedulerConfigComposer composer; + + @BeforeEach + public void setUp() { + composer = new OpenSearchAsyncQuerySchedulerConfigComposer(settings); + } + + @Test + public void testCompose() { + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) + .thenReturn("true"); + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)) + .thenReturn("10 minutes"); + + composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); + + verify(sparkSubmitParameters) + .setConfigItem("spark.flint.job.externalScheduler.enabled", "true"); + verify(sparkSubmitParameters) + .setConfigItem("spark.flint.job.externalScheduler.interval", "10 minutes"); + } + + @Test + public void testComposeWithDisabledScheduler() { + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) + .thenReturn("false"); + + composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); + + verify(sparkSubmitParameters) + .setConfigItem("spark.flint.job.externalScheduler.enabled", "false"); + } + + @Test + public void testComposeWithMissingInterval() { + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) + .thenReturn("true"); + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)) + .thenReturn(""); + + composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); + + verify(sparkSubmitParameters).setConfigItem("spark.flint.job.externalScheduler.interval", ""); + } +} diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index b6643f3209..0037032d22 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -51,6 +51,10 @@ public enum Key { /** Async query Settings * */ ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"), + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED( + "plugins.query.executionengine.async_query.external_scheduler.enabled"), + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL( + "plugins.query.executionengine.async_query.external_scheduler.interval"), STREAMING_JOB_HOUSEKEEPER_INTERVAL( "plugins.query.executionengine.spark.streamingjobs.housekeeper.interval"); diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 236406e2c7..0789b6d330 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -639,6 +639,75 @@ Request:: } } +plugins.query.executionengine.async_query.external_scheduler.enabled +===================================================================== + +Description +----------- +This setting controls whether the external scheduler is enabled for async queries. + +* Default Value: true +* Scope: Node-level +* Dynamic Update: Yes, this setting can be updated dynamically. + +To disable the external scheduler, use the following command: + +Request :: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.async_query.external_scheduler.enabled":"false"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "async_query": { + "external_scheduler": { + "enabled": "false" + } + } + } + } + } + } + } + +plugins.query.executionengine.async_query.external_scheduler.interval +===================================================================== + +Description +----------- +This setting specifies the interval at which the external scheduler runs to check for pending async queries. + +* Default Value: None (must be explicitly set) +* Format: A string representing a time duration follows Spark `CalendarInterval `__ format (e.g., ``10 minutes`` for 10 minutes, ``1 hour`` for 1 hour). + +To modify the interval to 10 minutes for example, use this command: + +Request :: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.async_query.external_scheduler.interval":"10 minutes"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "async_query": { + "external_scheduler": { + "interval": "10 minutes" + } + } + } + } + } + } + } + plugins.query.executionengine.spark.streamingjobs.housekeeper.interval ====================================================================== diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 494b906b55..c91decce32 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -154,6 +154,19 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING = + Setting.boolSetting( + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + public static final Setting ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING = + Setting.simpleString( + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL.getKeyValue(), + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting SPARK_EXECUTION_ENGINE_CONFIG = Setting.simpleString( Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(), @@ -298,6 +311,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.ASYNC_QUERY_ENABLED, ASYNC_QUERY_ENABLED_SETTING, new Updater(Key.ASYNC_QUERY_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED, + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING, + new Updater(Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL, + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING, + new Updater(Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)); register( settingBuilder, clusterSettings, diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java index 84fb705ae0..026f0c6218 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java @@ -15,6 +15,8 @@ import static org.mockito.Mockito.when; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; import static org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings.legacySettings; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.METRICS_ROLLING_INTERVAL_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.METRICS_ROLLING_WINDOW_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.PPL_ENABLED_SETTING; @@ -195,4 +197,22 @@ void getSparkExecutionEngineConfigSetting() { .put(SPARK_EXECUTION_ENGINE_CONFIG.getKey(), sparkConfig) .build())); } + + @Test + void getAsyncQueryExternalSchedulerEnabledSetting() { + // Default is true + assertEquals( + true, + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING.get( + org.opensearch.common.settings.Settings.builder().build())); + } + + @Test + void getAsyncQueryExternalSchedulerIntervalSetting() { + // Default is empty string + assertEquals( + "", + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING.get( + org.opensearch.common.settings.Settings.builder().build())); + } }