Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel Async Query API #2126

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public List<Route> routes() {
new Route(GET, BASE_DATASOURCE_ACTION_URL),

/*
* GET datasources
* Request URL: GET
* PUT datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.UpdateDataSourceActionRequest]
Expand All @@ -100,8 +99,7 @@ public List<Route> routes() {
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* GET datasources
* Request URL: GET
* DELETE datasources
* Request body: Ref
* [org.opensearch.sql.plugin.transport.datasource.model.DeleteDataSourceActionRequest]
* Response body: Ref
Expand Down
27 changes: 17 additions & 10 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,13 @@ We make use of default aws credentials chain to make calls to the emr serverless
have pass role permissions for emr-job-execution-role mentioned in the engine configuration.



Async Query Creation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/create``.

HTTP URI: _plugins/_query/_async_query
HTTP URI: _plugins/_async_query
HTTP VERB: POST



Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
Expand All @@ -57,23 +54,19 @@ Sample Response::
"queryId": "00fd796ut1a7eg0q"
}


Async Query Result API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/result``.
Async Query Creation and Result Query permissions are orthogonal, so any user with result api permissions and queryId can query the corresponding query results irrespective of the user who created the async query.


HTTP URI: _plugins/_query/_async_query/{queryId}
HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: GET


Sample Request BODY::

curl --location --request GET 'http://localhost:9200/_plugins/_async_query/00fd796ut1a7eg0q' \
--header 'Content-Type: application/json' \
--data '{
"query" : "select * from default.http_logs limit 1"
}'

Sample Response if the Query is in Progress ::

Expand Down Expand Up @@ -106,3 +99,17 @@ Sample Response If the Query is successful ::
"total": 1,
"size": 1
}


Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: DELETE

Sample Request Body ::

curl --location --request DELETE 'http://localhost:9200/_plugins/_async_query/00fdalrvgkbh2g0q' \
--header 'Content-Type: application/json' \

Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ public interface AsyncQueryExecutorService {
* @return {@link AsyncQueryExecutionResponse}
*/
AsyncQueryExecutionResponse getAsyncQueryResults(String queryId);

/**
* Cancels running async query and returns the cancelled queryId.
*
* @param queryId queryId.
* @return {@link String} cancelledQueryId.
*/
String cancelQuery(String queryId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
}

@Override
public String cancelQuery(String queryId) {
Optional<AsyncQueryJobMetadata> asyncQueryJobMetadata =
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
if (asyncQueryJobMetadata.isPresent()) {
return sparkQueryDispatcher.cancelJob(
asyncQueryJobMetadata.get().getApplicationId(), queryId);
}
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
}

private void validateSparkExecutionEngineSettings() {
if (!isSparkJobExecutionEnabled) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobDriver;
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -65,4 +68,21 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
logger.info("Job Run state: " + getJobRunResult.getJobRun().getState());
return getJobRunResult;
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
try {
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> emrServerless.cancelJobRun(cancelJobRunRequest));
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
} catch (ValidationException e) {
throw new IllegalArgumentException(
String.format("Couldn't cancel the queryId: %s due to %s", jobId, e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.spark.client;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;

public interface SparkJobClient {
Expand All @@ -19,4 +20,6 @@ String startJobRun(
String sparkSubmitParams);

GetJobRunResult getJobRunResult(String applicationId, String jobId);

CancelJobRunResult cancelJobRun(String applicationId, String jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.net.URI;
Expand Down Expand Up @@ -64,6 +65,11 @@ public JSONObject getQueryResponse(String applicationId, String queryId) {
return result;
}

public String cancelJob(String applicationId, String jobId) {
CancelJobRunResult cancelJobRunResult = sparkJobClient.cancelJobRun(applicationId, jobId);
return cancelJobRunResult.getJobRunId();
}

// TODO: Analyze given query
// Extract datasourceName
// Apply Authorizaiton.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void onResponse(
CancelAsyncQueryActionResponse cancelAsyncQueryActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
RestStatus.NO_CONTENT,
vamsimanohar marked this conversation as resolved.
Show resolved Hide resolved
"application/json; charset=UTF-8",
cancelAsyncQueryActionResponse.getResult()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionRequest;
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
import org.opensearch.tasks.Task;
Expand All @@ -21,21 +22,31 @@ public class TransportCancelAsyncQueryRequestAction
extends HandledTransportAction<CancelAsyncQueryActionRequest, CancelAsyncQueryActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/async_query/delete";
private final AsyncQueryExecutorServiceImpl asyncQueryExecutorService;
public static final ActionType<CancelAsyncQueryActionResponse> ACTION_TYPE =
new ActionType<>(NAME, CancelAsyncQueryActionResponse::new);

@Inject
public TransportCancelAsyncQueryRequestAction(
TransportService transportService, ActionFilters actionFilters) {
TransportService transportService,
ActionFilters actionFilters,
AsyncQueryExecutorServiceImpl asyncQueryExecutorService) {
super(NAME, transportService, actionFilters, CancelAsyncQueryActionRequest::new);
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

@Override
protected void doExecute(
Task task,
CancelAsyncQueryActionRequest request,
ActionListener<CancelAsyncQueryActionResponse> listener) {
String responseContent = "deleted_job";
listener.onResponse(new CancelAsyncQueryActionResponse(responseContent));
try {
String jobId = asyncQueryExecutorService.cancelQuery(request.getQueryId());
listener.onResponse(
new CancelAsyncQueryActionResponse(
String.format("Deleted async query with id: %s", jobId)));
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

@AllArgsConstructor
@Getter
public class CancelAsyncQueryActionRequest extends ActionRequest {

private String queryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,34 @@ void testGetAsyncQueryResultsWithDisabledExecutionEngine() {
+ " to enable Async Query APIs",
illegalArgumentException.getMessage());
}

@Test
void testCancelJobWithJobNotFound() {
AsyncQueryExecutorService asyncQueryExecutorService =
new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings);
when(asyncQueryJobMetadataStorageService.getJobMetadata(EMR_JOB_ID))
.thenReturn(Optional.empty());
AsyncQueryNotFoundException asyncQueryNotFoundException =
Assertions.assertThrows(
AsyncQueryNotFoundException.class,
() -> asyncQueryExecutorService.cancelQuery(EMR_JOB_ID));
Assertions.assertEquals(
"QueryId: " + EMR_JOB_ID + " not found", asyncQueryNotFoundException.getMessage());
verifyNoInteractions(sparkQueryDispatcher);
verifyNoInteractions(settings);
}

@Test
void testCancelJob() {
AsyncQueryExecutorService asyncQueryExecutorService =
new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings);
when(asyncQueryJobMetadataStorageService.getJobMetadata(EMR_JOB_ID))
.thenReturn(Optional.of(new AsyncQueryJobMetadata(EMR_JOB_ID, EMRS_APPLICATION_ID)));
when(sparkQueryDispatcher.cancelJob(EMRS_APPLICATION_ID, EMR_JOB_ID)).thenReturn(EMR_JOB_ID);
String jobId = asyncQueryExecutorService.cancelQuery(EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, jobId);
verifyNoInteractions(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@
package org.opensearch.sql.spark.client;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_JOB_NAME;
import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID;
import static org.opensearch.sql.spark.constants.TestConstants.QUERY;
import static org.opensearch.sql.spark.constants.TestConstants.SPARK_SUBMIT_PARAMETERS;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import com.amazonaws.services.emrserverless.model.ValidationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -45,4 +50,28 @@ void testGetJobRunState() {
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any()))
.thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID));
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
CancelJobRunResult cancelJobRunResult =
emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId());
}

@Test
void testCancelJobRunWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID));
Assertions.assertEquals(
"Couldn't cancel the queryId: job-123xxx due to Error (Service: null; Status Code: 0; Error"
+ " Code: null; Request ID: null; Proxy: null)",
illegalArgumentException.getMessage());
}
}
Loading