Skip to content

Commit

Permalink
add concurrent limit on datasource and sessions
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Oct 28, 2023
1 parent 88b1f03 commit c672470
Show file tree
Hide file tree
Showing 22 changed files with 596 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum Key {
QUERY_SIZE_LIMIT("plugins.query.size_limit"),
ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"),
DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"),
DATASOURCES_LIMIT("plugins.query.datasources.limit"),

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
Expand Down
3 changes: 2 additions & 1 deletion datasources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ repositories {
dependencies {
implementation project(':core')
implementation project(':protocol')
implementation project(':opensearch')
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}"
Expand All @@ -35,7 +36,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "skipped", "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.common.setting.Settings.Key.DATASOURCES_LIMIT;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
Expand All @@ -30,6 +31,7 @@ public class TransportCreateDataSourceAction
new ActionType<>(NAME, CreateDataSourceActionResponse::new);

private DataSourceService dataSourceService;
private org.opensearch.sql.opensearch.setting.OpenSearchSettings settings;

/**
* TransportCreateDataSourceAction action for creating datasource.
Expand All @@ -42,33 +44,44 @@ public class TransportCreateDataSourceAction
public TransportCreateDataSourceAction(
TransportService transportService,
ActionFilters actionFilters,
DataSourceServiceImpl dataSourceService) {
DataSourceServiceImpl dataSourceService,
org.opensearch.sql.opensearch.setting.OpenSearchSettings settings) {
super(
TransportCreateDataSourceAction.NAME,
transportService,
actionFilters,
CreateDataSourceActionRequest::new);
this.dataSourceService = dataSourceService;
this.settings = settings;
}

@Override
protected void doExecute(
Task task,
CreateDataSourceActionRequest request,
ActionListener<CreateDataSourceActionResponse> actionListener) {
try {
DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
int dataSourceLimit = settings.getSettingValue(DATASOURCES_LIMIT);
if (dataSourceService.getDataSourceMetadata(false).size() >= dataSourceLimit) {
actionListener.onFailure(
new IllegalStateException(
String.format(
"domain concurrent datasources can not" + " exceed %d", dataSourceLimit)));
} else {
try {

DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package org.opensearch.sql.datasources.transport;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.common.setting.Settings.Key.DATASOURCES_LIMIT;

import java.util.HashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
Expand All @@ -21,6 +26,7 @@
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -29,9 +35,13 @@ public class TransportCreateDataSourceActionTest {

@Mock private TransportService transportService;
@Mock private TransportCreateDataSourceAction action;
@Mock private DataSourceServiceImpl dataSourceService;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DataSourceServiceImpl dataSourceService;

@Mock private Task task;
@Mock private ActionListener<CreateDataSourceActionResponse> actionListener;
@Mock private OpenSearchSettings settings;

@Captor
private ArgumentCaptor<CreateDataSourceActionResponse>
Expand All @@ -43,7 +53,9 @@ public class TransportCreateDataSourceActionTest {
public void setUp() {
action =
new TransportCreateDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
transportService, new ActionFilters(new HashSet<>()), dataSourceService, settings);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(20);
}

@Test
Expand Down Expand Up @@ -79,4 +91,30 @@ public void testDoExecuteWithException() {
Assertions.assertTrue(exception instanceof RuntimeException);
Assertions.assertEquals("Error", exception.getMessage());
}

@Test
public void testDataSourcesLimit() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName("test_datasource");
dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS);
CreateDataSourceActionRequest request = new CreateDataSourceActionRequest(dataSourceMetadata);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(1);

action.doExecute(
task,
request,
new ActionListener<CreateDataSourceActionResponse>() {
@Override
public void onResponse(CreateDataSourceActionResponse createDataSourceActionResponse) {
fail();
}

@Override
public void onFailure(Exception e) {
assertEquals("domain concurrent datasources can not exceed 1", e.getMessage());
}
});
verify(dataSourceService, times(0)).createDataSource(dataSourceMetadata);
}
}
37 changes: 35 additions & 2 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,12 @@ SQL query::
}

plugins.query.executionengine.spark.session.limit
===================================================
==================================================

Description
-----------

Each datasource can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.
Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.

1. The default value is 100.
2. This setting is node scope.
Expand Down Expand Up @@ -383,3 +383,36 @@ SQL query::
}
}


plugins.query.datasources.limit
===============================

Description
-----------

Each cluster can have maximum 20 datasources. You can increase limit by this setting.

1. The default value is 20.
2. This setting is node scope.
3. This setting can be updated dynamically.

You can update the setting with a new value like this.

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.datasources.limit":25}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"datasources": {
"limit": "25"
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -33,6 +34,11 @@

public class DataSourceAPIsIT extends PPLIntegTestCase {

@After
public void cleanUp() throws IOException {
wipeAllClusterSettings();
}

@AfterClass
protected static void deleteDataSourcesCreated() throws IOException {
Request deleteRequest = getDeleteDataSourceRequest("create_prometheus");
Expand All @@ -50,6 +56,10 @@ protected static void deleteDataSourcesCreated() throws IOException {
deleteRequest = getDeleteDataSourceRequest("Create_Prometheus");
deleteResponse = client().performRequest(deleteRequest);
Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode());

deleteRequest = getDeleteDataSourceRequest("duplicate_prometheus");
deleteResponse = client().performRequest(deleteRequest);
Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode());
}

@SneakyThrows
Expand Down Expand Up @@ -278,4 +288,45 @@ public void issue2196() {
Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password"));
Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription());
}

@Test
public void datasourceLimitTest() throws InterruptedException, IOException {
DataSourceMetadata d1 = mockDataSourceMetadata("duplicate_prometheus");
Request createRequest = getCreateDataSourceRequest(d1);
Response response = client().performRequest(createRequest);
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
// Datasource is not immediately created. so introducing a sleep of 2s.
Thread.sleep(2000);

updateClusterSettings(new ClusterSetting(TRANSIENT, "plugins.query.datasources.limit", "1"));

DataSourceMetadata d2 = mockDataSourceMetadata("d2");
ResponseException exception =
Assert.assertThrows(
ResponseException.class, () -> client().performRequest(getCreateDataSourceRequest(d2)));
Assert.assertEquals(400, exception.getResponse().getStatusLine().getStatusCode());
String prometheusGetResponseString = getResponseBody(exception.getResponse());
JsonObject errorMessage = new Gson().fromJson(prometheusGetResponseString, JsonObject.class);
Assert.assertEquals(
"domain concurrent datasources can not exceed 1",
errorMessage.get("error").getAsJsonObject().get("details").getAsString());
}

public DataSourceMetadata mockDataSourceMetadata(String name) {
return new DataSourceMetadata(
name,
"Prometheus Creation for Integ test",
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of(
"prometheus.uri",
"https://localhost:9090",
"prometheus.auth.type",
"basicauth",
"prometheus.auth.username",
"username",
"prometheus.auth.password",
"password"),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> DATASOURCES_LIMIT_SETTING =
Setting.intSetting(
Key.DATASOURCES_LIMIT.getKeyValue(),
20,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -231,6 +238,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_SESSION_LIMIT,
SPARK_EXECUTION_SESSION_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT));
register(
settingBuilder,
clusterSettings,
Key.DATASOURCES_LIMIT,
DATASOURCES_LIMIT_SETTING,
new Updater(Key.DATASOURCES_LIMIT));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -298,6 +311,7 @@ public static List<Setting<?>> pluginSettings() {
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.build();
}

Expand Down
6 changes: 4 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
Expand Down Expand Up @@ -245,7 +246,7 @@ public Collection<Object> createComponents(
});

injector = modules.createInjector();
return ImmutableList.of(dataSourceService, asyncQueryExecutorService);
return ImmutableList.of(dataSourceService, asyncQueryExecutorService, pluginSettings);
}

@Override
Expand Down Expand Up @@ -320,7 +321,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(stateStore, emrServerlessClient, pluginSettings));
new SessionManager(stateStore, emrServerlessClient, pluginSettings),
new DefaultLeaseManager(pluginSettings, stateStore));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;

/** Process async query request. */
public abstract class AsyncQueryHandler {
Expand Down Expand Up @@ -45,5 +48,8 @@ protected abstract JSONObject getResponseFromResultIndex(
protected abstract JSONObject getResponseFromExecutor(
AsyncQueryJobMetadata asyncQueryJobMetadata);

abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);
public abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);

public abstract DispatchQueryResponse submit(
DispatchQueryRequest request, DispatchQueryContext context);
}
Loading

0 comments on commit c672470

Please sign in to comment.