Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Sep 3, 2024
1 parent 04ea971 commit d2c316b
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 71 deletions.
1 change: 0 additions & 1 deletion async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.scheduler.parser.OpenSearchScheduleQueryJobRequestParser',
'org.opensearch.sql.spark.scheduler.parser.IntervalScheduleParser',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;

Expand Down Expand Up @@ -196,6 +196,6 @@ private void assertIndexExists() {

/** Returns the job runner instance for the scheduler. */
public static ScheduledJobRunner getJobRunner() {
return ScheduledAsyncQueryJob.getJobRunnerInstance();
return ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
* and using singleton job runner to ensure we register a usable job runner instance to JobScheduler
* plugin.
*/
public class ScheduledAsyncQueryJob implements ScheduledJobRunner {
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJob.class);
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);

public static ScheduledAsyncQueryJob INSTANCE = new ScheduledAsyncQueryJob();
private static ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();

public static ScheduledAsyncQueryJob getJobRunnerInstance() {
public static ScheduledAsyncQueryJobRunner getJobRunnerInstance() {
return INSTANCE;
}

Expand All @@ -47,7 +47,7 @@ public static ScheduledAsyncQueryJob getJobRunnerInstance() {
private Client client;
private AsyncQueryExecutorService asyncQueryExecutorService;

private ScheduledAsyncQueryJob() {
private ScheduledAsyncQueryJobRunner() {
// Singleton class, use getJobRunnerInstance method instead of constructor
}

Expand All @@ -63,28 +63,12 @@ public void loadJobResource(
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

public void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
}

public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public void setClient(Client client) {
this.client = client;
}

public void setAsyncQueryExecutorService(AsyncQueryExecutorService asyncQueryExecutorService) {
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
// Parser will convert jobParameter to ScheduledAsyncQueryJobRequest
if (!(jobParameter instanceof ScheduledAsyncQueryJobRequest)) {
throw new IllegalStateException(
"Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
"Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: "
+ jobParameter.getClass().getCanonicalName());
}

Expand Down Expand Up @@ -116,13 +100,10 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
}

void doRefresh(ScheduledAsyncQueryJobRequest request) {
// TODO: use internal logic to create refresh index query?
LOGGER.info("Scheduled refresh index job on job: " + request.getName());
LOGGER.info("Scheduled refresh index job on query: " + request.getScheduledQuery());
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest(
request.getScheduledQuery(), request.getDataSource(), request.getQueryLang());
LOGGER.info(asyncQueryExecutorService.getClass().getCanonicalName());
CreateAsyncQueryResponse createAsyncQueryResponse =
asyncQueryExecutorService.createAsyncQuery(
createAsyncQueryRequest, new NullAsyncQueryRequestContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par

public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
AsyncQuerySchedulerRequest request) {
// Validate that request.getSchedule() is a String
if (!(request.getSchedule() instanceof String)) {
throw new IllegalArgumentException("Schedule must be a String object.");
}

Instant updateTime =
request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
return ScheduledAsyncQueryJobRequest.builder()
Expand All @@ -155,10 +150,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
.enabledTime(request.getEnabledTime())
.lockDurationSeconds(request.getLockDurationSeconds())
.jitter(request.getJitter())
.schedule(
IntervalScheduleParser.parse(
(String) request.getSchedule(),
updateTime)) // This is specific to ScheduledAsyncQueryJobRequest
.schedule(IntervalScheduleParser.parse(request.getSchedule(), updateTime))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,43 @@

package org.opensearch.sql.spark.scheduler.parser;

import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;

/** Parse string raw schedule into job scheduler IntervalSchedule */
public class IntervalScheduleParser {

private static final Pattern DURATION_PATTERN =
Pattern.compile(
"^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|micros?|nanoseconds?|nanos?)$",
"^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|microseconds?|microsecond|micros?|micros|nanoseconds?|nanos?)$",
Pattern.CASE_INSENSITIVE);

public static IntervalSchedule parse(String intervalStr, Instant startTime) {
intervalStr = intervalStr.trim().toLowerCase();
public static Schedule parse(Object schedule, Instant startTime) {
if (schedule == null) {
return null;
}

if (schedule instanceof Schedule) {
return (Schedule) schedule;
}

if (!(schedule instanceof String)) {
throw new IllegalArgumentException("Schedule must be a String object for parsing.");
}

String intervalStr = ((String) schedule).trim().toLowerCase();

Matcher matcher = DURATION_PATTERN.matcher(intervalStr);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid interval format: " + intervalStr);
}

int value = Integer.parseInt(matcher.group(1));
long value = Long.parseLong(matcher.group(1));
String unitStr = matcher.group(2).toLowerCase();

// Convert to a supported unit or directly return an IntervalSchedule
Expand All @@ -36,7 +50,8 @@ public static IntervalSchedule parse(String intervalStr, Instant startTime) {
return new IntervalSchedule(startTime, (int) intervalInMinutes, ChronoUnit.MINUTES);
}

private static long convertToSupportedUnit(int value, String unitStr) {
@VisibleForTesting
protected static long convertToSupportedUnit(long value, String unitStr) {
switch (unitStr) {
case "years":
case "year":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
Expand All @@ -32,7 +38,7 @@
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

public class ScheduledAsyncQueryJobTest {
public class ScheduledAsyncQueryJobRunnerTest {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ClusterService clusterService;
Expand All @@ -48,27 +54,21 @@ public class ScheduledAsyncQueryJobTest {

@Mock private JobExecutionContext context;

private ScheduledAsyncQueryJob jobRunner;
private ScheduledAsyncQueryJobRunner jobRunner;

private ScheduledAsyncQueryJob spyJobRunner;
private ScheduledAsyncQueryJobRunner spyJobRunner;

@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(this);
jobRunner = ScheduledAsyncQueryJob.getJobRunnerInstance();
jobRunner.setClient(null);
jobRunner.setClusterService(null);
jobRunner.setThreadPool(null);
jobRunner.setAsyncQueryExecutorService(null);
jobRunner = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
jobRunner.loadJobResource(null, null, null, null);
}

@Test
public void testRunJobWithCorrectParameter() {
spyJobRunner = spy(jobRunner);
spyJobRunner.setClusterService(clusterService);
spyJobRunner.setThreadPool(threadPool);
spyJobRunner.setClient(client);
spyJobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);
spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
Expand Down Expand Up @@ -98,11 +98,8 @@ public void testRunJobWithCorrectParameter() {

@Test
public void testRunJobWithIncorrectParameter() {
jobRunner = ScheduledAsyncQueryJob.getJobRunnerInstance();
jobRunner.setClusterService(clusterService);
jobRunner.setThreadPool(threadPool);
jobRunner.setClient(client);
jobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);
jobRunner = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
jobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class);

Expand All @@ -113,11 +110,52 @@ public void testRunJobWithIncorrectParameter() {
"Expected IllegalStateException but no exception was thrown");

assertEquals(
"Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
"Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: "
+ wrongParameter.getClass().getCanonicalName(),
exception.getMessage());
}

@Test
public void testDoRefreshThrowsException() {
spyJobRunner = spy(jobRunner);
spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

// Create a ScheduledAsyncQueryJobRequest with valid parameters
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId("testJob")
.lastUpdateTime(Instant.now())
.lockDurationSeconds(10L)
.scheduledQuery("REFRESH INDEX testIndex")
.dataSource("testDataSource")
.queryLang(LangType.SQL)
.build();

// Mock the doRefresh method to throw an exception
doThrow(new RuntimeException("Test exception")).when(spyJobRunner).doRefresh(request);

// Capture the log output
Logger logger = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
Appender mockAppender = mock(Appender.class);
when(mockAppender.getName()).thenReturn("MockAppender");
when(mockAppender.isStarted()).thenReturn(true);
when(mockAppender.isStopped()).thenReturn(false);
((org.apache.logging.log4j.core.Logger) logger)
.addAppender((org.apache.logging.log4j.core.Appender) mockAppender);

spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.generic()).submit(captor.capture());

Runnable runnable = captor.getValue();
runnable.run();

// Verify that the doRefresh method was called and the exception was logged
verify(spyJobRunner).doRefresh(eq(request));
verify(mockAppender).append(any(LogEvent.class));
}

@Test
public void testRunJobWithUninitializedServices() {
ScheduledAsyncQueryJobRequest jobParameter =
Expand All @@ -133,7 +171,7 @@ public void testRunJobWithUninitializedServices() {
"Expected IllegalStateException but no exception was thrown");
assertEquals("ClusterService is not initialized.", exception.getMessage());

jobRunner.setClusterService(clusterService);
jobRunner.loadJobResource(null, clusterService, null, null);

exception =
assertThrows(
Expand All @@ -142,7 +180,7 @@ public void testRunJobWithUninitializedServices() {
"Expected IllegalStateException but no exception was thrown");
assertEquals("ThreadPool is not initialized.", exception.getMessage());

jobRunner.setThreadPool(threadPool);
jobRunner.loadJobResource(null, clusterService, threadPool, null);

exception =
assertThrows(
Expand All @@ -151,7 +189,7 @@ public void testRunJobWithUninitializedServices() {
"Expected IllegalStateException but no exception was thrown");
assertEquals("Client is not initialized.", exception.getMessage());

jobRunner.setClient(client);
jobRunner.loadJobResource(client, clusterService, threadPool, null);

exception =
assertThrows(
Expand All @@ -163,9 +201,9 @@ public void testRunJobWithUninitializedServices() {

@Test
public void testGetJobRunnerInstanceMultipleCalls() {
ScheduledAsyncQueryJob instance1 = ScheduledAsyncQueryJob.getJobRunnerInstance();
ScheduledAsyncQueryJob instance2 = ScheduledAsyncQueryJob.getJobRunnerInstance();
ScheduledAsyncQueryJob instance3 = ScheduledAsyncQueryJob.getJobRunnerInstance();
ScheduledAsyncQueryJobRunner instance1 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
ScheduledAsyncQueryJobRunner instance2 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
ScheduledAsyncQueryJobRunner instance3 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance();

assertSame(instance1, instance2);
assertSame(instance2, instance3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testFromAsyncQuerySchedulerRequestWithInvalidSchedule() {
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(request);
});

assertEquals("Schedule must be a String object.", exception.getMessage());
assertEquals("Schedule must be a String object for parsing.", exception.getMessage());
}

@Test
Expand All @@ -162,7 +162,6 @@ public void testEqualsAndHashCode() {

// Test toString
String toString = request1.toString();
System.out.println(toString);
assertTrue(toString.contains("accountId=testAccount"));
assertTrue(toString.contains("jobId=testJob"));
assertTrue(toString.contains("dataSource=testDataSource"));
Expand Down
Loading

0 comments on commit d2c316b

Please sign in to comment.