From c85ed204258a4c5b336e8652208d89be4f36384e Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 4 Sep 2024 09:22:16 -0700 Subject: [PATCH] Resolve comments --- .../dispatcher/model/FlintIndexOptions.java | 2 +- .../flint/operation/FlintIndexOpAlter.java | 2 +- .../flint/operation/FlintIndexOpDrop.java | 2 +- .../flint/operation/FlintIndexOpVacuum.java | 2 +- .../spark/scheduler/AsyncQueryScheduler.java | 46 ++++++++++- .../asyncquery/AsyncQueryCoreIntegTest.java | 4 - .../operation/FlintIndexOpVacuumTest.java | 3 - .../parser/IntervalScheduleParserTest.java | 81 ++++++------------- 8 files changed, 71 insertions(+), 71 deletions(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java index 6c7cc7c5fb..d5ee82a42c 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java @@ -34,7 +34,7 @@ public boolean autoRefresh() { return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false")); } - public boolean isExternalScheduler() { + public boolean useSparkScheduler() { // Default is false, which means using internal scheduler to refresh the index. return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java index de34803823..5218cb5e0f 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java @@ -61,7 +61,7 @@ void runOp( "Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); this.flintIndexMetadataService.updateIndexToManualRefresh( flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext); - if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { + if (flintIndexMetadata.getFlintIndexOptions().useSparkScheduler()) { asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName()); } else { cancelStreamingJob(flintIndexStateModel); diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java index 3fa5423c10..f6e4013654 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java @@ -53,7 +53,7 @@ void runOp( LOG.debug( "Performing drop index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); - if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { + if (flintIndexMetadata.getFlintIndexOptions().useSparkScheduler()) { asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName()); } else { cancelStreamingJob(flintIndexStateModel); diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index 324ddb5720..56ead8a6b9 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -52,7 +52,7 @@ public void runOp( FlintIndexStateModel flintIndex, AsyncQueryRequestContext asyncQueryRequestContext) { LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); - if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { + if (flintIndexMetadata.getFlintIndexOptions().useSparkScheduler()) { asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName()); } flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName()); diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java index 0bcec34b2f..8ac499081e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java @@ -5,15 +5,53 @@ /** Scheduler interface for scheduling asynchronous query jobs. */ public interface AsyncQueryScheduler { - /** Schedules a new job. */ + /** + * Schedules a new job in the system. This method creates a new job entry based on the provided + * request parameters. + * + *

Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh + * task + * + * @param asyncQuerySchedulerRequest The request containing job configuration details + * @throws IllegalArgumentException if a job with the same name already exists + * @throws RuntimeException if there's an error during job creation + */ void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest); - /** Updates an existing job with new parameters. */ + /** + * Updates an existing job with new parameters. This method modifies the configuration of an + * already scheduled job. + * + *

Use cases: - Changing the schedule of an existing job - Modifying query parameters of a + * scheduled job - Updating resource allocations for a job + * + * @param asyncQuerySchedulerRequest The request containing updated job configuration + * @throws IllegalArgumentException if the job to be updated doesn't exist + * @throws RuntimeException if there's an error during the update process + */ void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest); - /** Unschedules a job by marking it as disabled and updating its last update time. */ + /** + * Unschedules a job by marking it as disabled and updating its last update time. This method is + * used when you want to temporarily stop a job from running but keep its configuration and + * history in the system. + * + *

Use cases: - Pausing a job that's causing issues without losing its configuration - + * Temporarily disabling a job during maintenance or high-load periods - Allowing for easy + * re-enabling of the job in the future + * + * @param jobId The unique identifier of the job to unschedule + */ void unscheduleJob(String jobId); - /** Removes a job. */ + /** + * Removes a job completely from the scheduler. This method permanently deletes the job and all + * its associated data from the system. + * + *

Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously + * created jobs - Freeing up resources by deleting unused job configurations + * + * @param jobId The unique identifier of the job to remove + */ void removeJob(String jobId); } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index ff5cc8df04..49ea38c2dc 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -229,7 +229,6 @@ public void createDropIndexQueryWithScheduler() { verifyCreateIndexDMLResultCalled(); verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); - // Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler verify(asyncQueryScheduler).unscheduleJob(indexName); } @@ -274,12 +273,10 @@ public void createVacuumIndexQueryWithScheduler() { assertNull(response.getSessionId()); verifyGetQueryIdCalled(); - // Verifying that the index is deleted verify(flintIndexClient).deleteIndex(indexName); verifyCreateIndexDMLResultCalled(); verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); - // Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler verify(asyncQueryScheduler).removeJob(indexName); } @@ -342,7 +339,6 @@ public void createAlterIndexQueryWithScheduler() { FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue(); assertFalse(flintIndexOptions.autoRefresh()); - // Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler verify(asyncQueryScheduler).unscheduleJob(indexName); verifyCreateIndexDMLResultCalled(); diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java index e736b0e93a..08f8efd488 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java @@ -63,7 +63,6 @@ public void setUp() { asyncQueryScheduler); } - // Helper method to create FlintIndexMetadata with latest ID private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() { return FlintIndexMetadata.builder() .latestId(LATEST_ID) @@ -72,7 +71,6 @@ private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() { .build(); } - // Helper method to create FlintIndexMetadata without latest ID private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() { return FlintIndexMetadata.builder() .opensearchIndexName(INDEX_NAME) @@ -80,7 +78,6 @@ private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() { .build(); } - // Helper method to create FlintIndexMetadata with external scheduler private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() { FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external"); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java index 98150365eb..b119c345b9 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java @@ -27,28 +27,14 @@ public void setup() { @Test public void testParseValidScheduleString() { - String scheduleStr = "5 minutes"; - Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 5, ChronoUnit.MINUTES), schedule); + verifyParseSchedule(5, "5 minutes"); } @Test public void testParseValidScheduleStringWithDifferentUnits() { - String scheduleStr = "2 hours"; - Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 120, ChronoUnit.MINUTES), schedule); - - scheduleStr = "1 day"; - schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 1440, ChronoUnit.MINUTES), schedule); - - scheduleStr = "3 weeks"; - schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 30240, ChronoUnit.MINUTES), schedule); + verifyParseSchedule(120, "2 hours"); + verifyParseSchedule(1440, "1 day"); + verifyParseSchedule(30240, "3 weeks"); } @Test @@ -61,7 +47,6 @@ public void testParseNullSchedule() { public void testParseScheduleObject() { IntervalSchedule expectedSchedule = new IntervalSchedule(startTime, 10, ChronoUnit.MINUTES); Schedule schedule = IntervalScheduleParser.parse(expectedSchedule, startTime); - assertEquals(expectedSchedule, schedule); } @@ -79,21 +64,15 @@ public void testParseInvalidScheduleString() { @Test public void testParseUnsupportedUnits() { - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> IntervalScheduleParser.parse("1 year", startTime), - "Expected IllegalArgumentException but no exception was thrown"); - - assertEquals("Years cannot be converted to minutes accurately.", exception.getMessage()); - - exception = - assertThrows( - IllegalArgumentException.class, - () -> IntervalScheduleParser.parse("1 month", startTime), - "Expected IllegalArgumentException but no exception was thrown"); - - assertEquals("Months cannot be converted to minutes accurately.", exception.getMessage()); + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.parse("1 year", startTime), + "Expected IllegalArgumentException but no exception was thrown"); + + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.parse("1 month", startTime), + "Expected IllegalArgumentException but no exception was thrown"); } @Test @@ -110,44 +89,34 @@ public void testParseNonStringSchedule() { @Test public void testParseScheduleWithNanoseconds() { - String scheduleStr = "60000000000 nanoseconds"; // Equivalent to 1 minute - Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule); + verifyParseSchedule(1, "60000000000 nanoseconds"); } @Test public void testParseScheduleWithMilliseconds() { - String scheduleStr = "60000 milliseconds"; // Equivalent to 1 minute - Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule); + verifyParseSchedule(1, "60000 milliseconds"); } @Test public void testParseScheduleWithMicroseconds() { - String scheduleStr = "60000000 microseconds"; // Equivalent to 1 minute - Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); - - assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule); + verifyParseSchedule(1, "60000000 microseconds"); } @Test public void testUnsupportedTimeUnit() { - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"), - "Expected IllegalArgumentException but no exception was thrown"); - - assertEquals("Unsupported time unit: unsupportedunit", exception.getMessage()); + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"), + "Expected IllegalArgumentException but no exception was thrown"); } @Test public void testParseScheduleWithSeconds() { - String scheduleStr = "120 seconds"; // Equivalent to 2 minutes - Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); + verifyParseSchedule(2, "120 seconds"); + } - assertEquals(new IntervalSchedule(startTime, 2, ChronoUnit.MINUTES), schedule); + private void verifyParseSchedule(int expectedMinutes, String scheduleStr) { + Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); + assertEquals(new IntervalSchedule(startTime, expectedMinutes, ChronoUnit.MINUTES), schedule); } }