Skip to content

Commit

Permalink
make flatten result index use detector name
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jan 29, 2025
1 parent 64cc3bc commit 8a9429d
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ADResultWriteRequest(
RequestPriority priority,
AnomalyResult result,
String resultIndex,
boolean flattenResultIndex
String flattenResultIndex
) {
super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected ADResultWriteRequest createResultWriteRequest(
RequestPriority priority,
AnomalyResult result,
String resultIndex,
boolean flattenResultIndex
String flattenResultIndex
) {
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void saveResult(AnomalyResult result, Config config) {
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -114,9 +113,8 @@ protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResult

if (shouldAddResult(indexingPressurePercent, result)) {
addResult(bulkRequest, result, resultIndex);
if (resultWriteRequest.getFlattenResultIndex()) {
String flattenedResultIndexAlias = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
addResult(bulkRequest, result, flattenedResultIndexAlias);
if (resultWriteRequest.getFlattenResultIndex() != null) {
addResult(bulkRequest, result, resultWriteRequest.getFlattenResultIndex());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexAlias()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ForecastResultWriteRequest(
RequestPriority priority,
ForecastResult result,
String resultIndex,
boolean flattenResultIndex
String flattenResultIndex
) {
super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected ForecastResultWriteRequest createResultWriteRequest(
RequestPriority priority,
ForecastResult result,
String resultIndex,
boolean flattenResultIndex
String flattenResultIndex
) {
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void saveResult(ForecastResult result, Config config) {
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexAlias()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1037,8 +1037,14 @@ public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionLis
choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
Expand All @@ -1049,14 +1055,6 @@ public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionLis
}
}

public String getFlattenedResultIndexAlias(String indexOrAliasName, String configId) {
return indexOrAliasName + "_flattened_" + configId.toLowerCase(Locale.ROOT);
}

public String getFlattenResultIndexIngestPipelineId(String configId) {
return "flatten_result_index_ingest_pipeline" + configId.toLowerCase(Locale.ROOT);
}

public <T> void validateCustomIndexForBackendJob(
String resultIndexOrAlias,
String securityLogId,
Expand Down Expand Up @@ -1304,7 +1302,7 @@ protected void rolloverAndDeleteHistoryIndex(
candidateResultAliases.forEach(config -> {
handleResultIndexRolloverAndDelete(config.getCustomResultIndexOrAlias(), config, resultIndex);
if (config.getFlattenResultIndexMapping()) {
String flattenedResultIndexAlias = getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), config.getId());
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();
handleResultIndexRolloverAndDelete(flattenedResultIndexAlias, config, resultIndex);
}
});
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,20 @@ public boolean getFlattenResultIndexMapping() {
return flattenResultIndexMapping != null ? flattenResultIndexMapping : false;
}

public String getFlattenResultIndexAlias() {
if (getFlattenResultIndexMapping()) {
return (getCustomResultIndexOrAlias() + "_flattened_" + getName()).toLowerCase(Locale.ROOT);
}
return null;
}

public String getFlattenResultIndexIngestPipelineName() {
if (getFlattenResultIndexMapping()) {
return ("flatten_result_index_ingest_pipeline_" + getName()).toLowerCase(Locale.ROOT);
}
return null;
}

public Instant getLastBreakingUIChangeTime() {
return lastUIBreakingChangeTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ public abstract class ResultWriteRequest<ResultType extends IndexableResult> ext
private final ResultType result;
// If resultIndex is null, result will be stored in default result index.
private final String resultIndex;
private final boolean flattenResultIndex;
private final String flattenResultIndex;

public ResultWriteRequest(
long expirationEpochMs,
String configId,
RequestPriority priority,
ResultType result,
String resultIndex,
boolean flattenResultIndex
String flattenResultIndex
) {
super(expirationEpochMs, configId, priority);
this.result = result;
Expand All @@ -41,14 +41,14 @@ public ResultWriteRequest(
public ResultWriteRequest(StreamInput in, Reader<ResultType> resultReader) throws IOException {
this.result = resultReader.read(in);
this.resultIndex = in.readOptionalString();
this.flattenResultIndex = in.readBoolean();
this.flattenResultIndex = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
result.writeTo(out);
out.writeOptionalString(resultIndex);
out.writeBoolean(flattenResultIndex);
out.writeOptionalString(flattenResultIndex);
}

public ResultType getResult() {
Expand All @@ -59,7 +59,7 @@ public String getResultIndex() {
return resultIndex;
}

public boolean getFlattenResultIndex() {
public String getFlattenResultIndex() {
return flattenResultIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private ActionListener<Optional<? extends Config>> onGetConfig(
resultToRetry.isHighPriority() ? RequestPriority.HIGH : RequestPriority.MEDIUM,
resultToRetry,
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexAlias()
)
);

Expand All @@ -218,6 +218,6 @@ protected abstract ResultWriteRequestType createResultWriteRequest(
RequestPriority priority,
ResultType result,
String resultIndex,
boolean flattenResultIndex
String flattenResultIndex
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,16 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun)) {
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
String flattenedResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();

timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener
.wrap(initResponse -> setupIngestPipeline(configId, listener, createConfigResponse), listener::onFailure)
.wrap(
initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse),
listener::onFailure
)
);
} else {
listener.onResponse(createConfigResponse);
Expand All @@ -489,9 +491,13 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) {
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
}

protected void setupIngestPipeline(String configId, ActionListener<T> listener, T createConfigResponse) {
String flattenedResultIndexAlias = timeSeriesIndices.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);
protected void setupIngestPipeline(
String flattenedResultIndexAlias,
String configId,
ActionListener<T> listener,
T createConfigResponse
) {
String pipelineId = config.getFlattenResultIndexIngestPipelineName();

try {
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);
Expand Down Expand Up @@ -538,10 +544,13 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
return BytesReference.bytes(pipelineBuilder);
}

private UpdateSettingsRequest buildUpdateSettingsRequest(String defaultPipelineName, String configId) {
String flattenedResultIndex = timeSeriesIndices.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
private UpdateSettingsRequest buildUpdateSettingsRequest(
String flattenedResultIndexAlias,
String defaultPipelineName,
String configId
) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
updateSettingsRequest.indices(flattenedResultIndex);
updateSettingsRequest.indices(flattenedResultIndexAlias);

Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put("index.default_pipeline", defaultPipelineName);
Expand All @@ -558,7 +567,7 @@ protected void bindIngestPipelineWithFlattenedResultIndex(
ActionListener<T> listener,
T createConfigResponse
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(pipelineId, configId);
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
Expand Down Expand Up @@ -664,7 +673,11 @@ private void unbindIngestPipelineWithFlattenedResultIndex(
boolean indexingDryRun
) {
// The pipeline name _none specifies that the index does not have an ingest pipeline.
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest("_none", existingConfig.getId());
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(
existingConfig.getFlattenResultIndexAlias(),
"_none",
existingConfig.getId()
);
client
.admin()
.indices()
Expand All @@ -679,7 +692,7 @@ private void unbindIngestPipelineWithFlattenedResultIndex(
}

private void deleteIngestPipeline(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(existingConfig.getId());
String pipelineId = existingConfig.getFlattenResultIndexIngestPipelineName();

client
.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testRegular() {
RequestPriority.MEDIUM,
detectResult,
null,
false
null
);
request.add(resultWriteRequest);

Expand All @@ -131,7 +131,7 @@ public void testRegular() {
return null;
}).when(resultHandler).flush(any(), any());

resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, null));

// the request results one flush
verify(resultHandler, times(1)).flush(any(), any());
Expand All @@ -154,7 +154,7 @@ public void testSingleRetryRequest() throws IOException {
RequestPriority.MEDIUM,
detectResult,
null,
false
null
);
request.add(resultWriteRequest);

Expand All @@ -170,7 +170,7 @@ public void testSingleRetryRequest() throws IOException {
return null;
}).when(resultHandler).flush(any(), any());

resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, null));

// one flush from the original request; and one due to retry
verify(resultHandler, times(2)).flush(any(), any());
Expand All @@ -190,7 +190,7 @@ public void testRetryException() {
return null;
}).when(resultHandler).flush(any(), any());

resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, null));
// one flush from the original request; and one due to retry
verify(resultHandler, times(2)).flush(any(), any());
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchStatusException.class));
Expand All @@ -204,7 +204,7 @@ public void testOverloaded() {
return null;
}).when(resultHandler).flush(any(), any());

resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, null));
// one flush from the original request; and one due to retry
verify(resultHandler, times(1)).flush(any(), any());
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchRejectedExecutionException.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ public void testCreateAnomalyDetector_withFlattenedResultIndex() throws Exceptio
assertTrue("Incorrect version", version > 0);

// Ensure the flattened result index was created
String expectedFlattenedIndex = String
.format(Locale.ROOT, "opensearch-ad-plugin-result-test_flattened_%s", id.toLowerCase(Locale.ROOT));
String expectedFlattenedIndex = "opensearch-ad-plugin-result-test_flattened_detectorwithflattenresultindex";
assertTrue("Alias for flattened result index does not exist", aliasExists(expectedFlattenedIndex));

// Start detector
Expand Down Expand Up @@ -339,9 +338,8 @@ public void testUpdateAnomalyDetector_disableFlattenResultIndex_shouldDeletePipe
assertEquals("Create anomaly detector with flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
String expectedFlattenedIndex = String
.format(Locale.ROOT, "opensearch-ad-plugin-result-test_flattened_%s", id.toLowerCase(Locale.ROOT));
String expectedPipelineId = String.format(Locale.ROOT, "flatten_result_index_ingest_pipeline%s", id.toLowerCase(Locale.ROOT));
String expectedFlattenedIndex = "opensearch-ad-plugin-result-test_flattened_detectorwithflattenresultindex";
String expectedPipelineId = "flatten_result_index_ingest_pipeline_detectorwithflattenresultindex";
String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId);
Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null);
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setUp() throws Exception {
RequestPriority.MEDIUM,
TestHelpers.randomAnomalyDetectResult(),
null,
false
null
);
request.add(resultWriteRequest);

Expand Down
Loading

0 comments on commit 8a9429d

Please sign in to comment.