From 7dac5e62d923652334df184a3a98214925201b33 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Thu, 12 Sep 2024 13:48:13 -0700
Subject: [PATCH] In progress

---
 .../src/main/antlr/SqlBaseLexer.g4            |  5 ++
 .../src/main/antlr/SqlBaseParser.g4           | 25 ++++++++
 .../flint/operation/FlintIndexOpAlter.java    |  7 ++-
 .../flint/operation/FlintIndexOpDrop.java     |  7 ++-
 .../spark/scheduler/AsyncQueryScheduler.java  | 12 ++--
 .../OpenSearchAsyncQueryScheduler.java        | 27 +++++----
 .../OpenSearchAsyncQuerySchedulerTest.java    | 58 +++++++++++++++----
 7 files changed, 113 insertions(+), 28 deletions(-)

diff --git a/async-query-core/src/main/antlr/SqlBaseLexer.g4 b/async-query-core/src/main/antlr/SqlBaseLexer.g4
index acfc0011f5..9ea213f3bf 100644
--- a/async-query-core/src/main/antlr/SqlBaseLexer.g4
+++ b/async-query-core/src/main/antlr/SqlBaseLexer.g4
@@ -162,6 +162,7 @@ CLUSTERED: 'CLUSTERED';
 CODEGEN: 'CODEGEN';
 COLLATE: 'COLLATE';
 COLLATION: 'COLLATION';
+COLLATIONS: 'COLLATIONS';
 COLLECTION: 'COLLECTION';
 COLUMN: 'COLUMN';
 COLUMNS: 'COLUMNS';
@@ -276,6 +277,7 @@ INTO: 'INTO';
 INVOKER: 'INVOKER';
 IS: 'IS';
 ITEMS: 'ITEMS';
+ITERATE: 'ITERATE';
 JOIN: 'JOIN';
 KEYS: 'KEYS';
 LANGUAGE: 'LANGUAGE';
@@ -283,6 +285,7 @@ LAST: 'LAST';
 LATERAL: 'LATERAL';
 LAZY: 'LAZY';
 LEADING: 'LEADING';
+LEAVE: 'LEAVE';
 LEFT: 'LEFT';
 LIKE: 'LIKE';
 ILIKE: 'ILIKE';
@@ -362,6 +365,7 @@ REFERENCES: 'REFERENCES';
 REFRESH: 'REFRESH';
 RENAME: 'RENAME';
 REPAIR: 'REPAIR';
+REPEAT: 'REPEAT';
 REPEATABLE: 'REPEATABLE';
 REPLACE: 'REPLACE';
 RESET: 'RESET';
@@ -451,6 +455,7 @@ UNKNOWN: 'UNKNOWN';
 UNLOCK: 'UNLOCK';
 UNPIVOT: 'UNPIVOT';
 UNSET: 'UNSET';
+UNTIL: 'UNTIL';
 UPDATE: 'UPDATE';
 USE: 'USE';
 USER: 'USER';
diff --git a/async-query-core/src/main/antlr/SqlBaseParser.g4 b/async-query-core/src/main/antlr/SqlBaseParser.g4
index 5b8805821b..42f0094de3 100644
--- a/async-query-core/src/main/antlr/SqlBaseParser.g4
+++ b/async-query-core/src/main/antlr/SqlBaseParser.g4
@@ -65,6 +65,9 @@ compoundStatement
     | beginEndCompoundBlock
     | ifElseStatement
     | whileStatement
+    | repeatStatement
+    | leaveStatement
+    | iterateStatement
     ;
 
 setStatementWithOptionalVarKeyword
@@ -83,6 +86,18 @@ ifElseStatement
     (ELSE elseBody=compoundBody)? END IF
     ;
 
+repeatStatement
+    : beginLabel? REPEAT compoundBody UNTIL booleanExpression END REPEAT endLabel?
+    ;
+
+leaveStatement
+    : LEAVE multipartIdentifier
+    ;
+
+iterateStatement
+    : ITERATE multipartIdentifier
+    ;
+
 singleStatement
     : (statement|setResetStatement) SEMICOLON* EOF
     ;
@@ -245,6 +260,7 @@ statement
     | SHOW PARTITIONS identifierReference partitionSpec?               #showPartitions
     | SHOW identifier? FUNCTIONS ((FROM | IN) ns=identifierReference)?
         (LIKE? (legacy=multipartIdentifier | pattern=stringLit))?      #showFunctions
+    | SHOW COLLATIONS (LIKE? pattern=stringLit)?                       #showCollations
     | SHOW CREATE TABLE identifierReference (AS SERDE)?                #showCreateTable
     | SHOW CURRENT namespace                                           #showCurrentNamespace
     | SHOW CATALOGS (LIKE? pattern=stringLit)?                         #showCatalogs
@@ -1578,10 +1594,12 @@ ansiNonReserved
     | INTERVAL
     | INVOKER
     | ITEMS
+    | ITERATE
     | KEYS
     | LANGUAGE
     | LAST
     | LAZY
+    | LEAVE
     | LIKE
     | ILIKE
     | LIMIT
@@ -1648,6 +1666,7 @@ ansiNonReserved
     | REFRESH
     | RENAME
     | REPAIR
+    | REPEAT
     | REPEATABLE
     | REPLACE
     | RESET
@@ -1723,6 +1742,7 @@ ansiNonReserved
     | UNLOCK
     | UNPIVOT
     | UNSET
+    | UNTIL
     | UPDATE
     | USE
     | VALUES
@@ -1818,6 +1838,7 @@ nonReserved
     | CODEGEN
     | COLLATE
     | COLLATION
+    | COLLATIONS
     | COLLECTION
     | COLUMN
     | COLUMNS
@@ -1927,11 +1948,13 @@ nonReserved
     | INVOKER
     | IS
     | ITEMS
+    | ITERATE
     | KEYS
     | LANGUAGE
     | LAST
     | LAZY
     | LEADING
+    | LEAVE
     | LIKE
     | LONG
     | ILIKE
@@ -2009,6 +2032,7 @@ nonReserved
     | REFRESH
     | RENAME
     | REPAIR
+    | REPEAT
     | REPEATABLE
     | REPLACE
     | RESET
@@ -2093,6 +2117,7 @@ nonReserved
     | UNLOCK
     | UNPIVOT
     | UNSET
+    | UNTIL
     | UPDATE
     | USE
     | USER
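
Note: the grammar changes above are easiest to sanity-check by feeding a statement through the regenerated parser. The sketch below is illustrative only: it assumes the generated SqlBaseLexer/SqlBaseParser classes and the ANTLR 4 runtime are on the classpath (their package, the test class, and the exact label syntax in the sample script are assumptions, not taken from this patch); the entry-rule names singleStatement and compoundStatement come from the grammar itself.

```java
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;

// Hypothetical smoke test for the new rules; not part of this patch.
public class GrammarSmokeTest {
  public static void main(String[] args) {
    // SHOW COLLATIONS is an ordinary statement, so it goes through singleStatement.
    parser("SHOW COLLATIONS LIKE 'UNICODE*'").singleStatement();

    // REPEAT/UNTIL with ITERATE is a compound (SQL scripting) construct.
    parser("lbl: REPEAT SELECT 1; ITERATE lbl; UNTIL 1 = 1 END REPEAT lbl").compoundStatement();
  }

  // Builds a parser over the given text; ANTLR reports syntax errors to the console by default.
  private static SqlBaseParser parser(String sql) {
    SqlBaseLexer lexer = new SqlBaseLexer(CharStreams.fromString(sql));
    return new SqlBaseParser(new CommonTokenStream(lexer));
  }
}
```
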
diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java
index de34803823..7a743f94b6 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java
@@ -17,6 +17,7 @@
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
 import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
+import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 
 /**
  * Index Operation for Altering the flint index. Only handles alter operation when
@@ -62,7 +63,11 @@ void runOp(
     this.flintIndexMetadataService.updateIndexToManualRefresh(
         flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
     if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
-      asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
+      AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+      request.setAccountId(flintIndexStateModel.getAccountId());
+      request.setDataSource(flintIndexStateModel.getDatasourceName());
+      request.setJobId(flintIndexMetadata.getOpensearchIndexName());
+      asyncQueryScheduler.unscheduleJob(request);
     } else {
       cancelStreamingJob(flintIndexStateModel);
     }
diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java
index 3fa5423c10..6f51c21437 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java
@@ -15,6 +15,7 @@
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
 import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
+import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 
 /** Operation to drop Flint index */
 public class FlintIndexOpDrop extends FlintIndexOp {
@@ -54,7 +55,11 @@ void runOp(
         "Performing drop index operation for index: {}",
         flintIndexMetadata.getOpensearchIndexName());
     if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
-      asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
+      AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+      request.setAccountId(flintIndexStateModel.getAccountId());
+      request.setDataSource(flintIndexStateModel.getDatasourceName());
+      request.setJobId(flintIndexMetadata.getOpensearchIndexName());
+      asyncQueryScheduler.unscheduleJob(request);
     } else {
       cancelStreamingJob(flintIndexStateModel);
     }
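
Note: the alter and drop paths above now build the same three-field request by hand. Not part of this patch, but a possible follow-up: a small factory would keep the two call sites in sync. The class name and its location are hypothetical; the accessors are exactly the ones used in the two hunks above, and FlintIndexMetadata's package is assumed to match FlintIndexStateModel's.

```java
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

// Hypothetical helper, not part of this patch.
final class UnscheduleRequestFactory {
  private UnscheduleRequestFactory() {}

  // Builds the request used to disable the external refresh schedule of a Flint index.
  static AsyncQuerySchedulerRequest unscheduleRequest(
      FlintIndexStateModel stateModel, FlintIndexMetadata metadata) {
    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
    request.setAccountId(stateModel.getAccountId());
    request.setDataSource(stateModel.getDatasourceName());
    request.setJobId(metadata.getOpensearchIndexName());
    return request;
  }
}
```
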
diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java
index 8ac499081e..cb93d68ed1 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java
@@ -40,9 +40,11 @@ public interface AsyncQueryScheduler {
    * Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
    * re-enabling of the job in the future
    *
-   * @param jobId The unique identifier of the job to unschedule
+   * @param asyncQuerySchedulerRequest The request identifying the job to unschedule
+   * @throws IllegalArgumentException if the request does not carry a job id
+   * @throws RuntimeException if there's an error during the unschedule process
    */
-  void unscheduleJob(String jobId);
+  void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
 
   /**
    * Removes a job completely from the scheduler. This method permanently deletes the job and all
@@ -51,7 +53,9 @@ public interface AsyncQueryScheduler {
    * Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
    * created jobs - Freeing up resources by deleting unused job configurations
    *
-   * @param jobId The unique identifier of the job to remove
+   * @param asyncQuerySchedulerRequest The request identifying the job to remove
+   * @throws IllegalArgumentException if the job to be removed doesn't exist
+   * @throws RuntimeException if there's an error during the remove process
    */
-  void removeJob(String jobId);
+  void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
 }
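
Note: for callers, the visible change in this interface is that both methods now take the request object instead of a bare job id. A minimal caller-side sketch, assuming only the setters already used elsewhere in this patch (the surrounding class and its wiring are hypothetical):

```java
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

// Hypothetical caller showing the unschedule-then-remove sequence against the new signatures.
class SchedulerCaller {
  private final AsyncQueryScheduler scheduler;

  SchedulerCaller(AsyncQueryScheduler scheduler) {
    this.scheduler = scheduler;
  }

  void disableAndDelete(String accountId, String dataSource, String jobId) {
    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
    request.setAccountId(accountId);
    request.setDataSource(dataSource);
    request.setJobId(jobId);

    // Disable first: the job document is kept and can be re-enabled later.
    scheduler.unscheduleJob(request);

    // Then delete the job document permanently once it is no longer needed.
    scheduler.removeJob(request);
  }
}
```
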
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
index 9ebde4fe83..7f49a50877 100644
--- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
@@ -8,6 +8,7 @@
 import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
@@ -87,18 +88,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
 
   /** Unschedules a job by marking it as disabled and updating its last update time. */
   @Override
-  public void unscheduleJob(String jobId) {
-    ScheduledAsyncQueryJobRequest request =
-        ScheduledAsyncQueryJobRequest.builder()
-            .jobId(jobId)
-            .enabled(false)
-            .lastUpdateTime(Instant.now())
-            .build();
+  public void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
+    String jobId = asyncQuerySchedulerRequest.getJobId();
+    if (Strings.isNullOrEmpty(jobId)) {
+      throw new IllegalArgumentException("JobId cannot be null or empty");
+    }
     try {
-      updateJob(request);
-      LOG.info("Unscheduled job for jobId: {}", jobId);
+      asyncQuerySchedulerRequest.setEnabled(false);
+      asyncQuerySchedulerRequest.setLastUpdateTime(Instant.now());
+      updateJob(asyncQuerySchedulerRequest);
+      LOG.info("Unscheduled job for jobId: {}", asyncQuerySchedulerRequest);
     } catch (IllegalStateException | DocumentMissingException e) {
-      LOG.error("Failed to unschedule job: {}", jobId, e);
+      LOG.error("Failed to unschedule job: {}", asyncQuerySchedulerRequest, e);
     }
   }
 
@@ -134,8 +135,12 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
 
   /** Removes a job by deleting its document from the index. */
   @Override
-  public void removeJob(String jobId) {
+  public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
     assertIndexExists();
+    String jobId = asyncQuerySchedulerRequest.getJobId();
+    if (Strings.isNullOrEmpty(jobId)) {
+      throw new IllegalArgumentException("JobId cannot be null or empty");
+    }
     DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
     deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
     ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
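
Note: AsyncQuerySchedulerRequest itself is not part of this diff. Purely for orientation, the shape implied by the accessors exercised above is roughly the following; the real class may hold more fields and may generate these methods via Lombok, so treat this as an assumption rather than the actual definition.

```java
package org.opensearch.sql.spark.scheduler.model;

import java.time.Instant;

// Sketch of the fields this patch touches; not the real source file.
public class AsyncQuerySchedulerRequest {
  private String accountId;
  private String dataSource;
  private String jobId;
  private boolean enabled;
  private Instant lastUpdateTime;

  public String getJobId() {
    return jobId;
  }

  public void setAccountId(String accountId) {
    this.accountId = accountId;
  }

  public void setDataSource(String dataSource) {
    this.dataSource = dataSource;
  }

  public void setJobId(String jobId) {
    this.jobId = jobId;
  }

  public void setEnabled(boolean enabled) {
    this.enabled = enabled;
  }

  public void setLastUpdateTime(Instant lastUpdateTime) {
    this.lastUpdateTime = lastUpdateTime;
  }
}
```
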
diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java
index a4a6eb6471..c938610fdb 100644
--- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java
+++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java
@@ -16,7 +16,6 @@
 import static org.mockito.Mockito.when;
 import static org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME;
 
-import java.io.IOException;
 import java.time.Instant;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -43,6 +42,7 @@
 import org.opensearch.index.engine.DocumentMissingException;
 import org.opensearch.index.engine.VersionConflictEngineException;
 import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
+import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
 
 public class OpenSearchAsyncQuerySchedulerTest {
@@ -162,7 +162,7 @@ public void testScheduleJobWithExceptions() {
   }
 
   @Test
-  public void testUnscheduleJob() throws IOException {
+  public void testUnscheduleJob() {
     when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);
 
     when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse);
@@ -170,7 +170,9 @@ public void testUnscheduleJob() throws IOException {
 
     when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);
 
-    scheduler.unscheduleJob(TEST_JOB_ID);
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(TEST_JOB_ID);
+    scheduler.unscheduleJob(request);
 
     ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
     verify(client).update(captor.capture());
@@ -183,7 +185,7 @@ public void testUnscheduleJob() throws IOException {
     captor = ArgumentCaptor.forClass(UpdateRequest.class);
     when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP);
 
-    scheduler.unscheduleJob(TEST_JOB_ID);
+    scheduler.unscheduleJob(request);
 
     verify(client, times(2)).update(captor.capture());
     capturedRequest = captor.getValue();
@@ -191,18 +193,32 @@ public void testUnscheduleJob() throws IOException {
     assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
   }
 
+  @Test
+  public void testUnscheduleJobInvalidJobId() {
+    when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);
+
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(null);
+
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> scheduler.unscheduleJob(request));
+    assertEquals("JobId cannot be null or empty", exception.getMessage());
+  }
+
   @Test
   public void testUnscheduleJobWithIndexNotFound() {
     when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false);
 
-    scheduler.unscheduleJob(TEST_JOB_ID);
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(TEST_JOB_ID);
+    scheduler.unscheduleJob(request);
 
     // Verify that no update operation was performed
     verify(client, never()).update(any(UpdateRequest.class));
   }
 
   @Test
-  public void testUpdateJob() throws IOException {
+  public void testUpdateJob() {
     ScheduledAsyncQueryJobRequest request =
         ScheduledAsyncQueryJobRequest.builder()
             .jobId(TEST_JOB_ID)
@@ -290,7 +306,9 @@ public void testRemoveJob() {
 
     when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);
 
-    scheduler.removeJob(TEST_JOB_ID);
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(TEST_JOB_ID);
+    scheduler.removeJob(request);
 
     ArgumentCaptor<DeleteRequest> captor = ArgumentCaptor.forClass(DeleteRequest.class);
     verify(client).delete(captor.capture());
@@ -304,7 +322,21 @@ public void testRemoveJobWithIndexNotFound() {
     when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false);
 
-    assertThrows(IllegalStateException.class, () -> scheduler.removeJob(TEST_JOB_ID));
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(TEST_JOB_ID);
+    assertThrows(IllegalStateException.class, () -> scheduler.removeJob(request));
+  }
+
+  @Test
+  public void testRemoveJobInvalidJobId() {
+    when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);
+
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId("");
+
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> scheduler.removeJob(request));
+    assertEquals("JobId cannot be null or empty", exception.getMessage());
   }
 
   @Test
@@ -397,11 +429,13 @@ public void testRemoveJobNotFound() {
 
     when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);
 
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(TEST_JOB_ID);
     IllegalArgumentException exception =
         assertThrows(
             IllegalArgumentException.class,
             () -> {
-              scheduler.removeJob(TEST_JOB_ID);
+              scheduler.removeJob(request);
             });
 
     assertEquals("Job : testJob doesn't exist", exception.getMessage());
@@ -413,7 +447,9 @@ public void testRemoveJobWithExceptions() {
 
     when(client.delete(any(DeleteRequest.class))).thenThrow(new RuntimeException("Test exception"));
 
-    assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID));
+    AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
+    request.setJobId(TEST_JOB_ID);
+    assertThrows(RuntimeException.class, () -> scheduler.removeJob(request));
 
     DeleteResponse deleteResponse = mock(DeleteResponse.class);
     when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);
@@ -420,7 +456,7 @@ public void testRemoveJobWithExceptions() {
     when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse);
     when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP);
 
     RuntimeException runtimeException =
-        Assertions.assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID));
+        Assertions.assertThrows(RuntimeException.class, () -> scheduler.removeJob(request));
     Assertions.assertEquals("Remove job failed with result : noop", runtimeException.getMessage());
   }