Skip to content

Commit

Permalink
[Feature] Flint query scheduler part1 - integrate job scheduler plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jul 17, 2024
1 parent 0c2e1da commit 7127cda
Show file tree
Hide file tree
Showing 21 changed files with 815 additions and 35 deletions.
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
Expand Down
20 changes: 18 additions & 2 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ singleCompoundStatement
;

beginEndCompoundBlock
: BEGIN compoundBody END
: beginLabel? BEGIN compoundBody END endLabel?
;

compoundBody
Expand All @@ -68,6 +68,14 @@ singleStatement
: statement SEMICOLON* EOF
;

// Label that may optionally precede BEGIN in beginEndCompoundBlock:
// a multipart identifier immediately followed by a colon.
beginLabel
: multipartIdentifier COLON
;

// Label that may optionally follow END in beginEndCompoundBlock:
// a bare multipart identifier (no trailing colon, unlike beginLabel).
endLabel
: multipartIdentifier
;

// Matches exactly one namedExpression that must be followed by end of input (EOF).
singleExpression
: namedExpression EOF
;
Expand Down Expand Up @@ -174,6 +182,8 @@ statement
| ALTER TABLE identifierReference
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE) #alterClusterBy
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
Expand Down Expand Up @@ -853,13 +863,17 @@ identifierComment

relationPrimary
: identifierReference temporalClause?
sample? tableAlias #tableName
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
;

// WITH followed by a propertyList, captured under the label `options`.
// Referenced as an optional element of the #tableName alternative in relationPrimary.
optionsClause
: WITH options=propertyList
;

// VALUES followed by one or more comma-separated expressions, then a tableAlias.
inlineTable
: VALUES expression (COMMA expression)* tableAlias
;
Expand Down Expand Up @@ -1572,6 +1586,7 @@ ansiNonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NULLS
| NUMERIC
| OF
Expand Down Expand Up @@ -1920,6 +1935,7 @@ nonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NOT
| NULL
| NULLS
Expand Down
2 changes: 2 additions & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ repositories {


dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
implementation project(':protocol')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.scheduler.exceptions.AsyncQuerySchedulerException;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

/**
 * Stores, updates and removes async-query refresh jobs as documents in the {@code
 * .async-query-scheduler} index, and exposes the job runner and job parser used for the {@code
 * async-query-scheduler} job type.
 *
 * <p>{@link #loadJobResource(Client, ClusterService, ThreadPool)} must be called before any of the
 * job operations, because it supplies the {@link Client} and {@link ClusterService} they use.
 */
public class OpenSearchAsyncQueryScheduler {
  public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
  public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
  private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
      "async-query-scheduler-index-mapping.yml";
  private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME =
      "async-query-scheduler-index-settings.yml";
  private static final Logger LOG = LogManager.getLogger();
  private Client client;
  private ClusterService clusterService;

  /**
   * Stores the client and cluster service on this scheduler and passes the cluster service, thread
   * pool and client to the shared {@link OpenSearchRefreshIndexJob} runner instance.
   *
   * @param client client used for all index, update, delete and create-index requests
   * @param clusterService cluster service used to check whether the scheduler index exists
   * @param threadPool thread pool handed to the job runner
   */
  public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
    this.client = client;
    this.clusterService = clusterService;
    OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
        OpenSearchRefreshIndexJob.getJobRunnerInstance();
    openSearchRefreshIndexJob.setClusterService(clusterService);
    openSearchRefreshIndexJob.setThreadPool(threadPool);
    openSearchRefreshIndexJob.setClient(client);
  }

  /**
   * Indexes a new job document whose id is the job name, creating the scheduler index first when
   * the routing table does not contain it.
   *
   * @param request job definition to store
   * @throws IllegalArgumentException if the write is rejected with a version conflict, i.e. a
   *     document with the same job name already exists
   * @throws AsyncQuerySchedulerException if index creation fails, the index request fails for any
   *     other reason, or the write result is not {@code CREATED}
   */
  public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
    if (!schedulerIndexExists()) {
      createAsyncQuerySchedulerIndex();
    }
    IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
    indexRequest.id(request.getName());
    // CREATE (rather than the default op type) makes a duplicate job name fail instead of
    // silently replacing the stored job.
    indexRequest.opType(DocWriteRequest.OpType.CREATE);
    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    IndexResponse indexResponse;
    try {
      indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
      ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
      indexResponse = indexResponseActionFuture.actionGet();
    } catch (VersionConflictEngineException exception) {
      // Keep the original exception as the cause so the conflict details are not lost.
      throw new IllegalArgumentException(
          "A job already exists with name: " + request.getName(), exception);
    } catch (Exception e) {
      throw new AsyncQuerySchedulerException(e);
    }

    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
      LOG.debug("Job : {} successfully created", request.getName());
    } else {
      throw new AsyncQuerySchedulerException(
          "Schedule job failed with result : " + indexResponse.getResult().getLowercase());
    }
  }

  /**
   * Disables a job by updating its document with {@code enabled = false} and the current time as
   * the last update time. The document itself is kept; use {@link #removeJob(String)} to delete it.
   *
   * @param jobId name (document id) of the job to disable
   * @throws IllegalArgumentException if the scheduler index does not exist
   * @throws AsyncQuerySchedulerException if the update result is neither {@code UPDATED} nor
   *     {@code NOOP}
   * @throws IOException if the request body cannot be built
   */
  public void unscheduleJob(String jobId) throws IOException {
    ensureSchedulerIndexExists();
    OpenSearchRefreshIndexJobRequest request =
        new OpenSearchRefreshIndexJobRequest.Builder()
            .withJobName(jobId)
            .withEnabled(false)
            .withLastUpdateTime(Instant.now())
            .build();
    updateJob(request);
  }

  /**
   * Applies the content of {@code request} as a partial-document update to the job document whose
   * id is the request's job name.
   *
   * @param request job fields to write
   * @throws IllegalArgumentException if the scheduler index does not exist
   * @throws AsyncQuerySchedulerException if the update result is neither {@code UPDATED} nor
   *     {@code NOOP}
   * @throws IOException if the request body cannot be built
   */
  public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
    ensureSchedulerIndexExists();
    UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
    updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));

    ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
    UpdateResponse updateResponse = updateResponseActionFuture.actionGet();

    DocWriteResponse.Result result = updateResponse.getResult();
    if (result == DocWriteResponse.Result.UPDATED || result == DocWriteResponse.Result.NOOP) {
      LOG.debug("Job : {} successfully updated", request.getName());
    } else {
      throw new AsyncQuerySchedulerException(
          "Update job failed with result : " + updateResponse.getResult().getLowercase());
    }
  }

  /**
   * Deletes the job document with the given id from the scheduler index.
   *
   * @param jobId name (document id) of the job to delete
   * @throws IllegalArgumentException if the scheduler index does not exist
   * @throws AsyncQuerySchedulerException if the document is not found or the delete result is
   *     anything other than {@code DELETED}
   */
  public void removeJob(String jobId) {
    ensureSchedulerIndexExists();
    DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
    deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
    DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet();

    DocWriteResponse.Result result = deleteResponse.getResult();
    if (result == DocWriteResponse.Result.DELETED) {
      LOG.debug("Job : {} successfully deleted", jobId);
    } else if (result == DocWriteResponse.Result.NOT_FOUND) {
      throw new AsyncQuerySchedulerException("Job : " + jobId + " doesn't exist");
    } else {
      throw new AsyncQuerySchedulerException(
          "Remove job failed with result : " + deleteResponse.getResult().getLowercase());
    }
  }

  /** Returns whether the cluster routing table currently contains the scheduler index. */
  private boolean schedulerIndexExists() {
    return this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME);
  }

  /**
   * Guard shared by the operations that require an existing scheduler index.
   *
   * @throws IllegalArgumentException if the scheduler index does not exist
   */
  private void ensureSchedulerIndexExists() {
    if (!schedulerIndexExists()) {
      throw new IllegalArgumentException("Job index does not exist.");
    }
  }

  /**
   * Creates the scheduler index using the YAML mapping and settings files loaded from the
   * classpath.
   *
   * @throws AsyncQuerySchedulerException if a resource file is missing or unreadable, the create
   *     request fails, or the creation is not acknowledged
   */
  private void createAsyncQuerySchedulerIndex() {
    // try-with-resources closes both classpath streams on every path; failures while opening
    // them are still handled by the catch clause below.
    try (InputStream mappingFileStream =
            OpenSearchAsyncQueryScheduler.class
                .getClassLoader()
                .getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
        InputStream settingsFileStream =
            OpenSearchAsyncQueryScheduler.class
                .getClassLoader()
                .getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME)) {
      // getResourceAsStream returns null for a missing resource; report which file is absent
      // instead of failing later with an unexplained NullPointerException.
      if (mappingFileStream == null) {
        throw new IllegalStateException(
            "Resource not found: " + SCHEDULER_INDEX_MAPPING_FILE_NAME);
      }
      if (settingsFileStream == null) {
        throw new IllegalStateException(
            "Resource not found: " + SCHEDULER_INDEX_SETTINGS_FILE_NAME);
      }
      CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
      createIndexRequest.mapping(
          IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
      createIndexRequest.settings(
          IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
      ActionFuture<CreateIndexResponse> createIndexResponseActionFuture =
          client.admin().indices().create(createIndexRequest);
      CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();

      if (createIndexResponse.isAcknowledged()) {
        LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
      } else {
        throw new AsyncQuerySchedulerException("Index creation is not acknowledged.");
      }
    } catch (Throwable e) {
      LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
      throw new AsyncQuerySchedulerException(
          "Internal server error while creating "
              + SCHEDULER_INDEX_NAME
              + " index: "
              + e.getMessage(),
          e);
    }
  }

  /** Returns the shared {@link OpenSearchRefreshIndexJob} runner instance. */
  public static ScheduledJobRunner getJobRunner() {
    return OpenSearchRefreshIndexJob.getJobRunnerInstance();
  }

  /**
   * Returns a parser that reads one job document into an {@link OpenSearchRefreshIndexJobRequest}.
   *
   * <p>The document must be a single object. Every top-level field must be one of the field names
   * declared on {@link OpenSearchRefreshIndexJobRequest}; any other field is rejected via {@link
   * XContentParserUtils#throwUnknownToken}.
   */
  public static ScheduledJobParser getJobParser() {
    return (parser, id, jobDocVersion) -> {
      OpenSearchRefreshIndexJobRequest.Builder builder =
          new OpenSearchRefreshIndexJobRequest.Builder();
      XContentParserUtils.ensureExpectedToken(
          XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

      XContentParser.Token token;
      // Identity comparison is null-safe: nextToken() may return null when the input ends
      // before the object is closed.
      while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
        // Anything other than a field name here (including null) means the document is
        // malformed; fail with a parsing error rather than a NullPointerException.
        XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
        String fieldName = parser.currentName();
        // Advance from the field name to its value.
        parser.nextToken();
        switch (fieldName) {
          case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
            builder.withJobName(parser.text());
            break;
          case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD:
            builder.withJobType(parser.text());
            break;
          case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
            builder.withEnabled(parser.booleanValue());
            break;
          case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
            builder.withEnabledTime(parseInstantValue(parser));
            break;
          case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
            builder.withLastUpdateTime(parseInstantValue(parser));
            break;
          case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
            builder.withSchedule(ScheduleParser.parse(parser));
            break;
          case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
            builder.withLockDurationSeconds(parser.longValue());
            break;
          case OpenSearchRefreshIndexJobRequest.JITTER:
            builder.withJitter(parser.doubleValue());
            break;
          default:
            XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
        }
      }
      return builder.build();
    };
  }

  /**
   * Reads the parser's current token as an instant expressed in epoch milliseconds.
   *
   * @param parser parser positioned on the value to read
   * @return the parsed instant, or {@code null} when the current token is a JSON null
   * @throws IOException if the value cannot be read as a long
   */
  private static Instant parseInstantValue(XContentParser parser) throws IOException {
    if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
      return null;
    }
    if (parser.currentToken().isValue()) {
      return Instant.ofEpochMilli(parser.longValue());
    }
    XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
    // Only needed to satisfy the compiler: throwUnknownToken is declared void.
    return null;
  }
}
Loading

0 comments on commit 7127cda

Please sign in to comment.