Skip to content

Commit

Permalink
delete query index only if put mappings throws an exception
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Oct 2, 2024
1 parent d8f47a0 commit 3533dd7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}

if (monitor.deleteQueryIndexInEveryRun == true &&
monitorCtx.docLevelMonitorQueries!!.docLevelQueryIndexExists(monitor.dataSources)
) {
val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources)
if (!ack) {
logger.error(
"Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! " +
"for monitor ${monitor.id}"
)
}
}
monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
Expand Down Expand Up @@ -389,6 +378,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults)
} catch (e: Exception) {
e.printStackTrace()
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
monitorMetadata,
updatedIndexName,
sourceIndexFieldLimit,
updatedProperties
updatedProperties,
indexTimeout
)

if (updateMappingResponse.isAcknowledged) {
Expand Down Expand Up @@ -487,7 +488,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
monitorMetadata: MonitorMetadata,
sourceIndex: String,
sourceIndexFieldLimit: Long,
updatedProperties: MutableMap<String, Any>
updatedProperties: MutableMap<String, Any>,
indexTimeout: TimeValue
): Pair<AcknowledgedResponse, String> {
var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id]
if (
Expand Down Expand Up @@ -551,9 +553,34 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
}
} else {
log.debug("unknown exception during PUT mapping on queryIndex: $targetQueryIndex")
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
throw AlertingException.wrap(unwrappedException)
// retry with deleting query index
if (monitor.deleteQueryIndexInEveryRun == true && docLevelQueryIndexExists(monitor.dataSources)) {
try {
log.info("unknown exception during PUT mapping on queryIndex: $targetQueryIndex, retrying with deletion of query index")
val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources)
if (!ack) {
log.error(
"Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! " +
"for monitor ${monitor.id}"
)
}
initDocLevelQueryIndex(monitor.dataSources)
indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
monitorMetadata,
indexTimeout = indexTimeout
)
} catch (e: Exception) {
log.debug("unknown exception during PUT mapping on queryIndex: $targetQueryIndex")
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
throw AlertingException.wrap(unwrappedException)
}
} else {
log.debug("unknown exception during PUT mapping on queryIndex: $targetQueryIndex")
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
throw AlertingException.wrap(unwrappedException)
}
}
}
// We did rollover, so try to apply mappings again on new targetQueryIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
"id", null)), trigger1Serialized)),
Map.of(),
new DataSources(),
true,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest(
Expand Down Expand Up @@ -155,7 +155,7 @@ public void onFailure(Exception e) {
List.of(),
Map.of(),
new DataSources(),
true,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest(
Expand Down Expand Up @@ -239,7 +239,7 @@ public void onFailure(Exception e) {
"id", null)), trigger1Serialized)),
Map.of(),
new DataSources(),
true,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(
Expand Down

0 comments on commit 3533dd7

Please sign in to comment.