Create Job API
Signed-off-by: Vamsi Manohar <[email protected]>
vmmusings committed Sep 19, 2023
1 parent a7af359 commit 90ccc3e
Showing 46 changed files with 2,058 additions and 62 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
@@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
-api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
-api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
+api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
+api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
@@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
-
+SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
@@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

+/**
+ * Returns the DataSourceMetadata object with the given name. The returned object contains all
+ * the metadata information without any filtering.
+ *
+ * @param name name of the {@link DataSource}.
+ * @return {@link DataSourceMetadata}.
+ */
+DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
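A minimal usage sketch (not part of this commit) contrasting the filtered and raw lookups; it assumes a DataSourceService instance named `service` and a registered Prometheus data source "testDS" with basic-auth properties, mirroring the unit test added later in this commit:

    // Filtered lookup: authentication info is removed from the returned metadata.
    DataSourceMetadata filtered = service.getDataSourceMetadata("testDS");
    // filtered.getProperties() no longer contains "prometheus.auth.password"

    // Raw lookup: the metadata is returned without any filtering.
    DataSourceMetadata raw = service.getRawDataSourceMetadata("testDS");
    // raw.getProperties() still contains "prometheus.auth.password"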
@@ -208,6 +208,11 @@ public DataSourceMetadata getDataSourceMetadata(String name) {
return null;
}

+@Override
+public DataSourceMetadata getRawDataSourceMetadata(String name) {
+  return null;
+}

@Override
public void createDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException("unsupported operation");
@@ -64,29 +64,17 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
}

@Override
-public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
-  Optional<DataSourceMetadata> dataSourceMetadataOptional =
-      getDataSourceMetadataFromName(datasourceName);
-  if (dataSourceMetadataOptional.isEmpty()) {
-    throw new IllegalArgumentException(
-        "DataSource with name: " + datasourceName + " doesn't exist.");
-  }
-  removeAuthInfo(dataSourceMetadataOptional.get());
-  return dataSourceMetadataOptional.get();
+public DataSourceMetadata getDataSourceMetadata(String dataSourceName) {
+  DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
+  removeAuthInfo(dataSourceMetadata);
+  return dataSourceMetadata;
}

@Override
public DataSource getDataSource(String dataSourceName) {
-  Optional<DataSourceMetadata> dataSourceMetadataOptional =
-      getDataSourceMetadataFromName(dataSourceName);
-  if (dataSourceMetadataOptional.isEmpty()) {
-    throw new DataSourceNotFoundException(
-        String.format("DataSource with name %s doesn't exist.", dataSourceName));
-  } else {
-    DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
-    this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
-    return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
-  }
+  DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
+  this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
+  return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}

@Override
@@ -146,11 +134,20 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

-private Optional<DataSourceMetadata> getDataSourceMetadataFromName(String dataSourceName) {
+@Override
+public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
   if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
-    return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
+    return DataSourceMetadata.defaultOpenSearchDataSourceMetadata();
+
   } else {
-    return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
+    Optional<DataSourceMetadata> dataSourceMetadataOptional =
+        this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
+    if (dataSourceMetadataOptional.isEmpty()) {
+      throw new DataSourceNotFoundException(
+          String.format("DataSource with name %s doesn't exist.", dataSourceName));
+    } else {
+      return dataSourceMetadataOptional.get();
+    }
}
}

@@ -359,11 +359,11 @@ void testRemovalOfAuthorizationInfo() {
@Test
void testGetDataSourceMetadataForNonExistingDataSource() {
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty());
-IllegalArgumentException exception =
+DataSourceNotFoundException exception =
     assertThrows(
-        IllegalArgumentException.class,
+        DataSourceNotFoundException.class,
         () -> dataSourceService.getDataSourceMetadata("testDS"));
-assertEquals("DataSource with name: testDS doesn't exist.", exception.getMessage());
+assertEquals("DataSource with name testDS doesn't exist.", exception.getMessage());
}

@Test
@@ -385,4 +385,28 @@ void testGetDataSourceMetadataForSpecificDataSourceName() {
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS");
}

+@Test
+void testGetRawDataSourceMetadata() {
+  HashMap<String, String> properties = new HashMap<>();
+  properties.put("prometheus.uri", "https://localhost:9090");
+  properties.put("prometheus.auth.type", "basicauth");
+  properties.put("prometheus.auth.username", "username");
+  properties.put("prometheus.auth.password", "password");
+  DataSourceMetadata dataSourceMetadata =
+      new DataSourceMetadata(
+          "testDS",
+          DataSourceType.PROMETHEUS,
+          Collections.singletonList("prometheus_access"),
+          properties);
+  when(dataSourceMetadataStorage.getDataSourceMetadata("testDS"))
+      .thenReturn(Optional.of(dataSourceMetadata));
+
+  DataSourceMetadata dataSourceMetadata1 = dataSourceService.getRawDataSourceMetadata("testDS");
+  assertEquals("testDS", dataSourceMetadata1.getName());
+  assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector());
+  assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
+  assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username"));
+  assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password"));
+}
}
107 changes: 107 additions & 0 deletions docs/user/interfaces/jobinterface.rst
@@ -0,0 +1,107 @@
.. highlight:: sh

=======================
Job Interface Endpoints
=======================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

To support the `S3Glue <../ppl/admin/connector/s3glue_connector.rst>`_ and CloudWatch data source connectors, we have introduced a new execution engine on top of Spark.
All queries to be executed on the Spark execution engine must be submitted via the Job APIs. The sections below describe the new APIs.


Configuration required for Job APIs
======================================
Currently, we only support AWS EMR Serverless as the Spark execution engine. The execution engine details should be configured under the
``plugins.query.executionengine.spark.config`` cluster setting. The value should be a stringified JSON object comprising ``applicationId``, ``executionRoleARN``, and ``region``.
Sample Setting Value::

plugins.query.executionengine.spark.config: '{"applicationId":"xxxxx", "executionRoleARN":"arn:aws:iam::***********:role/emr-job-execution-role","region":"eu-west-1"}'


If this setting is not configured during bootstrap, the Job APIs are disabled, and a cluster restart is required to enable them again.
We use the default AWS credentials chain to make calls to the EMR Serverless application; also make sure the default credentials
have pass-role permissions for the ``emr-job-execution-role`` mentioned in the engine configuration.
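
The setting itself is registered as a dynamic cluster setting in this change, so an already-configured value can also be updated on a running cluster via the standard cluster settings API. A sketch, where the ``applicationId`` and role ARN are placeholders::

    curl --location --request PUT 'http://localhost:9200/_cluster/settings' \
    --header 'Content-Type: application/json' \
    --data '{
        "transient": {
            "plugins.query.executionengine.spark.config": "{\"applicationId\":\"xxxxx\",\"executionRoleARN\":\"arn:aws:iam::123456789012:role/emr-job-execution-role\",\"region\":\"eu-west-1\"}"
        }
    }'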



Job Creation API
======================================
If the security plugin is enabled, this API can only be invoked by users with the ``cluster:admin/opensearch/ql/jobs/create`` permission.

HTTP URI: ``_plugins/_query/_jobs``
HTTP VERB: ``POST``



Sample Request::

curl --location 'http://localhost:9200/_plugins/_query/_jobs' \
--header 'Content-Type: application/json' \
--data '{
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Sample Response::

{
"jobId": "00fd796ut1a7eg0q"
}
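
As a usage sketch, the returned jobId can be captured in a shell variable and passed to the result API described next (this assumes ``jq`` is installed; the endpoint and query are the ones shown above)::

    jobId=$(curl -s --location 'http://localhost:9200/_plugins/_query/_jobs' \
    --header 'Content-Type: application/json' \
    --data '{
        "query" : "select * from my_glue.default.http_logs limit 10"
    }' | jq -r '.jobId')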

Job Query Result API
======================================
If the security plugin is enabled, this API can only be invoked by users with the ``cluster:admin/opensearch/ql/jobs/result`` permission.
Job creation and result query permissions are orthogonal: any user with the result API permission and a jobId can query the corresponding job results, irrespective of who created the job.


HTTP URI: ``_plugins/_query/_jobs/{jobId}/result``
HTTP VERB: ``GET``


Sample Request::

curl --location --request GET 'http://localhost:9200/_plugins/_query/_jobs/00fd796ut1a7eg0q/result' \
--header 'Content-Type: application/json' \
--data '{
"query" : "select * from default.http_logs limit 1"
}'

Sample Response if the job is in progress::

{"status":"RUNNING"}

Sample Response if the job is successful::

{
"schema": [
{
"name": "indexed_col_name",
"type": "string"
},
{
"name": "data_type",
"type": "string"
},
{
"name": "skip_type",
"type": "string"
}
],
"datarows": [
[
"status",
"int",
"VALUE_SET"
]
],
"total": 1,
"size": 1
}
1 change: 1 addition & 0 deletions integ-test/build.gradle
@@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
@@ -174,12 +174,12 @@ public void deleteDataSourceTest() {
Assert.assertThrows(
ResponseException.class, () -> client().performRequest(prometheusGetRequest));
Assert.assertEquals(
-    400, prometheusGetResponseException.getResponse().getStatusLine().getStatusCode());
+    404, prometheusGetResponseException.getResponse().getStatusLine().getStatusCode());
String prometheusGetResponseString =
getResponseBody(prometheusGetResponseException.getResponse());
JsonObject errorMessage = new Gson().fromJson(prometheusGetResponseString, JsonObject.class);
Assert.assertEquals(
"DataSource with name: delete_prometheus doesn't exist.",
"DataSource with name delete_prometheus doesn't exist.",
errorMessage.get("error").getAsJsonObject().get("details").getAsString());
}

@@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

+public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
+    Setting.simpleString(
+        Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic);
+
/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
@@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
+register(
+    settingBuilder,
+    clusterSettings,
+    Key.SPARK_EXECUTION_ENGINE_CONFIG,
+    SPARK_EXECUTION_ENGINE_CONFIG,
+    new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
@@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
+.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

4 changes: 2 additions & 2 deletions plugin/build.gradle
@@ -152,8 +152,8 @@ dependencies {

testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
-testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.4.0'
-testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.4.0'
+testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.5.0'
+testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.5.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
}

