Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 12, 2024
1 parent a37fc16 commit ac87b5d
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ void runOp(
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setAccountId(flintIndexStateModel.getAccountId());
request.setDataSource(flintIndexStateModel.getDatasourceName());
request.setJobId(flintIndexMetadata.getOpensearchIndexName());
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder()
.accountId(flintIndexStateModel.getAccountId())
.dataSource(flintIndexStateModel.getDatasourceName())
.jobId(flintIndexMetadata.getOpensearchIndexName())
.build();
asyncQueryScheduler.unscheduleJob(request);
} else {
cancelStreamingJob(flintIndexStateModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ void runOp(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setAccountId(flintIndexStateModel.getAccountId());
request.setDataSource(flintIndexStateModel.getDatasourceName());
request.setJobId(flintIndexMetadata.getOpensearchIndexName());
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder()
.accountId(flintIndexStateModel.getAccountId())
.dataSource(flintIndexStateModel.getDatasourceName())
.jobId(flintIndexMetadata.getOpensearchIndexName())
.build();
asyncQueryScheduler.unscheduleJob(request);
} else {
cancelStreamingJob(flintIndexStateModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public interface AsyncQueryScheduler {
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @param asyncQuerySchedulerRequest The request containing the job configuration to unschedule.
* At minimum, it must include the jobId.
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
* @throws RuntimeException if there's an error during the unschedule process
*/
Expand All @@ -53,7 +54,8 @@ public interface AsyncQueryScheduler {
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param asyncQuerySchedulerRequest The request to delete the job configuration
* @param asyncQuerySchedulerRequest The request containing the job configuration to unschedule.
* At minimum, it must include the jobId.
* @throws IllegalArgumentException if the job to be removed doesn't exist
* @throws RuntimeException if there's an error during the remove process
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/** Represents a job request for a scheduled task. */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest
public static final String ENABLED_FIELD = "enabled";
private final Schedule schedule;

@Builder
@Builder(builderMethodName = "scheduledAsyncQueryJobRequestBuilder")
public ScheduledAsyncQueryJobRequest(
String accountId,
String jobId,
Expand Down Expand Up @@ -139,7 +139,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
AsyncQuerySchedulerRequest request) {
Instant updateTime =
request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
return ScheduledAsyncQueryJobRequest.builder()
return ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId(request.getAccountId())
.jobId(request.getJobId())
.dataSource(request.getDataSource())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti
public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder =
ScheduledAsyncQueryJobRequest.builder();
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testScheduleJob() {
when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED);

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand All @@ -116,7 +116,7 @@ public void testScheduleJobWithExistingJob() {
.thenReturn(Boolean.TRUE);

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testScheduleJobWithExceptions() {
when(client.index(any(IndexRequest.class))).thenThrow(new RuntimeException("Test exception"));

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand All @@ -170,8 +170,8 @@ public void testUnscheduleJob() {

when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(TEST_JOB_ID);
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build();
scheduler.unscheduleJob(request);

ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
Expand All @@ -197,8 +197,7 @@ public void testUnscheduleJob() {
public void testUnscheduleJobInvalidJobId() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(null);
AsyncQuerySchedulerRequest request = AsyncQuerySchedulerRequest.builder().build();

IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> scheduler.unscheduleJob(request));
Expand All @@ -209,8 +208,8 @@ public void testUnscheduleJobInvalidJobId() {
public void testUnscheduleJobWithIndexNotFound() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(TEST_JOB_ID);
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build();
scheduler.unscheduleJob(request);

// Verify that no update operation was performed
Expand All @@ -220,7 +219,7 @@ public void testUnscheduleJobWithIndexNotFound() {
@Test
public void testUpdateJob() {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand All @@ -245,7 +244,7 @@ public void testUpdateJob() {
@Test
public void testUpdateJobWithIndexNotFound() {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand All @@ -258,7 +257,7 @@ public void testUpdateJobWithIndexNotFound() {
@Test
public void testUpdateJobWithExceptions() {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand Down Expand Up @@ -306,8 +305,8 @@ public void testRemoveJob() {

when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(TEST_JOB_ID);
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build();
scheduler.removeJob(request);

ArgumentCaptor<DeleteRequest> captor = ArgumentCaptor.forClass(DeleteRequest.class);
Expand All @@ -322,17 +321,16 @@ public void testRemoveJob() {
public void testRemoveJobWithIndexNotFound() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(TEST_JOB_ID);
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build();
assertThrows(IllegalStateException.class, () -> scheduler.removeJob(request));
}

@Test
public void testRemoveJobInvalidJobId() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId("");
AsyncQuerySchedulerRequest request = AsyncQuerySchedulerRequest.builder().jobId("").build();

IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> scheduler.removeJob(request));
Expand Down Expand Up @@ -383,7 +381,7 @@ public void testCreateAsyncQuerySchedulerIndexFailure() {
.thenReturn(new CreateIndexResponse(false, false, SCHEDULER_INDEX_NAME));

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand All @@ -399,7 +397,7 @@ public void testCreateAsyncQuerySchedulerIndexFailure() {
@Test
public void testUpdateJobNotFound() {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId(TEST_JOB_ID)
.lastUpdateTime(Instant.now())
.build();
Expand Down Expand Up @@ -429,8 +427,8 @@ public void testRemoveJobNotFound() {

when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(TEST_JOB_ID);
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build();
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
Expand All @@ -447,8 +445,8 @@ public void testRemoveJobWithExceptions() {

when(client.delete(any(DeleteRequest.class))).thenThrow(new RuntimeException("Test exception"));

AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setJobId(TEST_JOB_ID);
AsyncQuerySchedulerRequest request =
AsyncQuerySchedulerRequest.builder().jobId(TEST_JOB_ID).build();
assertThrows(RuntimeException.class, () -> scheduler.removeJob(request));

DeleteResponse deleteResponse = mock(DeleteResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId("testJob")
.lastUpdateTime(Instant.now())
.lockDurationSeconds(10L)
Expand Down Expand Up @@ -123,7 +123,7 @@ public void testDoRefreshThrowsException() {
spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId("testJob")
.lastUpdateTime(Instant.now())
.lockDurationSeconds(10L)
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testDoRefreshThrowsException() {
@Test
public void testRunJobWithUninitializedServices() {
ScheduledAsyncQueryJobRequest jobParameter =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.jobId("testJob")
.lastUpdateTime(Instant.now())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void testBuilderAndGetterMethods() {
IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES);

ScheduledAsyncQueryJobRequest jobRequest =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId("testAccount")
.jobId("testJob")
.dataSource("testDataSource")
Expand Down Expand Up @@ -62,7 +62,7 @@ public void testToXContent() throws IOException {
IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES);

ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId("testAccount")
.jobId("testJob")
.dataSource("testDataSource")
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testEqualsAndHashCode() {
IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES);

ScheduledAsyncQueryJobRequest request1 =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId("testAccount")
.jobId("testJob")
.dataSource("testDataSource")
Expand All @@ -172,7 +172,7 @@ public void testEqualsAndHashCode() {
assertTrue(toString.contains("jitter=0.1"));

ScheduledAsyncQueryJobRequest request2 =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId("testAccount")
.jobId("testJob")
.dataSource("testDataSource")
Expand All @@ -190,7 +190,7 @@ public void testEqualsAndHashCode() {
assertEquals(request1.hashCode(), request2.hashCode());

ScheduledAsyncQueryJobRequest request3 =
ScheduledAsyncQueryJobRequest.builder()
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId("differentAccount")
.jobId("testJob")
.dataSource("testDataSource")
Expand Down

0 comments on commit ac87b5d

Please sign in to comment.