Refactor to Async Query API
Signed-off-by: Vamsi Manohar <[email protected]>
vmmusings committed Sep 20, 2023
1 parent 90ccc3e commit 65cb970
Showing 46 changed files with 708 additions and 879 deletions.
@@ -1,7 +1,7 @@
.. highlight:: sh

=======================
-Job Interface Endpoints
+Async Query Interface Endpoints
=======================

.. rubric:: Table of contents
@@ -15,10 +15,10 @@ Introduction
============

For supporting `S3Glue <../ppl/admin/connector/s3glue_connector.rst>`_ and Cloudwatch datasource connectors, we have introduced a new execution engine on top of Spark.
-All the queries to be executed on spark execution engine can only be submitted via Job APIs. Below sections will list all the new APIs introduced.
+All queries executed on the Spark execution engine can only be submitted via the Async Query APIs. The sections below list each of the new APIs.


-Configuration required for Job APIs
+Configuration required for Async Query APIs
======================================
Currently, we only support AWS EMR Serverless as the Spark execution engine. The details of the execution engine should be configured under the
``plugins.query.executionengine.spark.config`` cluster setting. The value should be a stringified JSON comprising ``applicationId``, ``executionRoleARN``, and ``region``.
@@ -27,58 +27,59 @@ Sample Setting Value ::
plugins.query.executionengine.spark.config: '{"applicationId":"xxxxx", "executionRoleARN":"arn:aws:iam::***********:role/emr-job-execution-role","region":"eu-west-1"}'
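
The same value can also be applied at runtime through the standard cluster settings API. The request below is a minimal sketch; it assumes the setting is registered as dynamically updatable, and if it is not, the value must be placed in ``opensearch.yml`` before startup ::

curl --location --request PUT 'http://localhost:9200/_cluster/settings' \
--header 'Content-Type: application/json' \
--data '{
"persistent" : {
"plugins.query.executionengine.spark.config" : "{\"applicationId\":\"xxxxx\",\"executionRoleARN\":\"arn:aws:iam::***********:role/emr-job-execution-role\",\"region\":\"eu-west-1\"}"
}
}'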


-If this setting is not configured during bootstrap, Job APIs will be disabled and it requires a cluster restart to enable them back again.
+If this setting is not configured during bootstrap, the Async Query APIs will be disabled, and a cluster restart is required to enable them again.
We use the default AWS credentials chain to make calls to the EMR Serverless application; make sure the default credentials
have pass-role permissions for the emr-job-execution-role mentioned in the engine configuration.
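
For illustration only, an IAM policy attached to the default credentials could grant the required pass-role access as in the following sketch; the account id, policy shape, and EMR Serverless action list are assumptions rather than requirements imposed by the plugin ::

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": "arn:aws:iam::123456789012:role/emr-job-execution-role"
            },
            {
                "Effect": "Allow",
                "Action": ["emr-serverless:StartJobRun", "emr-serverless:GetJobRun", "emr-serverless:CancelJobRun"],
                "Resource": "*"
            }
        ]
    }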



-Job Creation API
+Async Query Creation API
======================================
-If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/create``.
+If the security plugin is enabled, this API can only be invoked by users with the permission ``cluster:admin/opensearch/ql/async_query/create``.

-HTTP URI: _plugins/_query/_jobs
+HTTP URI: _plugins/_async_query
HTTP VERB: POST



Sample Request::

-curl --location 'http://localhost:9200/_plugins/_query/_jobs' \
+curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"kind" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Sample Response::

{
"jobId": "00fd796ut1a7eg0q"
"queryId": "00fd796ut1a7eg0q"
}

-Job Query Result API
+Async Query Result API
======================================
-If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/result``.
-Job Creation and Result Query permissions are orthogonal, so any user with result api permissions and jobId can query the corresponding job results irrespective of the user who created the job.
+If the security plugin is enabled, this API can only be invoked by users with the permission ``cluster:admin/opensearch/ql/async_query/result``.
+Async query creation and result permissions are orthogonal, so any user with the result API permission and a queryId can fetch the corresponding query results, irrespective of the user who created the async query.
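
As a sketch, a security plugin role granting both async query permissions could be defined in ``roles.yml`` as follows; the role name here is illustrative ::

    async_query_access:
      cluster_permissions:
        - 'cluster:admin/opensearch/ql/async_query/create'
        - 'cluster:admin/opensearch/ql/async_query/result'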


-HTTP URI: _plugins/_query/_jobs/{jobId}/result
+HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: GET


Sample Request::

-curl --location --request GET 'http://localhost:9200/_plugins/_query/_jobs/00fd796ut1a7eg0q/result' \
---header 'Content-Type: application/json' \
---data '{
-"query" : "select * from default.http_logs limit 1"
-}'
+curl --location --request GET 'http://localhost:9200/_plugins/_async_query/00fd796ut1a7eg0q'

-Sample Response if the Job is in Progress ::
+Sample Response if the Query is in Progress ::

{"status":"RUNNING"}

-Sample Response If the Job is successful ::
+Sample Response if the Query is successful ::

{
"schema": [
75 changes: 36 additions & 39 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -89,25 +89,23 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
-import org.opensearch.sql.spark.client.EmrServerlessClient;
+import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
+import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
+import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
+import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
+import org.opensearch.sql.spark.client.SparkJobClient;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
-import org.opensearch.sql.spark.jobs.JobExecutorService;
-import org.opensearch.sql.spark.jobs.JobExecutorServiceImpl;
-import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
-import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
-import org.opensearch.sql.spark.rest.RestJobManagementAction;
+import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
-import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
-import org.opensearch.sql.spark.transport.TransportDeleteJobRequestAction;
-import org.opensearch.sql.spark.transport.TransportGetJobRequestAction;
-import org.opensearch.sql.spark.transport.TransportGetQueryResultRequestAction;
-import org.opensearch.sql.spark.transport.model.CreateJobActionResponse;
-import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse;
-import org.opensearch.sql.spark.transport.model.GetJobActionResponse;
-import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse;
+import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
+import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
+import org.opensearch.sql.spark.transport.TransportGetAsyncQueryResultAction;
+import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
+import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse;
+import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse;
import org.opensearch.sql.storage.DataSourceFactory;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -125,7 +123,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
-private JobExecutorService jobExecutorService;
+private AsyncQueryExecutorService asyncQueryExecutorService;
private Injector injector;

public String name() {
@@ -158,7 +156,7 @@ public List<RestHandler> getRestHandlers(
new RestPPLStatsAction(settings, restController),
new RestQuerySettingsAction(settings, restController),
new RestDataSourceQueryAction(),
-new RestJobManagementAction());
+new RestAsyncQueryManagementAction());
}

/** Register action and handler so that transportClient can find proxy for action. */
@@ -184,18 +182,17 @@ public List<RestHandler> getRestHandlers(
TransportDeleteDataSourceAction.NAME, DeleteDataSourceActionResponse::new),
TransportDeleteDataSourceAction.class),
new ActionHandler<>(
-new ActionType<>(TransportCreateJobRequestAction.NAME, CreateJobActionResponse::new),
-TransportCreateJobRequestAction.class),
-new ActionHandler<>(
-new ActionType<>(TransportGetJobRequestAction.NAME, GetJobActionResponse::new),
-TransportGetJobRequestAction.class),
+new ActionType<>(
+TransportCreateAsyncQueryRequestAction.NAME, CreateAsyncQueryActionResponse::new),
+TransportCreateAsyncQueryRequestAction.class),
new ActionHandler<>(
new ActionType<>(
-TransportGetQueryResultRequestAction.NAME, GetJobQueryResultActionResponse::new),
-TransportGetQueryResultRequestAction.class),
+TransportGetAsyncQueryResultAction.NAME, GetAsyncQueryResultActionResponse::new),
+TransportGetAsyncQueryResultAction.class),
new ActionHandler<>(
-new ActionType<>(TransportDeleteJobRequestAction.NAME, DeleteJobActionResponse::new),
-TransportDeleteJobRequestAction.class));
+new ActionType<>(
+TransportCancelAsyncQueryRequestAction.NAME, CancelAsyncQueryActionResponse::new),
+TransportCancelAsyncQueryRequestAction.class));
}

@Override
@@ -221,12 +218,12 @@ public Collection<Object> createComponents(
if (StringUtils.isEmpty(this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG))) {
LOGGER.warn(
String.format(
"Job APIs are disabled as %s is not configured in cluster settings. "
+ "Please configure and restart the domain to enable JobAPIs",
"Async Query APIs are disabled as %s is not configured in cluster settings. "
+ "Please configure and restart the domain to enable Async Query APIs",
SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue()));
-this.jobExecutorService = new JobExecutorServiceImpl();
+this.asyncQueryExecutorService = new AsyncQueryExecutorServiceImpl();
} else {
-this.jobExecutorService = createJobExecutorService();
+this.asyncQueryExecutorService = createAsyncQueryExecutorService();
}

ModulesBuilder modules = new ModulesBuilder();
@@ -239,7 +236,7 @@ public Collection<Object> createComponents(
});

injector = modules.createInjector();
-return ImmutableList.of(dataSourceService, jobExecutorService);
+return ImmutableList.of(dataSourceService, asyncQueryExecutorService);
}

@Override
@@ -297,23 +294,23 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceUserAuthorizationHelper);
}

-private JobExecutorService createJobExecutorService() {
-JobMetadataStorageService jobMetadataStorageService =
-new OpensearchJobMetadataStorageService(client, clusterService);
-EmrServerlessClient emrServerlessClient = createEMRServerlessClient();
+private AsyncQueryExecutorService createAsyncQueryExecutorService() {
+AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
+new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
+SparkJobClient sparkJobClient = createEMRServerlessClient();
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
-emrServerlessClient, this.dataSourceService, jobExecutionResponseReader);
-return new JobExecutorServiceImpl(
-jobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
+sparkJobClient, this.dataSourceService, jobExecutionResponseReader);
+return new AsyncQueryExecutorServiceImpl(
+asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}

-private EmrServerlessClient createEMRServerlessClient() {
+private SparkJobClient createEMRServerlessClient() {
String sparkExecutionEngineConfigString =
this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
return AccessController.doPrivileged(
-(PrivilegedAction<EmrServerlessClient>)
+(PrivilegedAction<SparkJobClient>)
() -> {
SparkExecutionEngineConfig sparkExecutionEngineConfig =
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
5 changes: 2 additions & 3 deletions spark/build.gradle
@@ -60,9 +60,8 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*',
-'org.opensearch.sql.spark.jobs.model.*',
-'org.opensearch.sql.spark.jobs.config.*',
-'org.opensearch.sql.spark.jobs.execution.*'
+'org.opensearch.sql.spark.asyncquery.model.*',
+'org.opensearch.sql.spark.asyncquery.exceptions.*'
]
limit {
counter = 'LINE'
32 changes: 32 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorService.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.asyncquery;
+
+import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
+import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
+import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
+
+/**
+ * AsyncQueryExecutorService exposes functionality to create, get the results of, and cancel an
+ * async query.
+ */
+public interface AsyncQueryExecutorService {
+
+  /**
+   * Creates an async query job based on the request and returns the queryId in the response.
+   *
+   * @param createAsyncQueryRequest createAsyncQueryRequest.
+   * @return {@link CreateAsyncQueryResponse}
+   */
+  CreateAsyncQueryResponse createAsyncQuery(CreateAsyncQueryRequest createAsyncQueryRequest);
+
+  /**
+   * Returns the async query response for a given queryId.
+   *
+   * @param queryId queryId.
+   * @return {@link AsyncQueryExecutionResponse}
+   */
+  AsyncQueryExecutionResponse getAsyncQueryResults(String queryId);
+}
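
As a usage sketch of the interface above (not part of this commit), a caller might submit a query and then poll for its results. The CreateAsyncQueryRequest constructor arguments and the getter names below are assumptions inferred from the REST samples earlier in this commit:

package org.opensearch.sql.spark.asyncquery;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;

// Hypothetical caller of AsyncQueryExecutorService; the request constructor
// and getters are illustrative assumptions, not part of this commit.
class AsyncQueryUsageSketch {

  // Submits a query and returns its current status: "RUNNING" while the
  // Spark job is still executing, or a terminal state once it finishes.
  String submitAndCheck(AsyncQueryExecutorService service) {
    CreateAsyncQueryResponse created =
        service.createAsyncQuery(
            new CreateAsyncQueryRequest(
                "select * from my_glue.default.http_logs limit 10", "sql"));
    AsyncQueryExecutionResponse response =
        service.getAsyncQueryResults(created.getQueryId());
    return response.getStatus();
  }
}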