Skip to content

Commit

Permalink
Add feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 4, 2024
1 parent f9fe064 commit 943b013
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class SparkConstants {
"com.amazonaws.emr.AssumeRoleAWSCredentialsProvider";
public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/";
public static final String FLINT_JOB_QUERY = "spark.flint.job.query";
public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED =
"spark.flint.job.externalScheduler.enabled";
public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL =
"spark.flint.job.externalScheduler.interval";
public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex";
public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer;
import org.opensearch.sql.spark.parameter.SparkSubmitParameters;

@RequiredArgsConstructor
public class OpenSearchAsyncQuerySchedulerConfigComposer implements GeneralSparkParameterComposer {
private final Settings settings;

@Override
public void compose(
SparkSubmitParameters sparkSubmitParameters,
DispatchQueryRequest dispatchQueryRequest,
AsyncQueryRequestContext context) {
String externalSchedulerEnabled =
settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED);
String externalSchedulerInterval =
settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL);
sparkSubmitParameters.setConfigItem(
FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled);
sparkSubmitParameters.setConfigItem(
FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, externalSchedulerInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider
collection.register(
DataSourceType.SECURITY_LAKE,
new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(new OpenSearchAsyncQuerySchedulerConfigComposer(settings));
collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader));
return new SparkSubmitParametersBuilderProvider(collection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public enum Key {

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"),
ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED(
"plugins.query.executionengine.async_query.external_scheduler.enabled"),
ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL(
"plugins.query.executionengine.async_query.external_scheduler.interval"),
STREAMING_JOB_HOUSEKEEPER_INTERVAL(
"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");

Expand Down

0 comments on commit 943b013

Please sign in to comment.