diff --git a/async-query/build.gradle b/async-query/build.gradle
index 52b1987fc4..53fdcbe292 100644
--- a/async-query/build.gradle
+++ b/async-query/build.gradle
@@ -100,7 +100,6 @@ jacocoTestCoverageVerification {
             'org.opensearch.sql.spark.execution.statestore.StateStore',
             'org.opensearch.sql.spark.rest.*',
             'org.opensearch.sql.spark.scheduler.parser.OpenSearchScheduleQueryJobRequestParser',
-            'org.opensearch.sql.spark.scheduler.parser.IntervalScheduleParser',
             'org.opensearch.sql.spark.transport.model.*'
           ]
           limit {
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
index 6e50c60a96..9ebde4fe83 100644
--- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
@@ -35,7 +35,7 @@
 import org.opensearch.index.engine.DocumentMissingException;
 import org.opensearch.index.engine.VersionConflictEngineException;
 import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
-import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob;
+import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
 import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
 
@@ -196,6 +196,6 @@ private void assertIndexExists() {
 
   /** Returns the job runner instance for the scheduler. */
   public static ScheduledJobRunner getJobRunner() {
-    return ScheduledAsyncQueryJob.getJobRunnerInstance();
+    return ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
   }
 }
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java
similarity index 80%
rename from async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java
rename to async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java
index 5fa3ff4752..1134c05039 100644
--- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java
@@ -33,12 +33,12 @@
  * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler
  * plugin.
  */
-public class ScheduledAsyncQueryJob implements ScheduledJobRunner {
-  private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJob.class);
+public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
+  private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
 
-  public static ScheduledAsyncQueryJob INSTANCE = new ScheduledAsyncQueryJob();
+  private static ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();
 
-  public static ScheduledAsyncQueryJob getJobRunnerInstance() {
+  public static ScheduledAsyncQueryJobRunner getJobRunnerInstance() {
     return INSTANCE;
   }
 
@@ -47,7 +47,7 @@ public static ScheduledAsyncQueryJob getJobRunnerInstance() {
   private Client client;
   private AsyncQueryExecutorService asyncQueryExecutorService;
 
-  private ScheduledAsyncQueryJob() {
+  private ScheduledAsyncQueryJobRunner() {
     // Singleton class, use getJobRunnerInstance method instead of constructor
   }
 
@@ -63,28 +63,12 @@ public void loadJobResource(
     this.asyncQueryExecutorService = asyncQueryExecutorService;
   }
 
-  public void setClusterService(ClusterService clusterService) {
-    this.clusterService = clusterService;
-  }
-
-  public void setThreadPool(ThreadPool threadPool) {
-    this.threadPool = threadPool;
-  }
-
-  public void setClient(Client client) {
-    this.client = client;
-  }
-
-  public void setAsyncQueryExecutorService(AsyncQueryExecutorService asyncQueryExecutorService) {
-    this.asyncQueryExecutorService = asyncQueryExecutorService;
-  }
-
   @Override
   public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
     // Parser will convert jobParameter to ScheduledAsyncQueryJobRequest
     if (!(jobParameter instanceof ScheduledAsyncQueryJobRequest)) {
       throw new IllegalStateException(
-          "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
+          "Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: "
              + jobParameter.getClass().getCanonicalName());
     }
 
@@ -116,13 +100,10 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
   }
 
   void doRefresh(ScheduledAsyncQueryJobRequest request) {
-    // TODO: use internal logic to create refresh index query?
     LOGGER.info("Scheduled refresh index job on job: " + request.getName());
-    LOGGER.info("Scheduled refresh index job on query: " + request.getScheduledQuery());
     CreateAsyncQueryRequest createAsyncQueryRequest =
         new CreateAsyncQueryRequest(
             request.getScheduledQuery(), request.getDataSource(), request.getQueryLang());
-    LOGGER.info(asyncQueryExecutorService.getClass().getCanonicalName());
     CreateAsyncQueryResponse createAsyncQueryResponse =
         asyncQueryExecutorService.createAsyncQuery(
             createAsyncQueryRequest, new NullAsyncQueryRequestContext());
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java
index 49e2cc2a55..9b85a11888 100644
--- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java
@@ -137,11 +137,6 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
 
   public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
       AsyncQuerySchedulerRequest request) {
-    // Validate that request.getSchedule() is a String
-    if (!(request.getSchedule() instanceof String)) {
-      throw new IllegalArgumentException("Schedule must be a String object.");
-    }
-
     Instant updateTime =
         request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
     return ScheduledAsyncQueryJobRequest.builder()
@@ -155,10 +150,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
         .enabledTime(request.getEnabledTime())
         .lockDurationSeconds(request.getLockDurationSeconds())
         .jitter(request.getJitter())
-        .schedule(
-            IntervalScheduleParser.parse(
-                (String) request.getSchedule(),
-                updateTime)) // This is specific to ScheduledAsyncQueryJobRequest
+        .schedule(IntervalScheduleParser.parse(request.getSchedule(), updateTime))
         .build();
   }
 }
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java
index 6294a347af..2d5a1b332f 100644
--- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java
@@ -5,29 +5,43 @@
 
 package org.opensearch.sql.spark.scheduler.parser;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
+import org.opensearch.jobscheduler.spi.schedule.Schedule;
 
 /** Parse string raw schedule into job scheduler IntervalSchedule */
 public class IntervalScheduleParser {
 
   private static final Pattern DURATION_PATTERN =
       Pattern.compile(
-          "^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|micros?|nanoseconds?|nanos?)$",
+          "^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|microseconds?|microsecond|micros?|micros|nanoseconds?|nanos?)$",
           Pattern.CASE_INSENSITIVE);
 
-  public static IntervalSchedule parse(String intervalStr, Instant startTime) {
-    intervalStr = intervalStr.trim().toLowerCase();
+  public static Schedule parse(Object schedule, Instant startTime) {
+    if (schedule == null) {
+      return null;
+    }
+
+    if (schedule instanceof Schedule) {
+      return (Schedule) schedule;
+    }
+
+    if (!(schedule instanceof String)) {
+      throw new IllegalArgumentException("Schedule must be a String object for parsing.");
+    }
+
+    String intervalStr = ((String) schedule).trim().toLowerCase();
 
     Matcher matcher = DURATION_PATTERN.matcher(intervalStr);
     if (!matcher.matches()) {
       throw new IllegalArgumentException("Invalid interval format: " + intervalStr);
     }
 
-    int value = Integer.parseInt(matcher.group(1));
+    long value = Long.parseLong(matcher.group(1));
     String unitStr = matcher.group(2).toLowerCase();
 
     // Convert to a supported unit or directly return an IntervalSchedule
@@ -36,7 +50,8 @@ public static IntervalSchedule parse(String intervalStr, Instant startTime) {
     return new IntervalSchedule(startTime, (int) intervalInMinutes, ChronoUnit.MINUTES);
   }
 
-  private static long convertToSupportedUnit(int value, String unitStr) {
+  @VisibleForTesting
+  protected static long convertToSupportedUnit(long value, String unitStr) {
     switch (unitStr) {
       case "years":
       case "year":
diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java
similarity index 62%
rename from async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobTest.java
rename to async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java
index ab40133a16..1c9c00df4e 100644
--- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobTest.java
+++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java
@@ -10,11 +10,17 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.time.Instant;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Answers;
@@ -32,7 +38,7 @@
 import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
 import org.opensearch.threadpool.ThreadPool;
 
-public class ScheduledAsyncQueryJobTest {
+public class ScheduledAsyncQueryJobRunnerTest {
 
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private ClusterService clusterService;
@@ -48,27 +54,21 @@ public class ScheduledAsyncQueryJobTest {
 
   @Mock private JobExecutionContext context;
 
-  private ScheduledAsyncQueryJob jobRunner;
+  private ScheduledAsyncQueryJobRunner jobRunner;
 
-  private ScheduledAsyncQueryJob spyJobRunner;
+  private ScheduledAsyncQueryJobRunner spyJobRunner;
 
   @BeforeEach
   public void setup() {
     MockitoAnnotations.openMocks(this);
-    jobRunner = ScheduledAsyncQueryJob.getJobRunnerInstance();
-    jobRunner.setClient(null);
-    jobRunner.setClusterService(null);
-    jobRunner.setThreadPool(null);
-    jobRunner.setAsyncQueryExecutorService(null);
+    jobRunner = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
+    jobRunner.loadJobResource(null, null, null, null);
   }
 
   @Test
   public void testRunJobWithCorrectParameter() {
     spyJobRunner = spy(jobRunner);
-    spyJobRunner.setClusterService(clusterService);
-    spyJobRunner.setThreadPool(threadPool);
-    spyJobRunner.setClient(client);
-    spyJobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);
+    spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);
 
     ScheduledAsyncQueryJobRequest request =
         ScheduledAsyncQueryJobRequest.builder()
@@ -98,11 +98,8 @@ public void testRunJobWithCorrectParameter() {
 
   @Test
   public void testRunJobWithIncorrectParameter() {
-    jobRunner = ScheduledAsyncQueryJob.getJobRunnerInstance();
-    jobRunner.setClusterService(clusterService);
-    jobRunner.setThreadPool(threadPool);
-    jobRunner.setClient(client);
-    jobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);
+    jobRunner = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
+    jobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);
 
     ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class);
 
@@ -113,11 +110,52 @@ public void testRunJobWithIncorrectParameter() {
             "Expected IllegalStateException but no exception was thrown");
 
     assertEquals(
-        "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
+        "Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: "
            + wrongParameter.getClass().getCanonicalName(),
         exception.getMessage());
   }
 
+  @Test
+  public void testDoRefreshThrowsException() {
+    spyJobRunner = spy(jobRunner);
+    spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);
+
+    // Create a ScheduledAsyncQueryJobRequest with valid parameters
+    ScheduledAsyncQueryJobRequest request =
+        ScheduledAsyncQueryJobRequest.builder()
+            .jobId("testJob")
+            .lastUpdateTime(Instant.now())
+            .lockDurationSeconds(10L)
+            .scheduledQuery("REFRESH INDEX testIndex")
+            .dataSource("testDataSource")
+            .queryLang(LangType.SQL)
+            .build();
+
+    // Mock the doRefresh method to throw an exception
+    doThrow(new RuntimeException("Test exception")).when(spyJobRunner).doRefresh(request);
+
+    // Capture the log output
+    Logger logger = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
+    Appender mockAppender = mock(Appender.class);
+    when(mockAppender.getName()).thenReturn("MockAppender");
+    when(mockAppender.isStarted()).thenReturn(true);
+    when(mockAppender.isStopped()).thenReturn(false);
+    ((org.apache.logging.log4j.core.Logger) logger)
+        .addAppender((org.apache.logging.log4j.core.Appender) mockAppender);
+
+    spyJobRunner.runJob(request, context);
+
+    ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+    verify(threadPool.generic()).submit(captor.capture());
+
+    Runnable runnable = captor.getValue();
+    runnable.run();
+
+    // Verify that the doRefresh method was called and the exception was logged
+    verify(spyJobRunner).doRefresh(eq(request));
+    verify(mockAppender).append(any(LogEvent.class));
+  }
+
   @Test
   public void testRunJobWithUninitializedServices() {
     ScheduledAsyncQueryJobRequest jobParameter =
@@ -133,7 +171,7 @@ public void testRunJobWithUninitializedServices() {
            "Expected IllegalStateException but no exception was thrown");
     assertEquals("ClusterService is not initialized.", exception.getMessage());
 
-    jobRunner.setClusterService(clusterService);
+    jobRunner.loadJobResource(null, clusterService, null, null);
 
     exception =
         assertThrows(
@@ -142,7 +180,7 @@ public void testRunJobWithUninitializedServices() {
            "Expected IllegalStateException but no exception was thrown");
     assertEquals("ThreadPool is not initialized.", exception.getMessage());
 
-    jobRunner.setThreadPool(threadPool);
+    jobRunner.loadJobResource(null, clusterService, threadPool, null);
 
     exception =
         assertThrows(
@@ -151,7 +189,7 @@ public void testRunJobWithUninitializedServices() {
            "Expected IllegalStateException but no exception was thrown");
     assertEquals("Client is not initialized.", exception.getMessage());
 
-    jobRunner.setClient(client);
+    jobRunner.loadJobResource(client, clusterService, threadPool, null);
 
     exception =
         assertThrows(
@@ -163,9 +201,9 @@ public void testRunJobWithUninitializedServices() {
 
   @Test
   public void testGetJobRunnerInstanceMultipleCalls() {
-    ScheduledAsyncQueryJob instance1 = ScheduledAsyncQueryJob.getJobRunnerInstance();
-    ScheduledAsyncQueryJob instance2 = ScheduledAsyncQueryJob.getJobRunnerInstance();
-    ScheduledAsyncQueryJob instance3 = ScheduledAsyncQueryJob.getJobRunnerInstance();
+    ScheduledAsyncQueryJobRunner instance1 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
+    ScheduledAsyncQueryJobRunner instance2 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
+    ScheduledAsyncQueryJobRunner instance3 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
 
     assertSame(instance1, instance2);
     assertSame(instance2, instance3);
diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java
index 136e2a662e..85d1948dc3 100644
--- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java
+++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java
@@ -137,7 +137,7 @@ public void testFromAsyncQuerySchedulerRequestWithInvalidSchedule() {
               ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(request);
             });
 
-    assertEquals("Schedule must be a String object.", exception.getMessage());
+    assertEquals("Schedule must be a String object for parsing.", exception.getMessage());
   }
 
   @Test
@@ -162,7 +162,6 @@ public void testEqualsAndHashCode() {
 
     // Test toString
     String toString = request1.toString();
-    System.out.println(toString);
     assertTrue(toString.contains("accountId=testAccount"));
     assertTrue(toString.contains("jobId=testJob"));
     assertTrue(toString.contains("dataSource=testDataSource"));
diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java
new file mode 100644
index 0000000000..98150365eb
--- /dev/null
+++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.scheduler.parser;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
+import org.opensearch.jobscheduler.spi.schedule.Schedule;
+
+public class IntervalScheduleParserTest {
+
+  private Instant startTime;
+
+  @BeforeEach
+  public void setup() {
+    startTime = Instant.now();
+  }
+
+  @Test
+  public void testParseValidScheduleString() {
+    String scheduleStr = "5 minutes";
+    Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 5, ChronoUnit.MINUTES), schedule);
+  }
+
+  @Test
+  public void testParseValidScheduleStringWithDifferentUnits() {
+    String scheduleStr = "2 hours";
+    Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 120, ChronoUnit.MINUTES), schedule);
+
+    scheduleStr = "1 day";
+    schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 1440, ChronoUnit.MINUTES), schedule);
+
+    scheduleStr = "3 weeks";
+    schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 30240, ChronoUnit.MINUTES), schedule);
+  }
+
+  @Test
+  public void testParseNullSchedule() {
+    Schedule schedule = IntervalScheduleParser.parse(null, startTime);
+    assertNull(schedule);
+  }
+
+  @Test
+  public void testParseScheduleObject() {
+    IntervalSchedule expectedSchedule = new IntervalSchedule(startTime, 10, ChronoUnit.MINUTES);
+    Schedule schedule = IntervalScheduleParser.parse(expectedSchedule, startTime);
+
+    assertEquals(expectedSchedule, schedule);
+  }
+
+  @Test
+  public void testParseInvalidScheduleString() {
+    String scheduleStr = "invalid schedule";
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> IntervalScheduleParser.parse(scheduleStr, startTime),
+            "Expected IllegalArgumentException but no exception was thrown");
+
+    assertEquals("Invalid interval format: " + scheduleStr.toLowerCase(), exception.getMessage());
+  }
+
+  @Test
+  public void testParseUnsupportedUnits() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> IntervalScheduleParser.parse("1 year", startTime),
+            "Expected IllegalArgumentException but no exception was thrown");
+
+    assertEquals("Years cannot be converted to minutes accurately.", exception.getMessage());
+
+    exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> IntervalScheduleParser.parse("1 month", startTime),
+            "Expected IllegalArgumentException but no exception was thrown");
+
+    assertEquals("Months cannot be converted to minutes accurately.", exception.getMessage());
+  }
+
+  @Test
+  public void testParseNonStringSchedule() {
+    Object nonStringSchedule = 12345;
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> IntervalScheduleParser.parse(nonStringSchedule, startTime),
+            "Expected IllegalArgumentException but no exception was thrown");
+
+    assertEquals("Schedule must be a String object for parsing.", exception.getMessage());
+  }
+
+  @Test
+  public void testParseScheduleWithNanoseconds() {
+    String scheduleStr = "60000000000 nanoseconds"; // Equivalent to 1 minute
+    Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
+  }
+
+  @Test
+  public void testParseScheduleWithMilliseconds() {
+    String scheduleStr = "60000 milliseconds"; // Equivalent to 1 minute
+    Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
+  }
+
+  @Test
+  public void testParseScheduleWithMicroseconds() {
+    String scheduleStr = "60000000 microseconds"; // Equivalent to 1 minute
+    Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
+  }
+
+  @Test
+  public void testUnsupportedTimeUnit() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"),
+            "Expected IllegalArgumentException but no exception was thrown");
+
+    assertEquals("Unsupported time unit: unsupportedunit", exception.getMessage());
+  }
+
+  @Test
+  public void testParseScheduleWithSeconds() {
+    String scheduleStr = "120 seconds"; // Equivalent to 2 minutes
+    Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
+
+    assertEquals(new IntervalSchedule(startTime, 2, ChronoUnit.MINUTES), schedule);
+  }
+}
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index e4bf255c41..560c5edadd 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -96,7 +96,7 @@
 import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
 import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
 import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler;
-import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob;
+import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
 import org.opensearch.sql.spark.scheduler.parser.OpenSearchScheduleQueryJobRequestParser;
 import org.opensearch.sql.spark.storage.SparkStorageFactory;
 import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
@@ -247,7 +247,7 @@ public Collection<Object> createComponents(
             injector.getInstance(FlintIndexOpFactory.class));
     AsyncQueryExecutorService asyncQueryExecutorService =
         injector.getInstance(AsyncQueryExecutorService.class);
-    ScheduledAsyncQueryJob.getJobRunnerInstance()
+    ScheduledAsyncQueryJobRunner.getJobRunnerInstance()
        .loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);
 
     return ImmutableList.of(
@@ -266,7 +266,7 @@ public String getJobIndex() {
 
   @Override
   public ScheduledJobRunner getJobRunner() {
-    return ScheduledAsyncQueryJob.getJobRunnerInstance();
+    return ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
   }
 
   @Override
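// ---------------------------------------------------------------------------------------------
// Illustrative usage sketch (not part of the patch). It mirrors the behaviour exercised by the
// new IntervalScheduleParserTest above: parse() now accepts a raw interval String, an existing
// job-scheduler Schedule (returned unchanged), or null, and rejects any other type. Only the
// class and method names introduced in this diff are real; the wrapper class below and the use
// of `assert` (run with -ea) are hypothetical, for illustration only.
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.sql.spark.scheduler.parser.IntervalScheduleParser;

public class IntervalScheduleParserUsageSketch {
  public static void main(String[] args) {
    Instant startTime = Instant.now();

    // A raw String is parsed and normalized to minutes.
    Schedule fromString = IntervalScheduleParser.parse("5 minutes", startTime);
    assert fromString.equals(new IntervalSchedule(startTime, 5, ChronoUnit.MINUTES));

    // An existing Schedule instance is passed through as-is.
    Schedule existing = new IntervalSchedule(startTime, 10, ChronoUnit.MINUTES);
    assert IntervalScheduleParser.parse(existing, startTime) == existing;

    // null stays null; any other type is rejected with the message asserted in the tests.
    assert IntervalScheduleParser.parse(null, startTime) == null;
    try {
      IntervalScheduleParser.parse(12345, startTime);
    } catch (IllegalArgumentException e) {
      assert "Schedule must be a String object for parsing.".equals(e.getMessage());
    }
  }
}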