diff --git a/build.gradle b/build.gradle index b3e09d7b50..d088883022 100644 --- a/build.gradle +++ b/build.gradle @@ -192,7 +192,7 @@ configurations.all { exclude group: "commons-logging", module: "commons-logging" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' } // updateVersion: Task to auto increment to the next development iteration diff --git a/common/build.gradle b/common/build.gradle index b4ee98a5b7..786629b027 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -34,7 +34,7 @@ repositories { dependencies { api "org.antlr:antlr4-runtime:4.7.1" - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' @@ -46,7 +46,7 @@ dependencies { testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1' - testImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + testImplementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation('org.junit.jupiter:junit-jupiter:5.9.3') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.7.0' diff --git a/core/build.gradle b/core/build.gradle index 655e7d92c2..9a2f08f148 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -46,7 +46,7 @@ pitest { } dependencies { - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240' diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 93153cf737..22b6d24005 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -153,7 +153,7 @@ configurations.all { resolutionStrategy.force "commons-logging:commons-logging:1.2" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" diff --git a/legacy/build.gradle b/legacy/build.gradle index 0467db183d..303387bdbe 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -107,7 +107,7 @@ dependencies { because 'https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379' } } - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'org.json', name: 'json', version:'20231013' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' diff --git a/plugin/build.gradle b/plugin/build.gradle index 710d81ed0a..08c09ebb7c 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -3,25 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - plugins { id 'java' id "io.freefair.lombok" @@ -48,6 +29,7 @@ opensearchplugin { name 'opensearch-sql' description 'OpenSearch SQL' classname 'org.opensearch.sql.plugin.SQLPlugin' + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file("LICENSE.txt") noticeFile rootProject.file("NOTICE") } @@ -76,7 +58,7 @@ publishing { } repositories { maven { - name = "Snapshots" // optional target repository name + name = "Snapshots" url = "https://aws.oss.sonatype.org/content/repositories/snapshots" credentials { username "$System.env.SONATYPE_USERNAME" @@ -98,7 +80,8 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' + resolutionStrategy.force 'com.google.guava:failureaccess:1.0.2' resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}" @@ -139,6 +122,10 @@ spotless { } dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + compileOnly 'com.google.guava:guava:32.1.3-jre' + compileOnly 'com.google.guava:failureaccess:1.0.2' + api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" @@ -204,11 +191,10 @@ dependencyLicenses.enabled = false // enable testingConventions check will cause errors like: "Classes ending with [Tests] must subclass [LuceneTestCase]" testingConventions.enabled = false -// TODO: need to verify the thirdPartyAudi +// TODO: need to verify the thirdPartyAudit // currently it complains missing classes like ibatis, mysql etc, should not be a problem thirdPartyAudit.enabled = false - apply plugin: 'com.netflix.nebula.ospackage' validateNebulaPom.enabled = false diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index b86ab9218a..37b2006442 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -11,8 +11,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; +import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -39,9 +41,15 @@ import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.indices.SystemIndexDescriptor; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; @@ -105,7 +113,9 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin { +public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { + + static final String JOB_INDEX_NAME = ".scheduler_sample_extension"; private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class); @@ -141,6 +151,7 @@ public List getRestHandlers( Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( + new SampleExtensionRestHandler(), new RestPPLQueryAction(), new RestSqlAction(settings, injector), new RestSqlStatsAction(settings, restController), @@ -203,6 +214,11 @@ public Collection createComponents( NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameResolver, Supplier repositoriesServiceSupplier) { + SampleJobRunner jobRunner = SampleJobRunner.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); + this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; @@ -243,6 +259,75 @@ public Collection createComponents( pluginSettings); } + @Override + public String getJobType() { + return "scheduler_sample_extension"; + } + + @Override + public String getJobIndex() { + return JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return SampleJobRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + SampleJobParameter jobParameter = new SampleJobParameter(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case SampleJobParameter.NAME_FIELD: + jobParameter.setJobName(parser.text()); + break; + case SampleJobParameter.ENABLED_FILED: + jobParameter.setEnabled(parser.booleanValue()); + break; + case SampleJobParameter.ENABLED_TIME_FILED: + jobParameter.setEnabledTime(parseInstantValue(parser)); + break; + case SampleJobParameter.LAST_UPDATE_TIME_FIELD: + jobParameter.setLastUpdateTime(parseInstantValue(parser)); + break; + case SampleJobParameter.SCHEDULE_FIELD: + jobParameter.setSchedule(ScheduleParser.parse(parser)); + break; + case SampleJobParameter.INDEX_NAME_FIELD: + jobParameter.setIndexToWatch(parser.text()); + break; + case SampleJobParameter.LOCK_DURATION_SECONDS: + jobParameter.setLockDurationSeconds(parser.longValue()); + break; + case SampleJobParameter.JITTER: + jobParameter.setJitter(parser.doubleValue()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return jobParameter; + }; + } + + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } + @Override public List> getExecutorBuilders(Settings settings) { return singletonList( diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SampleExtensionRestHandler.java b/plugin/src/main/java/org/opensearch/sql/plugin/SampleExtensionRestHandler.java new file mode 100644 index 0000000000..d2ef842fa8 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SampleExtensionRestHandler.java @@ -0,0 +1,152 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sql.plugin; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; + +import static org.opensearch.sql.plugin.SQLPlugin.JOB_INDEX_NAME; + +/** + * A sample rest handler that supports schedule and deschedule job operation + * + *

Users need to provide "id", "index", "job_name", and "interval" parameter to schedule a job. + * e.g. {@code POST /_plugins/scheduler_sample/watch?id=dashboards-job-id&job_name=watch dashboards + * index&index=.opensearch_dashboards_1&interval=1 } + * + *

creates a job with id "dashboards-job-id" and job name "watch dashboards index", which logs + * ".opensearch_dashboards_1" index's shards info every 1 minute + * + *

Users can remove that job by calling {@code DELETE + * /_plugins/scheduler_sample/watch?id=dashboards-job-id} + */ +public class SampleExtensionRestHandler extends BaseRestHandler { + public static final String WATCH_INDEX_URI = "/_plugins/scheduler_sample/watch"; + + @Override + public String getName() { + return "Sample JobScheduler extension handler"; + } + + @Override + public List routes() { + return Collections.unmodifiableList( + Arrays.asList( + new Route(RestRequest.Method.POST, WATCH_INDEX_URI), + new Route(RestRequest.Method.DELETE, WATCH_INDEX_URI))); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) + throws IOException { + if (request.method().equals(RestRequest.Method.POST)) { + // compose SampleJobParameter object from request + String id = request.param("id"); + String indexName = request.param("index"); + String jobName = request.param("job_name"); + String interval = request.param("interval"); + String lockDurationSecondsString = request.param("lock_duration_seconds"); + Long lockDurationSeconds = + lockDurationSecondsString != null ? Long.parseLong(lockDurationSecondsString) : null; + String jitterString = request.param("jitter"); + Double jitter = jitterString != null ? Double.parseDouble(jitterString) : null; + + if (id == null || indexName == null) { + throw new IllegalArgumentException("Must specify id and index parameter"); + } + SampleJobParameter jobParameter = + new SampleJobParameter( + id, + jobName, + indexName, + new IntervalSchedule(Instant.now(), Integer.parseInt(interval), ChronoUnit.MINUTES), + lockDurationSeconds, + jitter); + IndexRequest indexRequest = + new IndexRequest() + .index(JOB_INDEX_NAME) + .id(id) + .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + return restChannel -> { + // index the job parameter + client.index( + indexRequest, + new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + try { + RestResponse restResponse = + new BytesRestResponse( + RestStatus.OK, + indexResponse.toXContent(JsonXContent.contentBuilder(), null)); + restChannel.sendResponse(restResponse); + } catch (IOException e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + }; + } else if (request.method().equals(RestRequest.Method.DELETE)) { + // delete job parameter doc from index + String id = request.param("id"); + DeleteRequest deleteRequest = + new DeleteRequest().index(JOB_INDEX_NAME).id(id); + + return restChannel -> { + client.delete( + deleteRequest, + new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "Job deleted.")); + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + }; + } else { + return restChannel -> { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, request.method() + " is not allowed.")); + }; + } + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SampleJobParameter.java b/plugin/src/main/java/org/opensearch/sql/plugin/SampleJobParameter.java new file mode 100644 index 0000000000..987c5010f6 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SampleJobParameter.java @@ -0,0 +1,163 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sql.plugin; + +import java.io.IOException; +import java.time.Instant; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +/** + * A sample job parameter. + * + *

It adds an additional "indexToWatch" field to {@link ScheduledJobParameter}, which stores the + * index the job runner will watch. + */ +public class SampleJobParameter implements ScheduledJobParameter { + public static final String NAME_FIELD = "name"; + public static final String ENABLED_FILED = "enabled"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FILED = "enabled_time"; + public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field"; + public static final String INDEX_NAME_FIELD = "index_name_to_watch"; + public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds"; + public static final String JITTER = "jitter"; + + private String jobName; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + private Schedule schedule; + private String indexToWatch; + private Long lockDurationSeconds; + private Double jitter; + + public SampleJobParameter() {} + + public SampleJobParameter( + String id, + String name, + String indexToWatch, + Schedule schedule, + Long lockDurationSeconds, + Double jitter) { + this.jobName = name; + this.indexToWatch = indexToWatch; + this.schedule = schedule; + + Instant now = Instant.now(); + this.isEnabled = true; + this.enabledTime = now; + this.lastUpdateTime = now; + this.lockDurationSeconds = lockDurationSeconds; + this.jitter = jitter; + } + + @Override + public String getName() { + return this.jobName; + } + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + @Override + public Long getLockDurationSeconds() { + return this.lockDurationSeconds; + } + + @Override + public Double getJitter() { + return jitter; + } + + public String getIndexToWatch() { + return this.indexToWatch; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + } + + public void setEnabled(boolean enabled) { + isEnabled = enabled; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setIndexToWatch(String indexToWatch) { + this.indexToWatch = indexToWatch; + } + + public void setLockDurationSeconds(Long lockDurationSeconds) { + this.lockDurationSeconds = lockDurationSeconds; + } + + public void setJitter(Double jitter) { + this.jitter = jitter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder + .field(NAME_FIELD, this.jobName) + .field(ENABLED_FILED, this.isEnabled) + .field(SCHEDULE_FIELD, this.schedule) + .field(INDEX_NAME_FIELD, this.indexToWatch); + if (this.enabledTime != null) { + builder.timeField( + ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, this.enabledTime.toEpochMilli()); + } + if (this.lastUpdateTime != null) { + builder.timeField( + LAST_UPDATE_TIME_FIELD, + LAST_UPDATE_TIME_FIELD_READABLE, + this.lastUpdateTime.toEpochMilli()); + } + if (this.lockDurationSeconds != null) { + builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds); + } + if (this.jitter != null) { + builder.field(JITTER, this.jitter); + } + builder.endObject(); + return builder; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SampleJobRunner.java b/plugin/src/main/java/org/opensearch/sql/plugin/SampleJobRunner.java new file mode 100644 index 0000000000..07b8e73983 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SampleJobRunner.java @@ -0,0 +1,168 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sql.plugin; + +import java.util.List; +import java.util.UUID; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; + +/** + * A sample job runner class. + * + *

The job runner should be a singleton class if it uses OpenSearch client or other objects + * passed from OpenSearch. Because when registering the job runner to JobScheduler plugin, + * OpenSearch has not invoke plugins' createComponents() method. That is saying the plugin is not + * completely initalized, and the OpenSearch {@link org.opensearch.client.Client}, {@link + * ClusterService} and other objects are not available to plugin and this job runner. + * + *

So we have to move this job runner intialization to {@link Plugin} createComponents() method, + * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler + * plugin. + * + *

This sample job runner takes the "indexToWatch" from job parameter and logs that index's + * shards. + */ +public class SampleJobRunner implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); + + private static SampleJobRunner INSTANCE; + + public static SampleJobRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (SampleJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new SampleJobRunner(); + return INSTANCE; + } + } + + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + + private SampleJobRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setClient(Client client) { + this.client = client; + } + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!(jobParameter instanceof SampleJobParameter)) { + throw new IllegalStateException( + "Job parameter is not instance of SampleJobParameter, type: " + + jobParameter.getClass().getCanonicalName()); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + final LockService lockService = context.getLockService(); + + Runnable runnable = + () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock( + jobParameter, + context, + ActionListener.wrap( + lock -> { + if (lock == null) { + return; + } + + SampleJobParameter parameter = (SampleJobParameter) jobParameter; + StringBuilder msg = new StringBuilder(); + msg.append("Watching index ") + .append(parameter.getIndexToWatch()) + .append("\n"); + + List shardRoutingList = + this.clusterService + .state() + .routingTable() + .allShards(parameter.getIndexToWatch()); + for (ShardRouting shardRouting : shardRoutingList) { + msg.append(shardRouting.shardId().getId()) + .append("\t") + .append(shardRouting.currentNodeId()) + .append("\t") + .append(shardRouting.active() ? "active" : "inactive") + .append("\n"); + } + log.info(msg.toString()); + runTaskForIntegrationTests(parameter); + runTaskForLockIntegrationTests(parameter); + + lockService.release( + lock, + ActionListener.wrap( + released -> { + log.info("Released lock for job {}", jobParameter.getName()); + }, + exception -> { + throw new IllegalStateException("Failed to release lock."); + })); + }, + exception -> { + throw new IllegalStateException("Failed to acquire lock."); + })); + } + }; + + threadPool.generic().submit(runnable); + } + + private void runTaskForIntegrationTests(SampleJobParameter jobParameter) { + this.client.index( + new IndexRequest(jobParameter.getIndexToWatch()) + .id(UUID.randomUUID().toString()) + .source("{\"message\": \"message\"}", XContentType.JSON)); + } + + private void runTaskForLockIntegrationTests(SampleJobParameter jobParameter) + throws InterruptedException { + if (jobParameter.getName().equals("sample-job-lock-test-it")) { + Thread.sleep(180000); + } + } +} diff --git a/ppl/build.gradle b/ppl/build.gradle index d58882d5e8..39bed7a359 100644 --- a/ppl/build.gradle +++ b/ppl/build.gradle @@ -48,7 +48,7 @@ dependencies { runtimeOnly group: 'org.reflections', name: 'reflections', version: '0.9.12' implementation "org.antlr:antlr4-runtime:4.7.1" - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.json', name: 'json', version: '20231013' implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api project(':common') diff --git a/protocol/build.gradle b/protocol/build.gradle index 5bbff68e51..d706b480c3 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -30,7 +30,7 @@ plugins { } dependencies { - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}" implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}" implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}" diff --git a/sql/build.gradle b/sql/build.gradle index 81872e6035..52ae0ac621 100644 --- a/sql/build.gradle +++ b/sql/build.gradle @@ -46,7 +46,7 @@ dependencies { antlr "org.antlr:antlr4:4.7.1" implementation "org.antlr:antlr4-runtime:4.7.1" - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'org.json', name: 'json', version:'20231013' implementation project(':common') implementation project(':core') diff --git a/sql/src/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/sql/src/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000000..5337857c15 --- /dev/null +++ b/sql/src/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +org.opensearch.sql.plugin.SQLPlugin \ No newline at end of file