diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java deleted file mode 100644 index 2377b2f5da..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.client; - -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; - -import com.amazonaws.services.emrserverless.AWSEMRServerless; -import com.amazonaws.services.emrserverless.model.CancelJobRunRequest; -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunRequest; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import com.amazonaws.services.emrserverless.model.JobDriver; -import com.amazonaws.services.emrserverless.model.SparkSubmit; -import com.amazonaws.services.emrserverless.model.StartJobRunRequest; -import com.amazonaws.services.emrserverless.model.StartJobRunResult; -import com.amazonaws.services.emrserverless.model.ValidationException; -import java.security.AccessController; -import java.security.PrivilegedAction; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class EmrServerlessClientImpl implements SparkJobClient { - - private final AWSEMRServerless emrServerless; - private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class); - - public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { - this.emrServerless = emrServerless; - } - - @Override - public String startJobRun( - String query, - String jobName, - String applicationId, - String executionRoleArn, - String sparkSubmitParams) { - StartJobRunRequest request = - new StartJobRunRequest() - .withName(jobName) - .withApplicationId(applicationId) - .withExecutionRoleArn(executionRoleArn) - .withJobDriver( - new JobDriver() - .withSparkSubmit( - new SparkSubmit() - .withEntryPoint(SPARK_SQL_APPLICATION_JAR) - .withEntryPointArguments(query, SPARK_RESPONSE_BUFFER_INDEX_NAME) - .withSparkSubmitParameters(sparkSubmitParams))); - StartJobRunResult startJobRunResult = - AccessController.doPrivileged( - (PrivilegedAction) () -> emrServerless.startJobRun(request)); - logger.info("Job Run ID: " + startJobRunResult.getJobRunId()); - return startJobRunResult.getJobRunId(); - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - GetJobRunRequest request = - new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobId); - GetJobRunResult getJobRunResult = - AccessController.doPrivileged( - (PrivilegedAction) () -> emrServerless.getJobRun(request)); - logger.info("Job Run state: " + getJobRunResult.getJobRun().getState()); - return getJobRunResult; - } - - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - CancelJobRunRequest cancelJobRunRequest = - new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId); - try { - CancelJobRunResult cancelJobRunResult = - AccessController.doPrivileged( - (PrivilegedAction) - () -> emrServerless.cancelJobRun(cancelJobRunRequest)); - logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); - return cancelJobRunResult; - } catch (ValidationException e) { - throw new IllegalArgumentException( - String.format("Couldn't cancel the queryId: %s due to %s", jobId, e.getMessage())); - } - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/SparkJobClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/SparkJobClient.java deleted file mode 100644 index c6b3059c77..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/client/SparkJobClient.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.spark.client; - -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; - -public interface SparkJobClient { - - String startJobRun( - String query, - String jobName, - String applicationId, - String executionRoleArn, - String sparkSubmitParams); - - GetJobRunResult getJobRunResult(String applicationId, String jobId); - - CancelJobRunResult cancelJobRun(String applicationId, String jobId); -}