Skip to content

Commit

Permalink
[Feature]Flint query scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jul 15, 2024
1 parent eb5c824 commit 110ab5c
Show file tree
Hide file tree
Showing 14 changed files with 593 additions and 33 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ configurations.all {
exclude group: "commons-logging", module: "commons-logging"
// enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379
resolutionStrategy.force 'commons-codec:commons-codec:1.13'
resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre'
resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre'
}

// updateVersion: Task to auto increment to the next development iteration
Expand Down
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ repositories {

dependencies {
api "org.antlr:antlr4-runtime:4.7.1"
api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0'
Expand All @@ -46,7 +46,7 @@ dependencies {

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1'
testImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
testImplementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation('org.junit.jupiter:junit-jupiter:5.9.3')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.7.0'
Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pitest {
}

dependencies {
api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0'
api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240'
Expand Down
2 changes: 1 addition & 1 deletion integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ configurations.all {
resolutionStrategy.force "commons-logging:commons-logging:1.2"
// enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379
resolutionStrategy.force 'commons-codec:commons-codec:1.13'
resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre'
resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre'
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
Expand Down
2 changes: 1 addition & 1 deletion legacy/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ dependencies {
because 'https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379'
}
}
implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
implementation group: 'org.json', name: 'json', version:'20231013'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
implementation group: 'org.apache.commons', name: 'commons-text', version: '1.10.0'
Expand Down
32 changes: 9 additions & 23 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

plugins {
id 'java'
id "io.freefair.lombok"
Expand All @@ -48,6 +29,7 @@ opensearchplugin {
name 'opensearch-sql'
description 'OpenSearch SQL'
classname 'org.opensearch.sql.plugin.SQLPlugin'
extendedPlugins = ['opensearch-job-scheduler']
licenseFile rootProject.file("LICENSE.txt")
noticeFile rootProject.file("NOTICE")
}
Expand Down Expand Up @@ -76,7 +58,7 @@ publishing {
}
repositories {
maven {
name = "Snapshots" // optional target repository name
name = "Snapshots"
url = "https://aws.oss.sonatype.org/content/repositories/snapshots"
credentials {
username "$System.env.SONATYPE_USERNAME"
Expand All @@ -98,7 +80,8 @@ configurations.all {
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
// enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379
resolutionStrategy.force 'commons-codec:commons-codec:1.13'
resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre'
resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre'
resolutionStrategy.force 'com.google.guava:failureaccess:1.0.2'
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
Expand Down Expand Up @@ -139,6 +122,10 @@ spotless {
}

dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
compileOnly 'com.google.guava:guava:32.1.3-jre'
compileOnly 'com.google.guava:failureaccess:1.0.2'

api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
Expand Down Expand Up @@ -204,11 +191,10 @@ dependencyLicenses.enabled = false
// enable testingConventions check will cause errors like: "Classes ending with [Tests] must subclass [LuceneTestCase]"
testingConventions.enabled = false

// TODO: need to verify the thirdPartyAudi
// TODO: need to verify the thirdPartyAudit
// currently it complains missing classes like ibatis, mysql etc, should not be a problem
thirdPartyAudit.enabled = false


apply plugin: 'com.netflix.nebula.ospackage'
validateNebulaPom.enabled = false

Expand Down
87 changes: 86 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -39,9 +41,15 @@
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ScriptPlugin;
Expand Down Expand Up @@ -105,7 +113,9 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin {
public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension {

static final String JOB_INDEX_NAME = ".scheduler_sample_extension";

private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class);

Expand Down Expand Up @@ -141,6 +151,7 @@ public List<RestHandler> getRestHandlers(
Metrics.getInstance().registerDefaultMetrics();

return Arrays.asList(
new SampleExtensionRestHandler(),
new RestPPLQueryAction(),
new RestSqlAction(settings, injector),
new RestSqlStatsAction(settings, restController),
Expand Down Expand Up @@ -203,6 +214,11 @@ public Collection<Object> createComponents(
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
SampleJobRunner jobRunner = SampleJobRunner.getJobRunnerInstance();
jobRunner.setClusterService(clusterService);
jobRunner.setThreadPool(threadPool);
jobRunner.setClient(client);

this.clusterService = clusterService;
this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings());
this.client = (NodeClient) client;
Expand Down Expand Up @@ -243,6 +259,75 @@ public Collection<Object> createComponents(
pluginSettings);
}

@Override
public String getJobType() {
return "scheduler_sample_extension";
}

@Override
public String getJobIndex() {
return JOB_INDEX_NAME;
}

@Override
public ScheduledJobRunner getJobRunner() {
return SampleJobRunner.getJobRunnerInstance();
}

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
SampleJobParameter jobParameter = new SampleJobParameter();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case SampleJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case SampleJobParameter.ENABLED_FILED:
jobParameter.setEnabled(parser.booleanValue());
break;
case SampleJobParameter.ENABLED_TIME_FILED:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case SampleJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case SampleJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
case SampleJobParameter.INDEX_NAME_FIELD:
jobParameter.setIndexToWatch(parser.text());
break;
case SampleJobParameter.LOCK_DURATION_SECONDS:
jobParameter.setLockDurationSeconds(parser.longValue());
break;
case SampleJobParameter.JITTER:
jobParameter.setJitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return singletonList(
Expand Down
Loading

0 comments on commit 110ab5c

Please sign in to comment.