Skip to content

Commit

Permalink
Update UT
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jul 22, 2024
1 parent 6f85b9a commit 519c79d
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ public static ScheduledJobParser getJobParser() {
};
}

private static Instant parseInstantValue(XContentParser parser) throws IOException {
static Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
return null; // This line will never be reached due to the exception thrown above.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,10 @@ public class OpenSearchRefreshIndexJob implements ScheduledJobRunner {

private static final Logger log = LogManager.getLogger(OpenSearchRefreshIndexJob.class);

private static OpenSearchRefreshIndexJob INSTANCE;
public static OpenSearchRefreshIndexJob INSTANCE = new OpenSearchRefreshIndexJob();

public static OpenSearchRefreshIndexJob getJobRunnerInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (OpenSearchRefreshIndexJob.class) {
if (INSTANCE == null) {
INSTANCE = new OpenSearchRefreshIndexJob();
}
return INSTANCE;
}
return INSTANCE;
}

private ClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
package org.opensearch.sql.spark.scheduler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME;

import java.io.IOException;
import java.time.Instant;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
Expand All @@ -36,8 +39,12 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.core.common.ParsingException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -86,6 +93,8 @@ public void testScheduleJob() {
when(createIndexResponseActionFuture.actionGet())
.thenReturn(new CreateIndexResponse(true, true, TEST_SCHEDULER_INDEX_NAME));
when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture);

// Test the if case
when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse);
when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED);

Expand All @@ -98,11 +107,11 @@ public void testScheduleJob() {
scheduler.scheduleJob(request);

// Verify index created
verify(client.admin().indices(), Mockito.times(1)).create(ArgumentMatchers.any());
verify(client.admin().indices(), times(1)).create(ArgumentMatchers.any());

// Verify doc indexed
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, Mockito.times(1)).index(captor.capture());
verify(client, times(1)).index(captor.capture());
IndexRequest capturedRequest = captor.getValue();
assertEquals(request.getName(), capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
Expand All @@ -128,12 +137,12 @@ public void testScheduleJobWithExistingJob() {
scheduler.scheduleJob(request);
});

verify(client, Mockito.times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture());
verify(client, times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture());
assertEquals("A job already exists with name: testJob", exception.getMessage());
}

@Test
public void testScheduleJobWithException() {
public void testScheduleJobWithExceptions() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME))
.thenReturn(Boolean.FALSE);
when(client.admin().indices().create(any(CreateIndexRequest.class)))
Expand All @@ -149,6 +158,14 @@ public void testScheduleJobWithException() {
.build();

assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request));

when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture);
when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse);
when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND);

RuntimeException exception =
assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request));
assertEquals("Schedule job failed with result : not_found", exception.getMessage());
}

@Test
Expand All @@ -168,6 +185,17 @@ public void testUnscheduleJob() throws IOException {
UpdateRequest capturedRequest = captor.getValue();
assertEquals(TEST_JOB_ID, capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());

// Reset the captor for the next verification
captor = ArgumentCaptor.forClass(UpdateRequest.class);

when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP);
scheduler.unscheduleJob(TEST_JOB_ID);

verify(client, times(2)).update(captor.capture());
capturedRequest = captor.getValue();
assertEquals(TEST_JOB_ID, capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
Expand Down Expand Up @@ -215,6 +243,47 @@ public void testUpdateJobWithIndexNotFound() {
assertThrows(IllegalArgumentException.class, () -> scheduler.updateJob(request));
}

@Test
public void testUpdateJobWithExceptions() {
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();

when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);
when(client.update(any(UpdateRequest.class)))
.thenThrow(new DocumentMissingException(null, null));

IllegalArgumentException exception1 =
assertThrows(
IllegalArgumentException.class,
() -> {
scheduler.updateJob(request);
});

assertEquals("Job: testJob doesn't exist", exception1.getMessage());

when(client.update(any(UpdateRequest.class))).thenThrow(new RuntimeException("Test exception"));

RuntimeException exception2 =
assertThrows(
RuntimeException.class,
() -> {
scheduler.updateJob(request);
});

assertEquals("java.lang.RuntimeException: Test exception", exception2.getMessage());

when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);
when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse);
when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND);

RuntimeException exception =
assertThrows(RuntimeException.class, () -> scheduler.updateJob(request));
assertEquals("Update job failed with result : not_found", exception.getMessage());
}

@Test
public void testRemoveJob() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);
Expand Down Expand Up @@ -279,6 +348,24 @@ public void testCreateAsyncQuerySchedulerIndexFailure() {
assertEquals(
"Internal server error while creating .async-query-scheduler index: Error creating index",
exception.getMessage());

when(client.admin().indices().create(any(CreateIndexRequest.class)))
.thenReturn(createIndexResponseActionFuture);
Mockito.when(createIndexResponseActionFuture.actionGet())
.thenReturn(new CreateIndexResponse(false, false, SCHEDULER_INDEX_NAME));

OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();

RuntimeException runtimeException =
Assertions.assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request));
Assertions.assertEquals(
"Internal server error while creating .async-query-scheduler index: Index creation is not"
+ " acknowledged.",
runtimeException.getMessage());
}

@Test
Expand Down Expand Up @@ -325,11 +412,63 @@ public void testRemoveJobNotFound() {
}

@Test
public void testRemoveJobWithException() {
public void testRemoveJobWithExceptions() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);

when(client.delete(any(DeleteRequest.class))).thenThrow(new RuntimeException("Test exception"));

assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID));

DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);
when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse);
when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP);

RuntimeException runtimeException =
Assertions.assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID));
Assertions.assertEquals("Remove job failed with result : noop", runtimeException.getMessage());
}

@Test
public void testGetJobRunner() {
ScheduledJobRunner jobRunner = OpenSearchAsyncQueryScheduler.getJobRunner();
assertNotNull(jobRunner);
}

@Test
public void testGetJobParser() {
ScheduledJobParser jobParser = OpenSearchAsyncQueryScheduler.getJobParser();
assertNotNull(jobParser);
}

@Test
public void testParseInstantValueNull() throws IOException {
XContentParser parser = mock(XContentParser.class);
when(parser.currentToken()).thenReturn(XContentParser.Token.VALUE_NULL);

Instant instant = OpenSearchAsyncQueryScheduler.parseInstantValue(parser);
assertEquals(null, instant);
}

@Test
public void testParseInstantValueLong() throws IOException {
XContentParser parser = mock(XContentParser.class);
long epochMillis = 1624892400000L; // 2021-06-28T12:00:00Z
when(parser.currentToken()).thenReturn(XContentParser.Token.VALUE_NUMBER);
when(parser.longValue()).thenReturn(epochMillis);

Instant instant = OpenSearchAsyncQueryScheduler.parseInstantValue(parser);
assertEquals(Instant.ofEpochMilli(epochMillis), instant);
}

@Test
public void testParseInstantValueInvalidToken() {
XContentParser parser = mock(XContentParser.class);
when(parser.currentToken()).thenReturn(XContentParser.Token.START_OBJECT);

ParsingException exception =
assertThrows(
ParsingException.class, () -> OpenSearchAsyncQueryScheduler.parseInstantValue(parser));
assertNotNull(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
package org.opensearch.sql.spark.scheduler.job;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
Expand Down Expand Up @@ -47,14 +53,36 @@ public class OpenSearchRefreshIndexJobTest {
public void setup() {
MockitoAnnotations.openMocks(this);
jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance();
spyJobRunner = spy(jobRunner);
spyJobRunner.setClusterService(null);
spyJobRunner.setThreadPool(null);
spyJobRunner.setClient(null);
jobRunner.setClient(null);
jobRunner.setClusterService(null);
jobRunner.setThreadPool(null);
}

@Test
public void testGetJobRunnerInstanceCalledConcurrently() throws InterruptedException {
int threadCount = 3;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
executorService.execute(
() -> {
try {
OpenSearchRefreshIndexJob instance = OpenSearchRefreshIndexJob.getJobRunnerInstance();
assertNotNull(instance);
} finally {
latch.countDown();
}
});
}

latch.await(5, TimeUnit.SECONDS);
executorService.shutdown();
}

@Test
public void testRunJobWithCorrectParameter() {
spyJobRunner = spy(jobRunner);
spyJobRunner.setClusterService(clusterService);
spyJobRunner.setThreadPool(threadPool);
spyJobRunner.setClient(client);
Expand All @@ -79,9 +107,10 @@ public void testRunJobWithCorrectParameter() {

@Test
public void testRunJobWithIncorrectParameter() {
spyJobRunner.setClusterService(clusterService);
spyJobRunner.setThreadPool(threadPool);
spyJobRunner.setClient(client);
jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance();
jobRunner.setClusterService(clusterService);
jobRunner.setThreadPool(threadPool);
jobRunner.setClient(client);

ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class);

Expand Down Expand Up @@ -130,4 +159,14 @@ public void testRunJobWithUninitializedServices() {
"Expected IllegalStateException but no exception was thrown");
assertEquals("Client is not initialized.", exception.getMessage());
}

@Test
public void testGetJobRunnerInstanceMultipleCalls() {
OpenSearchRefreshIndexJob instance1 = OpenSearchRefreshIndexJob.getJobRunnerInstance();
OpenSearchRefreshIndexJob instance2 = OpenSearchRefreshIndexJob.getJobRunnerInstance();
OpenSearchRefreshIndexJob instance3 = OpenSearchRefreshIndexJob.getJobRunnerInstance();

assertSame(instance1, instance2);
assertSame(instance2, instance3);
}
}

0 comments on commit 519c79d

Please sign in to comment.