Skip to content

Commit

Permalink
Add more UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Sep 3, 2024
1 parent d2c316b commit 83d3687
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,31 @@ public void createDropIndexQuery() {
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
}

@Test
public void createDropIndexQueryWithScheduler() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);

String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExistsWithExternalScheduler(indexName);

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"DROP INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).unscheduleJob(indexName);
}

@Test
public void createVacuumIndexQuery() {
givenSparkExecutionEngineConfigIsSupplied();
Expand All @@ -230,6 +255,34 @@ public void createVacuumIndexQuery() {
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
}

@Test
public void createVacuumIndexQueryWithScheduler() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);

String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExistsWithExternalScheduler(indexName);

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();

// Verifying that the index is deleted
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).removeJob(indexName);
}

@Test
public void createAlterIndexQuery() {
givenSparkExecutionEngineConfigIsSupplied();
Expand Down Expand Up @@ -261,6 +314,41 @@ public void createAlterIndexQuery() {
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
}

@Test
public void createAlterIndexQueryWithScheduler() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);

String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExistsWithExternalScheduler(indexName);

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"ALTER INDEX index_name ON table_name WITH (auto_refresh = false)",
DATASOURCE_NAME,
LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();

verify(flintIndexMetadataService)
.updateIndexToManualRefresh(
eq(indexName), flintIndexOptionsArgumentCaptor.capture(), eq(asyncQueryRequestContext));

FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).unscheduleJob(indexName);

verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
}

@Test
public void createStreamingQuery() {
givenSparkExecutionEngineConfigIsSupplied();
Expand Down Expand Up @@ -510,7 +598,8 @@ private void givenSparkExecutionEngineConfigIsSupplied() {
.build());
}

private void givenFlintIndexMetadataExists(String indexName) {
private void givenFlintIndexMetadataExists(
String indexName, FlintIndexOptions flintIndexOptions) {
when(flintIndexMetadataService.getFlintIndexMetadata(indexName, asyncQueryRequestContext))
.thenReturn(
ImmutableMap.of(
Expand All @@ -519,10 +608,27 @@ private void givenFlintIndexMetadataExists(String indexName) {
.appId(APPLICATION_ID)
.jobId(JOB_ID)
.opensearchIndexName(indexName)
.flintIndexOptions(new FlintIndexOptions())
.flintIndexOptions(flintIndexOptions)
.build()));
}

// Overload method for default FlintIndexOptions usage
private void givenFlintIndexMetadataExists(String indexName) {
givenFlintIndexMetadataExists(indexName, new FlintIndexOptions());
}

// Method to set up FlintIndexMetadata with external scheduler
private void givenFlintIndexMetadataExistsWithExternalScheduler(String indexName) {
givenFlintIndexMetadataExists(indexName, createExternalSchedulerFlintIndexOptions());
}

// Helper method for creating FlintIndexOptions with external scheduler
private FlintIndexOptions createExternalSchedulerFlintIndexOptions() {
FlintIndexOptions options = new FlintIndexOptions();
options.setOption(FlintIndexOptions.SCHEDULER_MODE, "external");
return options;
}

private void givenValidDataSourceMetadataExist() {
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
DATASOURCE_NAME, asyncQueryRequestContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -32,17 +33,13 @@ class FlintIndexOpVacuumTest {
public static final String DATASOURCE_NAME = "DATASOURCE_NAME";
public static final String LATEST_ID = "LATEST_ID";
public static final String INDEX_NAME = "INDEX_NAME";

public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID =
FlintIndexMetadata.builder()
.latestId(LATEST_ID)
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(new FlintIndexOptions())
.build();
createFlintIndexMetadataWithLatestId();

public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID =
FlintIndexMetadata.builder()
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(new FlintIndexOptions())
.build();
createFlintIndexMetadataWithoutLatestId();

@Mock FlintIndexClient flintIndexClient;
@Mock FlintIndexStateModelService flintIndexStateModelService;
@Mock EMRServerlessClientFactory emrServerlessClientFactory;
Expand All @@ -66,6 +63,34 @@ public void setUp() {
asyncQueryScheduler);
}

// Helper method to create FlintIndexMetadata with latest ID
private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() {
return FlintIndexMetadata.builder()
.latestId(LATEST_ID)
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(new FlintIndexOptions())
.build();
}

// Helper method to create FlintIndexMetadata without latest ID
private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() {
return FlintIndexMetadata.builder()
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(new FlintIndexOptions())
.build();
}

// Helper method to create FlintIndexMetadata with external scheduler
private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() {
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external");

return FlintIndexMetadata.builder()
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(flintIndexOptions)
.build();
}

@Test
public void testApplyWithEmptyLatestId() {
flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITHOUT_LATEST_ID, asyncQueryRequestContext);
Expand Down Expand Up @@ -218,4 +243,22 @@ public void testApplyHappyPath() {
.deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext);
verify(flintIndexClient).deleteIndex(INDEX_NAME);
}

@Test
public void testRunOpWithExternalScheduler() {
FlintIndexMetadata flintIndexMetadata = createFlintIndexMetadataWithExternalScheduler();
flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext);

verify(asyncQueryScheduler).removeJob(INDEX_NAME);
verify(flintIndexClient).deleteIndex(INDEX_NAME);
}

@Test
public void testRunOpWithoutExternalScheduler() {
FlintIndexMetadata flintIndexMetadata = FLINT_INDEX_METADATA_WITHOUT_LATEST_ID;
flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext);

verify(asyncQueryScheduler, never()).removeJob(INDEX_NAME);
verify(flintIndexClient).deleteIndex(INDEX_NAME);
}
}

0 comments on commit 83d3687

Please sign in to comment.