Skip to content

Commit

Permalink
Merge branch '2.x' into backport/backport-3004-to-2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Sep 17, 2024
2 parents e1961dc + 2bf7a90 commit b9a941d
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ void runOp(
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ void runOp(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. */
Expand All @@ -13,10 +19,13 @@ public interface AsyncQueryScheduler {
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
void scheduleJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Updates an existing job with new parameters. This method modifies the configuration of an
Expand All @@ -26,10 +35,13 @@ public interface AsyncQueryScheduler {
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
void updateJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
Expand All @@ -41,8 +53,11 @@ public interface AsyncQueryScheduler {
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
* @throws RuntimeException if there's an error during the unschedule process
*/
void unscheduleJob(String jobId);
void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
Expand All @@ -52,6 +67,9 @@ public interface AsyncQueryScheduler {
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be removed doesn't exist
* @throws RuntimeException if there's an error during the remove process
*/
void removeJob(String jobId);
void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/** Represents a job request for a scheduled task. */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void createDropIndexQueryWithScheduler() {
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);

verify(asyncQueryScheduler).unscheduleJob(indexName);
verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
}

@Test
Expand Down Expand Up @@ -318,8 +318,7 @@ public void createAlterIndexQueryWithScheduler() {
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

verify(asyncQueryScheduler).unscheduleJob(indexName);

verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
Expand All @@ -55,7 +57,9 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {

@Override
/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
public void scheduleJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
Expand Down Expand Up @@ -87,15 +91,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Unschedules a job by marking it as disabled and updating its last update time. */
@Override
public void unscheduleJob(String jobId) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
public void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
try {
updateJob(request);
AsyncQuerySchedulerRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
updateJob(request, asyncQueryRequestContext);
LOG.info("Unscheduled job for jobId: {}", jobId);
} catch (IllegalStateException | DocumentMissingException e) {
LOG.error("Failed to unschedule job: {}", jobId, e);
Expand All @@ -105,7 +112,9 @@ public void unscheduleJob(String jobId) {
/** Updates an existing job with new parameters. */
@Override
@SneakyThrows
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
public void updateJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
assertIndexExists();
Expand Down Expand Up @@ -134,8 +143,11 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Removes a job by deleting its document from the index. */
@Override
public void removeJob(String jobId) {
public void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
assertIndexExists();
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest
public static final String ENABLED_FIELD = "enabled";
private final Schedule schedule;

@Builder
@Builder(builderMethodName = "scheduledAsyncQueryJobRequestBuilder")
public ScheduledAsyncQueryJobRequest(
String accountId,
String jobId,
Expand Down Expand Up @@ -139,7 +139,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
AsyncQuerySchedulerRequest request) {
Instant updateTime =
request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
return ScheduledAsyncQueryJobRequest.builder()
return ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId(request.getAccountId())
.jobId(request.getJobId())
.dataSource(request.getDataSource())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti
public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder =
ScheduledAsyncQueryJobRequest.builder();
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down
Loading

0 comments on commit b9a941d

Please sign in to comment.