From 519c79ddcdca4f69e7deb7546c5cbdb81e4a9e3f Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Mon, 22 Jul 2024 10:31:20 -0700 Subject: [PATCH] Update UT Signed-off-by: Louis Chu --- .../OpenSearchAsyncQueryScheduler.java | 4 +- .../job/OpenSearchRefreshIndexJob.java | 12 +- .../OpenSearchAsyncQuerySchedulerTest.java | 149 +++++++++++++++++- .../job/OpenSearchRefreshIndexJobTest.java | 53 ++++++- 4 files changed, 194 insertions(+), 24 deletions(-) diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java index c2bbae8d36..1459404eb4 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -239,7 +239,7 @@ public static ScheduledJobParser getJobParser() { }; } - private static Instant parseInstantValue(XContentParser parser) throws IOException { + static Instant parseInstantValue(XContentParser parser) throws IOException { if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { return null; } @@ -247,6 +247,6 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti return Instant.ofEpochMilli(parser.longValue()); } XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); - return null; + return null; // This line will never be reached due to the exception thrown above. } } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java index 2e2f27989b..d15757777d 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java @@ -33,18 +33,10 @@ public class OpenSearchRefreshIndexJob implements ScheduledJobRunner { private static final Logger log = LogManager.getLogger(OpenSearchRefreshIndexJob.class); - private static OpenSearchRefreshIndexJob INSTANCE; + public static OpenSearchRefreshIndexJob INSTANCE = new OpenSearchRefreshIndexJob(); public static OpenSearchRefreshIndexJob getJobRunnerInstance() { - if (INSTANCE != null) { - return INSTANCE; - } - synchronized (OpenSearchRefreshIndexJob.class) { - if (INSTANCE == null) { - INSTANCE = new OpenSearchRefreshIndexJob(); - } - return INSTANCE; - } + return INSTANCE; } private ClusterService clusterService; diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java index 7e86352091..0346819dd4 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java @@ -6,15 +6,18 @@ package org.opensearch.sql.spark.scheduler; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME; import java.io.IOException; import java.time.Instant; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Answers; @@ -36,8 +39,12 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; +import org.opensearch.core.common.ParsingException; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; import org.opensearch.threadpool.ThreadPool; @@ -86,6 +93,8 @@ public void testScheduleJob() { when(createIndexResponseActionFuture.actionGet()) .thenReturn(new CreateIndexResponse(true, true, TEST_SCHEDULER_INDEX_NAME)); when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture); + + // Test the if case when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED); @@ -98,11 +107,11 @@ public void testScheduleJob() { scheduler.scheduleJob(request); // Verify index created - verify(client.admin().indices(), Mockito.times(1)).create(ArgumentMatchers.any()); + verify(client.admin().indices(), times(1)).create(ArgumentMatchers.any()); // Verify doc indexed ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); - verify(client, Mockito.times(1)).index(captor.capture()); + verify(client, times(1)).index(captor.capture()); IndexRequest capturedRequest = captor.getValue(); assertEquals(request.getName(), capturedRequest.id()); assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); @@ -128,12 +137,12 @@ public void testScheduleJobWithExistingJob() { scheduler.scheduleJob(request); }); - verify(client, Mockito.times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture()); + verify(client, times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture()); assertEquals("A job already exists with name: testJob", exception.getMessage()); } @Test - public void testScheduleJobWithException() { + public void testScheduleJobWithExceptions() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) .thenReturn(Boolean.FALSE); when(client.admin().indices().create(any(CreateIndexRequest.class))) @@ -149,6 +158,14 @@ public void testScheduleJobWithException() { .build(); assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + + when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture); + when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); + when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + assertEquals("Schedule job failed with result : not_found", exception.getMessage()); } @Test @@ -168,6 +185,17 @@ public void testUnscheduleJob() throws IOException { UpdateRequest capturedRequest = captor.getValue(); assertEquals(TEST_JOB_ID, capturedRequest.id()); assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + + // Reset the captor for the next verification + captor = ArgumentCaptor.forClass(UpdateRequest.class); + + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); + scheduler.unscheduleJob(TEST_JOB_ID); + + verify(client, times(2)).update(captor.capture()); + capturedRequest = captor.getValue(); + assertEquals(TEST_JOB_ID, capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); } @Test @@ -215,6 +243,47 @@ public void testUpdateJobWithIndexNotFound() { assertThrows(IllegalArgumentException.class, () -> scheduler.updateJob(request)); } + @Test + public void testUpdateJobWithExceptions() { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + when(client.update(any(UpdateRequest.class))) + .thenThrow(new DocumentMissingException(null, null)); + + IllegalArgumentException exception1 = + assertThrows( + IllegalArgumentException.class, + () -> { + scheduler.updateJob(request); + }); + + assertEquals("Job: testJob doesn't exist", exception1.getMessage()); + + when(client.update(any(UpdateRequest.class))).thenThrow(new RuntimeException("Test exception")); + + RuntimeException exception2 = + assertThrows( + RuntimeException.class, + () -> { + scheduler.updateJob(request); + }); + + assertEquals("java.lang.RuntimeException: Test exception", exception2.getMessage()); + + when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> scheduler.updateJob(request)); + assertEquals("Update job failed with result : not_found", exception.getMessage()); + } + @Test public void testRemoveJob() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); @@ -279,6 +348,24 @@ public void testCreateAsyncQuerySchedulerIndexFailure() { assertEquals( "Internal server error while creating .async-query-scheduler index: Error creating index", exception.getMessage()); + + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenReturn(createIndexResponseActionFuture); + Mockito.when(createIndexResponseActionFuture.actionGet()) + .thenReturn(new CreateIndexResponse(false, false, SCHEDULER_INDEX_NAME)); + + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + RuntimeException runtimeException = + Assertions.assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + Assertions.assertEquals( + "Internal server error while creating .async-query-scheduler index: Index creation is not" + + " acknowledged.", + runtimeException.getMessage()); } @Test @@ -325,11 +412,63 @@ public void testRemoveJobNotFound() { } @Test - public void testRemoveJobWithException() { + public void testRemoveJobWithExceptions() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); when(client.delete(any(DeleteRequest.class))).thenThrow(new RuntimeException("Test exception")); assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); + when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); + when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); + + RuntimeException runtimeException = + Assertions.assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + Assertions.assertEquals("Remove job failed with result : noop", runtimeException.getMessage()); + } + + @Test + public void testGetJobRunner() { + ScheduledJobRunner jobRunner = OpenSearchAsyncQueryScheduler.getJobRunner(); + assertNotNull(jobRunner); + } + + @Test + public void testGetJobParser() { + ScheduledJobParser jobParser = OpenSearchAsyncQueryScheduler.getJobParser(); + assertNotNull(jobParser); + } + + @Test + public void testParseInstantValueNull() throws IOException { + XContentParser parser = mock(XContentParser.class); + when(parser.currentToken()).thenReturn(XContentParser.Token.VALUE_NULL); + + Instant instant = OpenSearchAsyncQueryScheduler.parseInstantValue(parser); + assertEquals(null, instant); + } + + @Test + public void testParseInstantValueLong() throws IOException { + XContentParser parser = mock(XContentParser.class); + long epochMillis = 1624892400000L; // 2021-06-28T12:00:00Z + when(parser.currentToken()).thenReturn(XContentParser.Token.VALUE_NUMBER); + when(parser.longValue()).thenReturn(epochMillis); + + Instant instant = OpenSearchAsyncQueryScheduler.parseInstantValue(parser); + assertEquals(Instant.ofEpochMilli(epochMillis), instant); + } + + @Test + public void testParseInstantValueInvalidToken() { + XContentParser parser = mock(XContentParser.class); + when(parser.currentToken()).thenReturn(XContentParser.Token.START_OBJECT); + + ParsingException exception = + assertThrows( + ParsingException.class, () -> OpenSearchAsyncQueryScheduler.parseInstantValue(parser)); + assertNotNull(exception); } } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java index 072184821c..d5d432f3db 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java @@ -6,6 +6,8 @@ package org.opensearch.sql.spark.scheduler.job; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -13,6 +15,10 @@ import static org.mockito.Mockito.verify; import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Answers; @@ -47,14 +53,36 @@ public class OpenSearchRefreshIndexJobTest { public void setup() { MockitoAnnotations.openMocks(this); jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); - spyJobRunner = spy(jobRunner); - spyJobRunner.setClusterService(null); - spyJobRunner.setThreadPool(null); - spyJobRunner.setClient(null); + jobRunner.setClient(null); + jobRunner.setClusterService(null); + jobRunner.setThreadPool(null); + } + + @Test + public void testGetJobRunnerInstanceCalledConcurrently() throws InterruptedException { + int threadCount = 3; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executorService.execute( + () -> { + try { + OpenSearchRefreshIndexJob instance = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + assertNotNull(instance); + } finally { + latch.countDown(); + } + }); + } + + latch.await(5, TimeUnit.SECONDS); + executorService.shutdown(); } @Test public void testRunJobWithCorrectParameter() { + spyJobRunner = spy(jobRunner); spyJobRunner.setClusterService(clusterService); spyJobRunner.setThreadPool(threadPool); spyJobRunner.setClient(client); @@ -79,9 +107,10 @@ public void testRunJobWithCorrectParameter() { @Test public void testRunJobWithIncorrectParameter() { - spyJobRunner.setClusterService(clusterService); - spyJobRunner.setThreadPool(threadPool); - spyJobRunner.setClient(client); + jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class); @@ -130,4 +159,14 @@ public void testRunJobWithUninitializedServices() { "Expected IllegalStateException but no exception was thrown"); assertEquals("Client is not initialized.", exception.getMessage()); } + + @Test + public void testGetJobRunnerInstanceMultipleCalls() { + OpenSearchRefreshIndexJob instance1 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + OpenSearchRefreshIndexJob instance2 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + OpenSearchRefreshIndexJob instance3 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + + assertSame(instance1, instance2); + assertSame(instance2, instance3); + } }