diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java index de34803823..596d76c24b 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java @@ -62,7 +62,8 @@ void runOp( this.flintIndexMetadataService.updateIndexToManualRefresh( flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext); if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { - asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName()); + asyncQueryScheduler.unscheduleJob( + flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext); } else { cancelStreamingJob(flintIndexStateModel); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java index 3fa5423c10..88aca66fef 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java @@ -54,7 +54,8 @@ void runOp( "Performing drop index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { - asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName()); + asyncQueryScheduler.unscheduleJob( + flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext); } else { cancelStreamingJob(flintIndexStateModel); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java index 8ac499081e..6d5350821b 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java @@ -1,5 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql.spark.scheduler; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; /** Scheduler interface for scheduling asynchronous query jobs. */ @@ -13,10 +19,13 @@ public interface AsyncQueryScheduler { * task * * @param asyncQuerySchedulerRequest The request containing job configuration details + * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService * @throws IllegalArgumentException if a job with the same name already exists * @throws RuntimeException if there's an error during job creation */ - void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest); + void scheduleJob( + AsyncQuerySchedulerRequest asyncQuerySchedulerRequest, + AsyncQueryRequestContext asyncQueryRequestContext); /** * Updates an existing job with new parameters. This method modifies the configuration of an @@ -26,10 +35,13 @@ public interface AsyncQueryScheduler { * scheduled job - Updating resource allocations for a job * * @param asyncQuerySchedulerRequest The request containing updated job configuration + * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService * @throws IllegalArgumentException if the job to be updated doesn't exist * @throws RuntimeException if there's an error during the update process */ - void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest); + void updateJob( + AsyncQuerySchedulerRequest asyncQuerySchedulerRequest, + AsyncQueryRequestContext asyncQueryRequestContext); /** * Unschedules a job by marking it as disabled and updating its last update time. This method is @@ -41,8 +53,11 @@ public interface AsyncQueryScheduler { * re-enabling of the job in the future * * @param jobId The unique identifier of the job to unschedule + * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService + * @throws IllegalArgumentException if the job to be unscheduled doesn't exist + * @throws RuntimeException if there's an error during the unschedule process */ - void unscheduleJob(String jobId); + void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext); /** * Removes a job completely from the scheduler. This method permanently deletes the job and all @@ -52,6 +67,9 @@ public interface AsyncQueryScheduler { * created jobs - Freeing up resources by deleting unused job configurations * * @param jobId The unique identifier of the job to remove + * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService + * @throws IllegalArgumentException if the job to be removed doesn't exist + * @throws RuntimeException if there's an error during the remove process */ - void removeJob(String jobId); + void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java index b54e5b30ce..c38d92365a 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java @@ -7,12 +7,14 @@ import java.time.Instant; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.opensearch.sql.spark.rest.model.LangType; /** Represents a job request for a scheduled task. */ @Data +@Builder @NoArgsConstructor @AllArgsConstructor public class AsyncQuerySchedulerRequest { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index 52d805dd01..ddadeb65e2 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -230,7 +230,7 @@ public void createDropIndexQueryWithScheduler() { verifyCreateIndexDMLResultCalled(); verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); - verify(asyncQueryScheduler).unscheduleJob(indexName); + verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext); } @Test @@ -318,8 +318,7 @@ public void createAlterIndexQueryWithScheduler() { FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue(); assertFalse(flintIndexOptions.autoRefresh()); - verify(asyncQueryScheduler).unscheduleJob(indexName); - + verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext); verifyCreateIndexDMLResultCalled(); verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java index 9ebde4fe83..59bad14320 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -8,6 +8,7 @@ import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -35,6 +36,7 @@ import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner; import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; @@ -55,7 +57,9 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler { @Override /** Schedules a new job by indexing it into the job index. */ - public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { + public void scheduleJob( + AsyncQuerySchedulerRequest asyncQuerySchedulerRequest, + AsyncQueryRequestContext asyncQueryRequestContext) { ScheduledAsyncQueryJobRequest request = ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest); if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { @@ -87,15 +91,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { /** Unschedules a job by marking it as disabled and updating its last update time. */ @Override - public void unscheduleJob(String jobId) { - ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() - .jobId(jobId) - .enabled(false) - .lastUpdateTime(Instant.now()) - .build(); + public void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) { + if (Strings.isNullOrEmpty(jobId)) { + throw new IllegalArgumentException("JobId cannot be null or empty"); + } try { - updateJob(request); + AsyncQuerySchedulerRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(jobId) + .enabled(false) + .lastUpdateTime(Instant.now()) + .build(); + updateJob(request, asyncQueryRequestContext); LOG.info("Unscheduled job for jobId: {}", jobId); } catch (IllegalStateException | DocumentMissingException e) { LOG.error("Failed to unschedule job: {}", jobId, e); @@ -105,7 +112,9 @@ public void unscheduleJob(String jobId) { /** Updates an existing job with new parameters. */ @Override @SneakyThrows - public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { + public void updateJob( + AsyncQuerySchedulerRequest asyncQuerySchedulerRequest, + AsyncQueryRequestContext asyncQueryRequestContext) { ScheduledAsyncQueryJobRequest request = ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest); assertIndexExists(); @@ -134,8 +143,11 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { /** Removes a job by deleting its document from the index. */ @Override - public void removeJob(String jobId) { + public void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) { assertIndexExists(); + if (Strings.isNullOrEmpty(jobId)) { + throw new IllegalArgumentException("JobId cannot be null or empty"); + } DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId); deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); ActionFuture deleteResponseActionFuture = client.delete(deleteRequest); diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java index 9b85a11888..48aa52a3ce 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java @@ -38,7 +38,7 @@ public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest public static final String ENABLED_FIELD = "enabled"; private final Schedule schedule; - @Builder + @Builder(builderMethodName = "scheduledAsyncQueryJobRequestBuilder") public ScheduledAsyncQueryJobRequest( String accountId, String jobId, @@ -139,7 +139,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest( AsyncQuerySchedulerRequest request) { Instant updateTime = request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now(); - return ScheduledAsyncQueryJobRequest.builder() + return ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .accountId(request.getAccountId()) .jobId(request.getJobId()) .dataSource(request.getDataSource()) diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java index 9e33ef0248..a824797066 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java @@ -30,7 +30,7 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti public static ScheduledJobParser getJobParser() { return (parser, id, jobDocVersion) -> { ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder = - ScheduledAsyncQueryJobRequest.builder(); + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder(); XContentParserUtils.ensureExpectedToken( XContentParser.Token.START_OBJECT, parser.nextToken(), parser); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java index a4a6eb6471..d6e672f7a2 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java @@ -16,7 +16,6 @@ import static org.mockito.Mockito.when; import static org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME; -import java.io.IOException; import java.time.Instant; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -43,6 +42,8 @@ import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; public class OpenSearchAsyncQuerySchedulerTest { @@ -57,6 +58,8 @@ public class OpenSearchAsyncQuerySchedulerTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private ClusterService clusterService; + @Mock private AsyncQueryRequestContext context; + @Mock private ActionFuture indexResponseActionFuture; @Mock private ActionFuture updateResponseActionFuture; @@ -92,12 +95,12 @@ public void testScheduleJob() { when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); - scheduler.scheduleJob(request); + scheduler.scheduleJob(request, context); // Verify index created verify(client.admin().indices(), times(1)).create(ArgumentMatchers.any()); @@ -116,7 +119,7 @@ public void testScheduleJobWithExistingJob() { .thenReturn(Boolean.TRUE); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -127,7 +130,7 @@ public void testScheduleJobWithExistingJob() { assertThrows( IllegalArgumentException.class, () -> { - scheduler.scheduleJob(request); + scheduler.scheduleJob(request, context); }); verify(client, times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture()); @@ -145,24 +148,24 @@ public void testScheduleJobWithExceptions() { when(client.index(any(IndexRequest.class))).thenThrow(new RuntimeException("Test exception")); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); - assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request, context)); when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture); when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); RuntimeException exception = - assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request, context)); assertEquals("Schedule job failed with result : not_found", exception.getMessage()); } @Test - public void testUnscheduleJob() throws IOException { + public void testUnscheduleJob() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); @@ -170,7 +173,7 @@ public void testUnscheduleJob() throws IOException { when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); - scheduler.unscheduleJob(TEST_JOB_ID); + scheduler.unscheduleJob(TEST_JOB_ID, context); ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateRequest.class); verify(client).update(captor.capture()); @@ -183,7 +186,7 @@ public void testUnscheduleJob() throws IOException { captor = ArgumentCaptor.forClass(UpdateRequest.class); when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); - scheduler.unscheduleJob(TEST_JOB_ID); + scheduler.unscheduleJob(TEST_JOB_ID, context); verify(client, times(2)).update(captor.capture()); capturedRequest = captor.getValue(); @@ -191,20 +194,29 @@ public void testUnscheduleJob() throws IOException { assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); } + @Test + public void testUnscheduleJobInvalidJobId() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> scheduler.unscheduleJob("", context)); + assertEquals("JobId cannot be null or empty", exception.getMessage()); + } + @Test public void testUnscheduleJobWithIndexNotFound() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); - scheduler.unscheduleJob(TEST_JOB_ID); + scheduler.unscheduleJob(TEST_JOB_ID, context); // Verify that no update operation was performed verify(client, never()).update(any(UpdateRequest.class)); } @Test - public void testUpdateJob() throws IOException { + public void testUpdateJob() { ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -216,7 +228,7 @@ public void testUpdateJob() throws IOException { when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); - scheduler.updateJob(request); + scheduler.updateJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateRequest.class); verify(client).update(captor.capture()); @@ -229,20 +241,20 @@ public void testUpdateJob() throws IOException { @Test public void testUpdateJobWithIndexNotFound() { ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); - assertThrows(IllegalStateException.class, () -> scheduler.updateJob(request)); + assertThrows(IllegalStateException.class, () -> scheduler.updateJob(request, context)); } @Test public void testUpdateJobWithExceptions() { ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -255,7 +267,7 @@ public void testUpdateJobWithExceptions() { assertThrows( IllegalArgumentException.class, () -> { - scheduler.updateJob(request); + scheduler.updateJob(request, context); }); assertEquals("Job: testJob doesn't exist", exception1.getMessage()); @@ -266,7 +278,7 @@ public void testUpdateJobWithExceptions() { assertThrows( RuntimeException.class, () -> { - scheduler.updateJob(request); + scheduler.updateJob(request, context); }); assertEquals("java.lang.RuntimeException: Test exception", exception2.getMessage()); @@ -276,7 +288,7 @@ public void testUpdateJobWithExceptions() { when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); RuntimeException exception = - assertThrows(RuntimeException.class, () -> scheduler.updateJob(request)); + assertThrows(RuntimeException.class, () -> scheduler.updateJob(request, context)); assertEquals("Update job failed with result : not_found", exception.getMessage()); } @@ -290,7 +302,7 @@ public void testRemoveJob() { when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); - scheduler.removeJob(TEST_JOB_ID); + scheduler.removeJob(TEST_JOB_ID, context); ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteRequest.class); verify(client).delete(captor.capture()); @@ -304,7 +316,18 @@ public void testRemoveJob() { public void testRemoveJobWithIndexNotFound() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); - assertThrows(IllegalStateException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + AsyncQuerySchedulerRequest request = + AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build(); + assertThrows(IllegalStateException.class, () -> scheduler.removeJob(TEST_JOB_ID, context)); + } + + @Test + public void testRemoveJobInvalidJobId() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> scheduler.removeJob("", context)); + assertEquals("JobId cannot be null or empty", exception.getMessage()); } @Test @@ -351,13 +374,14 @@ public void testCreateAsyncQuerySchedulerIndexFailure() { .thenReturn(new CreateIndexResponse(false, false, SCHEDULER_INDEX_NAME)); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); RuntimeException runtimeException = - Assertions.assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + Assertions.assertThrows( + RuntimeException.class, () -> scheduler.scheduleJob(request, context)); Assertions.assertEquals( "Internal server error while creating .async-query-scheduler index: Index creation is not" + " acknowledged.", @@ -367,7 +391,7 @@ public void testCreateAsyncQuerySchedulerIndexFailure() { @Test public void testUpdateJobNotFound() { ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -381,7 +405,7 @@ public void testUpdateJobNotFound() { assertThrows( IllegalArgumentException.class, () -> { - scheduler.updateJob(request); + scheduler.updateJob(request, context); }); assertEquals("Job: testJob doesn't exist", exception.getMessage()); @@ -401,7 +425,7 @@ public void testRemoveJobNotFound() { assertThrows( IllegalArgumentException.class, () -> { - scheduler.removeJob(TEST_JOB_ID); + scheduler.removeJob(TEST_JOB_ID, context); }); assertEquals("Job : testJob doesn't exist", exception.getMessage()); @@ -413,7 +437,7 @@ public void testRemoveJobWithExceptions() { when(client.delete(any(DeleteRequest.class))).thenThrow(new RuntimeException("Test exception")); - assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID, context)); DeleteResponse deleteResponse = mock(DeleteResponse.class); when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); @@ -421,7 +445,8 @@ public void testRemoveJobWithExceptions() { when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); RuntimeException runtimeException = - Assertions.assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + Assertions.assertThrows( + RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID, context)); Assertions.assertEquals("Remove job failed with result : noop", runtimeException.getMessage()); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java index cba8d43a2a..fdfb138ddb 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java @@ -72,7 +72,7 @@ public void testRunJobWithCorrectParameter() { spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId("testJob") .lastUpdateTime(Instant.now()) .lockDurationSeconds(10L) @@ -123,7 +123,7 @@ public void testDoRefreshThrowsException() { spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId("testJob") .lastUpdateTime(Instant.now()) .lockDurationSeconds(10L) @@ -158,7 +158,7 @@ public void testDoRefreshThrowsException() { @Test public void testRunJobWithUninitializedServices() { ScheduledAsyncQueryJobRequest jobParameter = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .jobId("testJob") .lastUpdateTime(Instant.now()) .build(); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java index 85d1948dc3..edf8379195 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java @@ -28,7 +28,7 @@ public void testBuilderAndGetterMethods() { IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); ScheduledAsyncQueryJobRequest jobRequest = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .accountId("testAccount") .jobId("testJob") .dataSource("testDataSource") @@ -62,7 +62,7 @@ public void testToXContent() throws IOException { IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); ScheduledAsyncQueryJobRequest request = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .accountId("testAccount") .jobId("testJob") .dataSource("testDataSource") @@ -146,7 +146,7 @@ public void testEqualsAndHashCode() { IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); ScheduledAsyncQueryJobRequest request1 = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .accountId("testAccount") .jobId("testJob") .dataSource("testDataSource") @@ -172,7 +172,7 @@ public void testEqualsAndHashCode() { assertTrue(toString.contains("jitter=0.1")); ScheduledAsyncQueryJobRequest request2 = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .accountId("testAccount") .jobId("testJob") .dataSource("testDataSource") @@ -190,7 +190,7 @@ public void testEqualsAndHashCode() { assertEquals(request1.hashCode(), request2.hashCode()); ScheduledAsyncQueryJobRequest request3 = - ScheduledAsyncQueryJobRequest.builder() + ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder() .accountId("differentAccount") .jobId("testJob") .dataSource("testDataSource")