From d7e0c9526b8706406d203b42fdd0fd94cfb13edf Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 17 Jul 2024 09:52:43 -0700 Subject: [PATCH] Add unit test Signed-off-by: Louis Chu --- .../src/main/antlr/SqlBaseParser.g4 | 2 +- async-query/build.gradle | 1 + .../OpenSearchAsyncQueryScheduler.java | 2 +- .../OpenSearchAsyncQuerySchedulerTest.java | 233 ++++++++++++++++++ .../job/OpenSearchRefreshIndexJobTest.java | 183 ++++++++++++++ .../OpenSearchRefreshIndexJobRequestTest.java | 78 ++++++ 6 files changed, 497 insertions(+), 2 deletions(-) create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java diff --git a/async-query-core/src/main/antlr/SqlBaseParser.g4 b/async-query-core/src/main/antlr/SqlBaseParser.g4 index eac017cc5e..dc2c2f0794 100644 --- a/async-query-core/src/main/antlr/SqlBaseParser.g4 +++ b/async-query-core/src/main/antlr/SqlBaseParser.g4 @@ -52,7 +52,7 @@ singleCompoundStatement ; beginEndCompoundBlock - : BEGIN compoundBody END + : beginLabel? BEGIN compoundBody END endLabel? ; compoundBody diff --git a/async-query/build.gradle b/async-query/build.gradle index f4907b91c0..103f24f458 100644 --- a/async-query/build.gradle +++ b/async-query/build.gradle @@ -17,6 +17,7 @@ repositories { dependencies { compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + testCompileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" api project(':core') api project(':async-query-core') diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java index 992d9979d7..e8ad07dc40 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -149,7 +149,7 @@ public void removeJob(String jobId) { } /** Creates the async query scheduler index with specified mappings and settings. */ - private void createAsyncQuerySchedulerIndex() { + void createAsyncQuerySchedulerIndex() { try { InputStream mappingFileStream = OpenSearchAsyncQueryScheduler.class diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java new file mode 100644 index 0000000000..63928cd1db --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java @@ -0,0 +1,233 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.Instant; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; + +public class OpenSearchAsyncQuerySchedulerTest { + + @Mock private Client client; + + @Mock private ClusterService clusterService; + + @Mock private ActionFuture indexResponseActionFuture; + + @Mock private ActionFuture updateResponseActionFuture; + + @Mock private ActionFuture deleteResponseActionFuture; + + @Mock private ActionFuture createIndexResponseActionFuture; + + private OpenSearchAsyncQueryScheduler scheduler; + + @BeforeEach + public void setup() { + MockitoAnnotations.initMocks(this); + scheduler = new OpenSearchAsyncQueryScheduler(); + scheduler.loadJobResource(client, clusterService, null); + } + + @Test + public void testScheduleJob() throws IOException { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + IndexResponse indexResponse = mock(IndexResponse.class); + when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); + when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED); + + when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture); + + scheduler.scheduleJob(request); + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + verify(client).index(captor.capture()); + + IndexRequest capturedRequest = captor.getValue(); + assertEquals(request.getName(), capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testScheduleJobWithExistingJob() throws IOException { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + when(client.index(any(IndexRequest.class))).thenThrow(VersionConflictEngineException.class); + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + scheduler.scheduleJob(request); + }); + + assertEquals("A job already exists with name: testJob", exception.getMessage()); + } + + @Test + public void testUnscheduleJob() throws IOException { + String jobId = "testJob"; + + when(clusterService + .state() + .routingTable() + .hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME)) + .thenReturn(true); + + UpdateResponse updateResponse = mock(UpdateResponse.class); + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED); + + when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); + + scheduler.unscheduleJob(jobId); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateRequest.class); + verify(client).update(captor.capture()); + + UpdateRequest capturedRequest = captor.getValue(); + assertEquals(jobId, capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testUpdateJob() throws IOException { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + when(clusterService + .state() + .routingTable() + .hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME)) + .thenReturn(true); + + UpdateResponse updateResponse = mock(UpdateResponse.class); + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED); + + when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); + + scheduler.updateJob(request); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateRequest.class); + verify(client).update(captor.capture()); + + UpdateRequest capturedRequest = captor.getValue(); + assertEquals(request.getName(), capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testRemoveJob() { + String jobId = "testJob"; + + when(clusterService + .state() + .routingTable() + .hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME)) + .thenReturn(true); + + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); + when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); + + when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); + + scheduler.removeJob(jobId); + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteRequest.class); + verify(client).delete(captor.capture()); + + DeleteRequest capturedRequest = captor.getValue(); + assertEquals(jobId, capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testCreateAsyncQuerySchedulerIndex() throws IOException { + when(clusterService + .state() + .routingTable() + .hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME)) + .thenReturn(false); + + CreateIndexResponse createIndexResponse = mock(CreateIndexResponse.class); + when(createIndexResponseActionFuture.actionGet()).thenReturn(createIndexResponse); + when(createIndexResponse.isAcknowledged()).thenReturn(true); + + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenReturn(createIndexResponseActionFuture); + + scheduler.createAsyncQuerySchedulerIndex(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); + verify(client.admin().indices()).create(captor.capture()); + + CreateIndexRequest capturedRequest = captor.getValue(); + assertEquals(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, capturedRequest.index()); + } + + @Test + public void testCreateAsyncQuerySchedulerIndexFailure() throws IOException { + when(clusterService + .state() + .routingTable() + .hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME)) + .thenReturn(false); + + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenThrow(new RuntimeException("Error creating index")); + + RuntimeException exception = + assertThrows( + RuntimeException.class, + () -> { + scheduler.createAsyncQuerySchedulerIndex(); + }); + + assertEquals( + "Internal server error while creating .async-query-scheduler index: Error creating index", + exception.getMessage()); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java new file mode 100644 index 0000000000..79f883e099 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java @@ -0,0 +1,183 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +public class OpenSearchRefreshIndexJobTest { + + @Mock private ClusterService clusterService; + + @Mock private ThreadPool threadPool; + + @Mock private Client client; + + @Mock private JobExecutionContext context; + + @Mock private LockService lockService; + + private OpenSearchRefreshIndexJob jobRunner; + + @BeforeEach + public void setup() { + MockitoAnnotations.initMocks(this); + jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); + when(context.getLockService()).thenReturn(lockService); + } + + @Test + public void testRunJobWithCorrectParameter() { + OpenSearchRefreshIndexJobRequest jobParameter = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + when(jobParameter.getLockDurationSeconds()).thenReturn(10L); + + jobRunner.runJob(jobParameter, context); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(threadPool.generic()).submit(captor.capture()); + + Runnable runnable = captor.getValue(); + runnable.run(); + + verify(lockService).acquireLock(eq(jobParameter), eq(context), any(ActionListener.class)); + } + + @Test + public void testRunJobWithIncorrectParameter() { + ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class); + + try { + jobRunner.runJob(wrongParameter, context); + } catch (IllegalStateException e) { + assertEquals( + "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: " + + wrongParameter.getClass().getCanonicalName(), + e.getMessage()); + } + } + + @Test + public void testRunJobWithUninitializedServices() { + OpenSearchRefreshIndexJobRequest jobParameter = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + OpenSearchRefreshIndexJob uninitializedJobRunner = + OpenSearchRefreshIndexJob.getJobRunnerInstance(); + + try { + uninitializedJobRunner.runJob(jobParameter, context); + } catch (IllegalStateException e) { + assertEquals("ClusterService is not initialized.", e.getMessage()); + } + + uninitializedJobRunner.setClusterService(clusterService); + + try { + uninitializedJobRunner.runJob(jobParameter, context); + } catch (IllegalStateException e) { + assertEquals("ThreadPool is not initialized.", e.getMessage()); + } + } + + @Test + public void testAcquireLockFailure() { + OpenSearchRefreshIndexJobRequest jobParameter = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .lockDurationSeconds(10L) + .build(); + + doAnswer( + invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onFailure(new RuntimeException("Failed to acquire lock")); + return null; + }) + .when(lockService) + .acquireLock(eq(jobParameter), eq(context), any(ActionListener.class)); + + jobRunner.runJob(jobParameter, context); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(threadPool.generic()).submit(captor.capture()); + + Runnable runnable = captor.getValue(); + runnable.run(); + + verify(lockService).acquireLock(eq(jobParameter), eq(context), any(ActionListener.class)); + } + + @Test + public void testReleaseLockFailure() { + OpenSearchRefreshIndexJobRequest jobParameter = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .lockDurationSeconds(10L) + .build(); + + doAnswer( + invocation -> { + ActionListener acquireListener = invocation.getArgument(2); + acquireListener.onResponse(null); + return null; + }) + .when(lockService) + .acquireLock(eq(jobParameter), eq(context), any(ActionListener.class)); + + doAnswer( + invocation -> { + ActionListener releaseListener = invocation.getArgument(1); + releaseListener.onFailure(new RuntimeException("Failed to release lock")); + return null; + }) + .when(lockService) + .release(any(), any(ActionListener.class)); + + jobRunner.runJob(jobParameter, context); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(threadPool.generic()).submit(captor.capture()); + + Runnable runnable = captor.getValue(); + runnable.run(); + + verify(lockService).acquireLock(eq(jobParameter), eq(context), any(ActionListener.class)); + verify(lockService).release(any(), any(ActionListener.class)); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java new file mode 100644 index 0000000000..d8d26d0fe9 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; + +public class OpenSearchRefreshIndexJobRequestTest { + + @Test + public void testBuilderAndGetterMethods() { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + OpenSearchRefreshIndexJobRequest jobRequest = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .jobType("testType") + .schedule(schedule) + .enabled(true) + .lastUpdateTime(now) + .enabledTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + assertEquals("testJob", jobRequest.getName()); + assertEquals("testType", jobRequest.getJobType()); + assertEquals(schedule, jobRequest.getSchedule()); + assertTrue(jobRequest.isEnabled()); + assertEquals(now, jobRequest.getLastUpdateTime()); + assertEquals(now, jobRequest.getEnabledTime()); + assertEquals(60L, jobRequest.getLockDurationSeconds()); + assertEquals(0.1, jobRequest.getJitter()); + } + + @Test + public void testToXContent() throws IOException { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + OpenSearchRefreshIndexJobRequest jobRequest = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .jobType("testType") + .schedule(schedule) + .enabled(true) + .lastUpdateTime(now) + .enabledTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + jobRequest.toXContent(builder, EMPTY_PARAMS); + String jsonString = builder.toString(); + + assertTrue(jsonString.contains("\"jobName\":\"testJob\"")); + assertTrue(jsonString.contains("\"jobType\":\"testType\"")); + assertTrue(jsonString.contains("\"enabled\":true")); + assertTrue(jsonString.contains("\"lastUpdateTime\":" + now.toEpochMilli())); + assertTrue(jsonString.contains("\"enabledTime\":" + now.toEpochMilli())); + assertTrue(jsonString.contains("\"lockDurationSeconds\":60")); + assertTrue(jsonString.contains("\"jitter\":0.1")); + } +}