Skip to content

Commit

Permalink
Pass extra parameters to Spark dispatcher
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 5, 2023
1 parent b38d2a8 commit b67f6e2
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void legacySettingsShouldBeDeprecatedBeforeRemove() {
}

@Test
void default_spark_execution_engine_config_setting() {
void getDefaultSparkExecutionEngineConfigSetting() {
org.opensearch.common.settings.Settings settings =
org.opensearch.common.settings.Settings.builder()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getDatasource(),
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
clusterName.value()));
clusterName.value(),
sparkExecutionEngineConfig.getSparkSubmitParameters()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(jobId, sparkExecutionEngineConfig.getApplicationId()));
return new CreateAsyncQueryResponse(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;

/** Define Spark Submit Parameters. */
/**
* Define Spark Submit Parameters.
*/
@AllArgsConstructor
@RequiredArgsConstructor
public class SparkSubmitParameters {
public static final String SPACE = " ";
Expand All @@ -67,10 +71,16 @@ public class SparkSubmitParameters {
private final String className;
private final Map<String, String> config;

/**
* Extra parameters to append finally
*/
private String extraParameters;

public static class Builder {

private final String className;
private final Map<String, String> config;
private String extraParameters;

private Builder() {
className = DEFAULT_CLASS_NAME;
Expand Down Expand Up @@ -162,8 +172,13 @@ public Builder structuredStreaming(Boolean isStructuredStreaming) {
return this;
}

public Builder extraParameters(String params) {
extraParameters = params;
return this;
}

public SparkSubmitParameters build() {
return new SparkSubmitParameters(className, config);
return new SparkSubmitParameters(className, config, extraParameters);
}
}

Expand All @@ -180,6 +195,10 @@ public String toString() {
stringBuilder.append(config.get(key));
stringBuilder.append(SPACE);
}

if (extraParameters != null) {
stringBuilder.append(extraParameters);
}
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public class SparkExecutionEngineConfig {
private String region;
private String executionRoleARN;

/** Additional Spark submit parameters to append to request. */
private String sparkSubmitParameters;

public static SparkExecutionEngineConfig toSparkExecutionEngineConfig(String jsonString) {
return new Gson().fromJson(jsonString, SparkExecutionEngineConfig.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private StartJobRequest getStartJobRequestForNonIndexQueries(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
Expand Down Expand Up @@ -124,6 +125,7 @@ private StartJobRequest getStartJobRequestForIndexRequest(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexDetails.getAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,24 @@

package org.opensearch.sql.spark.dispatcher.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

@AllArgsConstructor
@Data
@RequiredArgsConstructor // required explicitly
public class DispatchQueryRequest {
private final String applicationId;
private final String query;
private final String datasource;
private final LangType langType;
private final String executionRoleARN;
private final String clusterName;

/**
* Optional extra Spark submit parameters to include in final request
*/
private String extraSparkSubmitParams;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery.model;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

public class SparkSubmitParametersTest {

@Test
public void testBuildWithoutExtraParameters() {
String params =
SparkSubmitParameters.Builder.builder()
.build()
.toString();

assertNotNull(params);
}

@Test
public void testBuildWithExtraParameters() {
String params =
SparkSubmitParameters.Builder.builder()
.extraParameters("--conf A=1")
.build()
.toString();

// Assert the conf is included with a space
assertTrue(params.contains(" --conf A=1"));
}
}

0 comments on commit b67f6e2

Please sign in to comment.