Commit

Introduce scheduler service

noCharger committed Jul 8, 2024
1 parent 4957d2a commit ad77992

Showing 10 changed files with 147 additions and 96 deletions.
73 changes: 9 additions & 64 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -6,13 +6,12 @@
package org.opensearch.sql.plugin;

import static java.util.Collections.singletonList;
import static org.opensearch.jobscheduler.spi.LockModel.JOB_INDEX_NAME;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -39,14 +38,11 @@
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ScriptPlugin;
@@ -86,7 +82,7 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.*;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
@@ -108,8 +104,6 @@

public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, JobSchedulerExtension {

static final String JOB_INDEX_NAME = ".scheduler_sample_extension";

private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class);

private ClusterService clusterService;
@@ -119,6 +113,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Job

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
private OpenSearchAsyncQuerySchedulingServiceImpl asyncQuerySchedulingService;

private Injector injector;

public String name() {
@@ -208,15 +204,11 @@ public Collection<Object> createComponents(
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
SampleJobRunner jobRunner = SampleJobRunner.getJobRunnerInstance();
jobRunner.setClusterService(clusterService);
jobRunner.setThreadPool(threadPool);
jobRunner.setClient(client);

this.clusterService = clusterService;
this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings());
this.client = (NodeClient) client;
this.dataSourceService = createDataSourceService();
this.asyncQuerySchedulingService = new OpenSearchAsyncQuerySchedulingServiceImpl(client, clusterService, threadPool);
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
@@ -231,6 +223,7 @@
});
modules.add(new AsyncExecutorServiceModule());
injector = modules.createInjector();
injector.getInstance(AsyncQuerySchedulingService.class);
ClusterManagerEventListener clusterManagerEventListener =
new ClusterManagerEventListener(
clusterService,
@@ -265,61 +258,13 @@ public String getJobIndex() {

@Override
public ScheduledJobRunner getJobRunner() {
return SampleJobRunner.getJobRunnerInstance();
// return SampleJobRunner.getJobRunnerInstance();
return asyncQuerySchedulingService.getJobRunner();
}

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
SampleJobParameter jobParameter = new SampleJobParameter();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case SampleJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case SampleJobParameter.ENABLED_FILED:
jobParameter.setEnabled(parser.booleanValue());
break;
case SampleJobParameter.ENABLED_TIME_FILED:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case SampleJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case SampleJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
case SampleJobParameter.INDEX_NAME_FIELD:
jobParameter.setIndexToWatch(parser.text());
break;
case SampleJobParameter.LOCK_DURATION_SECONDS:
jobParameter.setLockDurationSeconds(parser.longValue());
break;
case SampleJobParameter.JITTER:
jobParameter.setJitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
return asyncQuerySchedulingService.getJobParser();
}

@Override
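Taken together, this file's hunks remove the inlined sample-job plumbing from SQLPlugin and delegate both JobSchedulerExtension hooks to the new scheduling service. A condensed sketch of the resulting wiring (it mirrors the hunks above; unrelated plugin members are elided):

    public class SQLPlugin extends Plugin
        implements ActionPlugin, ScriptPlugin, JobSchedulerExtension {

      // Created in createComponents(), before the job-scheduler plugin asks
      // for a runner or parser.
      private OpenSearchAsyncQuerySchedulingServiceImpl asyncQuerySchedulingService;

      @Override
      public ScheduledJobRunner getJobRunner() {
        // Previously returned SampleJobRunner.getJobRunnerInstance() directly.
        return asyncQuerySchedulingService.getJobRunner();
      }

      @Override
      public ScheduledJobParser getJobParser() {
        // The inline XContent parsing removed above moved into the service.
        return asyncQuerySchedulingService.getJobParser();
      }
    }
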
2 changes: 2 additions & 0 deletions spark/build.gradle
@@ -40,6 +40,8 @@ configurations {
generateGrammarSource.dependsOn downloadG4Files

dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

antlr "org.antlr:antlr4:4.7.1"

api project(':core')
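The dependency is compileOnly because the job-scheduler SPI classes are supplied at runtime by the installed opensearch-job-scheduler plugin; the spark module only needs them on the compile classpath and should not bundle them into its own jar.
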
2 changes: 2 additions & 0 deletions spark/src/main/antlr/SqlBaseParser.g4
@@ -105,6 +105,8 @@ statement
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
| ALTER namespace identifierReference
SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
| ALTER namespace identifierReference
UNSET (DBPROPERTIES | PROPERTIES) propertyList #unsetNamespaceProperties
| ALTER namespace identifierReference
SET locationSpec #setNamespaceLocation
| DROP namespace (IF EXISTS)? identifierReference
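The new alternative mirrors the setNamespaceProperties rule just above it, so the grammar now also accepts property-unset statements such as ALTER NAMESPACE my_db UNSET DBPROPERTIES ('owner').
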
13 changes: 13 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryScheduler.java
@@ -0,0 +1,13 @@
package org.opensearch.sql.spark.asyncquery;

import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;

public interface AsyncQueryScheduler {
public void scheduleQuery();

public void disableScheduledQuery();

public void cancelScheduledQuery();
}
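
As committed, the interface methods take no arguments, and the DispatchQueryRequest, DispatchQueryContext, and FlintIndexMetadataService imports are unused; presumably they are placeholders for the intended signatures. A hypothetical fleshed-out version (the parameter lists are assumptions, not part of the commit):

    public interface AsyncQueryScheduler {
      // Register a recurring refresh job for the dispatched query.
      void scheduleQuery(DispatchQueryRequest request, DispatchQueryContext context);

      // Stop triggering new runs but keep the job document around.
      void disableScheduledQuery(String jobId);

      // Remove the scheduled job entirely.
      void cancelScheduledQuery(String jobId);
    }
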
103 changes: 103 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/asyncquery/OpenSearchAsyncQuerySchedulingServiceImpl.java
@@ -0,0 +1,103 @@
package org.opensearch.sql.spark.asyncquery;

import java.io.IOException;
import java.time.Instant;
import lombok.RequiredArgsConstructor;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.AsyncQueryHandler;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.ThreadPool;

@RequiredArgsConstructor
public class OpenSearchAsyncQuerySchedulingServiceImpl implements AsyncQueryScheduler {
static final String JOB_INDEX_NAME = ".scheduler_sample_extension";
private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService;

public ScheduledJobRunner getJobRunner() {
SampleJobRunner sampleJobRunner = SampleJobRunner.getJobRunnerInstance();
sampleJobRunner.setClusterService(clusterService);
sampleJobRunner.setThreadPool(threadPool);
sampleJobRunner.setClient(client);
return sampleJobRunner;
}

public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
SampleJobParameter jobParameter = new SampleJobParameter();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case SampleJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case SampleJobParameter.ENABLED_FILED:
jobParameter.setEnabled(parser.booleanValue());
break;
case SampleJobParameter.ENABLED_TIME_FILED:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case SampleJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case SampleJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
case SampleJobParameter.INDEX_NAME_FIELD:
jobParameter.setIndexToWatch(parser.text());
break;
case SampleJobParameter.LOCK_DURATION_SECONDS:
jobParameter.setLockDurationSeconds(parser.longValue());
break;
case SampleJobParameter.JITTER:
jobParameter.setJitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

@Override
public void scheduleQuery() {
asyncQueryJobMetadataStorageService.storeJobMetadata();
}

@Override
public void disableScheduledQuery() {

}

@Override
public void cancelScheduledQuery() {

}
}
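
scheduleQuery() calls storeJobMetadata() with no arguments, and the disable/cancel hooks are empty stubs. One plausible shape for cancelScheduledQuery, modeled on the DELETE path of the sample REST handler further down in this commit (the jobId parameter, LOGGER field, and listener bodies are assumptions, not committed code):

    import org.opensearch.action.delete.DeleteRequest;
    import org.opensearch.core.action.ActionListener;

    @Override
    public void cancelScheduledQuery(String jobId) { // hypothetical parameter
      // Deleting the job parameter document deschedules the job, as the
      // sample REST handler below does for DELETE requests.
      DeleteRequest deleteRequest = new DeleteRequest().index(JOB_INDEX_NAME).id(jobId);
      client.delete(
          deleteRequest,
          ActionListener.wrap(
              response -> LOGGER.info("Descheduled job {}", jobId),
              e -> LOGGER.error("Failed to deschedule job " + jobId, e)));
    }
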
@@ -6,7 +6,9 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sql.plugin;
package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.spark.asyncquery.OpenSearchAsyncQuerySchedulingServiceImpl.JOB_INDEX_NAME;

import java.io.IOException;
import java.time.Instant;
@@ -29,8 +31,6 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;

import static org.opensearch.sql.plugin.SQLPlugin.JOB_INDEX_NAME;

/**
* A sample rest handler that supports schedule and deschedule job operation
*
@@ -122,8 +122,7 @@ public void onFailure(Exception e) {
} else if (request.method().equals(RestRequest.Method.DELETE)) {
// delete job parameter doc from index
String id = request.param("id");
DeleteRequest deleteRequest =
new DeleteRequest().index(JOB_INDEX_NAME).id(id);
DeleteRequest deleteRequest = new DeleteRequest().index(JOB_INDEX_NAME).id(id);

return restChannel -> {
client.delete(
@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sql.plugin;
package org.opensearch.sql.spark.asyncquery;

import java.io.IOException;
import java.time.Instant;
@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sql.plugin;
package org.opensearch.sql.spark.asyncquery;

import java.util.List;
import java.util.UUID;
@@ -111,27 +111,9 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
}

SampleJobParameter parameter = (SampleJobParameter) jobParameter;
StringBuilder msg = new StringBuilder();
msg.append("Watching index ")
.append(parameter.getIndexToWatch())
.append("\n");

List<ShardRouting> shardRoutingList =
this.clusterService
.state()
.routingTable()
.allShards(parameter.getIndexToWatch());
for (ShardRouting shardRouting : shardRoutingList) {
msg.append(shardRouting.shardId().getId())
.append("\t")
.append(shardRouting.currentNodeId())
.append("\t")
.append(shardRouting.active() ? "active" : "inactive")
.append("\n");
}
log.info(msg.toString());
runTaskForIntegrationTests(parameter);
runTaskForLockIntegrationTests(parameter);

// 1. Construct refresh query from the job parameter
// 2. Submit refresh query to EMR

lockService.release(
lock,
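The removed shard-watching demo leaves two TODOs behind. A minimal sketch of step 1, assuming the refresh runs against a Flint materialized view (the statement shape is illustrative; step 2 would presumably go through the EMRServerlessClientFactory that the new scheduling service already imports):

    // Hypothetical helper for TODO step 1; not part of the commit.
    private String buildRefreshQuery(SampleJobParameter parameter) {
      // e.g. "REFRESH MATERIALIZED VIEW my_view" for a view backed by the
      // watched index; the exact statement submitted to EMR is an assumption.
      return "REFRESH MATERIALIZED VIEW " + parameter.getIndexToWatch();
    }
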
@@ -16,12 +16,10 @@
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.GaugeMetric;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.*;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.client.EMRServerlessClientFactoryImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
@@ -32,6 +30,7 @@
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.threadpool.ThreadPool;

@RequiredArgsConstructor
public class AsyncExecutorServiceModule extends AbstractModule {
Expand All @@ -56,6 +55,12 @@ public AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService(
return new OpensearchAsyncQueryJobMetadataStorageService(stateStore);
}

@Provides
@Singleton
public AsyncQuerySchedulingService asyncQuerySchedulingService(NodeClient client, ClusterService clusterService, ThreadPool threadPool) {
return new OpenSearchAsyncQuerySchedulingServiceImpl(client, clusterService, threadPool);
}

@Provides
@Singleton
public StateStore stateStore(NodeClient client, ClusterService clusterService) {
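Note the naming split: the new interface file declares AsyncQueryScheduler, while this module and SQLPlugin bind AsyncQuerySchedulingService; presumably both names refer to the same abstraction. With the @Provides @Singleton method in place, the eager lookup in createComponents resolves to this singleton:

    // From the SQLPlugin hunk above: forces the service to be constructed at
    // component-creation time rather than on the first scheduled run.
    AsyncQuerySchedulingService schedulingService =
        injector.getInstance(AsyncQuerySchedulingService.class);
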
