Skip to content

Commit

Permalink
add support for dynamic monitor metadata updates in remote monitors
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Jun 25, 2024
1 parent 8801a63 commit dcf2fce
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
Expand All @@ -27,6 +28,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Instant
Expand Down Expand Up @@ -64,12 +66,27 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
logger.info(monitorMetadata.lastRunContext.toMutableMap().toString())
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
val updatedLastRunContext = lastRunContext.toMutableMap()

val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput
val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput
var shards: Set<String> = mutableSetOf()
var concreteIndices = listOf<String>()

// Resolve all passed indices to concrete indices
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
// cleanup old indices that are not monitored anymore from the same monitor
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}

try {
docLevelMonitorInput.indices.forEach { indexName ->
concreteIndices = IndexUtils.resolveAllIndices(
Expand All @@ -93,6 +110,39 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
}

concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}

val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(
indexName,
monitorCtx.clusterService!!.state()
)
) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
}

concreteIndices.forEach {
val shardCount = getShardsCount(monitorCtx.clusterService!!, it)
for (i in 0 until shardCount) {
Expand All @@ -111,7 +161,7 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
val docLevelMonitorFanOutResponses = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.doFanOut(
monitorCtx.clusterService!!,
monitor,
monitorMetadata,
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
executionId,
concreteIndices,
workflowRunContext,
Expand All @@ -120,12 +170,12 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
nodeMap,
nodeShardAssignments
)
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, lastRunContext)
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext)
val triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses)
val inputRunResults = buildInputRunResults(docLevelMonitorFanOutResponses)
if (!isTempMonitor) {
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = lastRunContext),
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}
Expand Down Expand Up @@ -216,17 +266,17 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
fanOutResponse.lastRunContexts.keys.forEach {
if (fanOutResponse.lastRunContexts.contains(indexName)) {
(fanOutResponse.lastRunContexts[indexName] as Map<String, Any>).forEach {

val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
val seq_no = it.value.toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
it.key != "shards_count" &&
it.key != "index" &&
seq_no != null &&
seq_no >= 0
) {
indexLastRunContext[it] = seq_no
indexLastRunContext[it.key] = seq_no
}
}
}
Expand Down Expand Up @@ -309,4 +359,29 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
return InputRunResults(listOf(inputRunResults), if (!errors.isEmpty()) AlertingException.merge(*errors.toTypedArray()) else null)
}

private fun createdRecently(
monitor: Monitor,
periodStart: Instant,
periodEnd: Instant,
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO
}
return updatedLastRunContext
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ public void onFailure(Exception e) {
);
};
} else {
String indices = restRequest.param("index", "index");
List<String> index = List.of(indices.split(","));
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput =
new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2);
BytesStreamOutput out2 = new BytesStreamOutput();
sampleRemoteDocLevelMonitorInput.writeTo(out2);
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();

DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList());
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);

Monitor remoteDocLevelMonitor = new Monitor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, Action
SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin);


((Map<String, Object>) lastRunContext.get(index)).put("0", 0);
if (lastRunContext.containsKey(index)) {
((Map<String, Object>) lastRunContext.get(index)).put("2", 0);
}
if (docLevelMonitorInput.getIndices().size() > 1 && lastRunContext.containsKey(docLevelMonitorInput.getIndices().get(1))) {
((Map<String, Object>) lastRunContext.get(docLevelMonitorInput.getIndices().get(1))).put("4", 0);
}
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
.source(Map.of(sampleRemoteDocLevelMonitorInput.getA(), remoteMonitorTrigger.getA())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -167,7 +168,112 @@ public void testSampleRemoteDocLevelMonitor() throws IOException, InterruptedExc
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 &&
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello"));
return found.get();
} catch (IOException ex) {
return false;
}
}, 10, TimeUnit.SECONDS);
Assert.assertTrue(found.get());
}

@SuppressWarnings("unchecked")
public void testSampleRemoteDocLevelMonitorWithDynamicMetadataUpdate() throws IOException, InterruptedException {
createIndex("index1", Settings.builder().put("number_of_shards", "7").build());
Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor",
Map.of("run_monitor", "doc_level", "index", "index1"), null);
Assert.assertEquals("Unable to create remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

Map<String, Object> responseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).map();
String monitorId = responseJson.get("_id").toString();

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

createIndex("index2", Settings.builder().put("number_of_shards", "7").build());
String updatedMonitor = String.format(Locale.ROOT, "{\"type\":\"monitor\",\"name\":\"remote_doc_level_monitor\",\"monitor_type\":\"remote_doc_level_monitor\"," +
"\"user\":{\"name\":\"\",\"backend_roles\":[],\"roles\":[],\"custom_attribute_names\":[],\"user_requested_tenant\":null},\"enabled\":true,\"schedule\":{\"period\":{\"interval\":5,\"unit\":\"MINUTES\"}}," +
"\"inputs\":[{\"remote_doc_level_monitor_input\":{\"size\":24,\"input\":\"BWhlbGxvCgEFd29ybGQBAAAAAQAAAAIA\",\"doc_level_input\":" +
"{\"doc_level_input\":{\"description\":\"description\",\"indices\":[%s],\"queries\":[]}}}}],\"" +
"triggers\":[{\"remote_monitor_trigger\":{\"id\":\"id\",\"name\":\"name\",\"severity\":\"1\"," +
"\"actions\":[{\"id\":\"id\",\"name\":\"name\",\"destination_id\":\"destinationId\",\"message_template\":{\"source\":\"Hello World\"," +
"\"lang\":\"mustache\"},\"throttle_enabled\":false,\"subject_template\":{\"source\":\"Hello World\",\"lang\":\"mustache\"}," +
"\"throttle\":{\"value\":60,\"unit\":\"MINUTES\"}}],\"size\":24,\"trigger\":\"BWhlbGxvCgEEdGVzdAM/gAAAAAAAAQAA\"}}]," +
"\"owner\":\"sample-remote-monitor-plugin\"}", "\"index1\", \"index2\"");
makeRequest(client(), "PUT", "/_plugins/_alerting/monitors/" + monitorId, Map.of(), new StringEntity(updatedMonitor, ContentType.APPLICATION_JSON));

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

makeRequest(client(), "DELETE", "/index1", Map.of(), null);

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

AtomicBoolean found = new AtomicBoolean(false);
OpenSearchRestTestCase.waitUntil(
() -> {
try {
Response searchResponse = makeRequest(client(), "POST", SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX + "/_search", Map.of(),
new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON));
Map<String, Object> searchResponseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello"));
return found.get();
} catch (IOException ex) {
return false;
}
}, 10, TimeUnit.SECONDS);
Assert.assertTrue(found.get());
}

@SuppressWarnings("unchecked")
public void testSampleRemoteDocLevelMonitorWithAlias() throws IOException, InterruptedException {
String indexAlias = "test_alias";
createIndex("index-000001", Settings.EMPTY);
makeRequest(client(), "POST", "_aliases", Map.of(),
new StringEntity(String.format(Locale.ROOT, "{\"actions\":[{\"add\":{\"index\":\"index-000001\",\"alias\":\"%s\",\"is_write_index\":true}}]}", indexAlias), ContentType.APPLICATION_JSON));
Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor", Map.of("run_monitor", "doc_level", "index", indexAlias), null);
Assert.assertEquals("Unable to create remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

Map<String, Object> responseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).map();
String monitorId = responseJson.get("_id").toString();

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

makeRequest(client(), "POST", String.format(Locale.ROOT, "%s/_rollover", indexAlias), Map.of(),
new StringEntity("", ContentType.APPLICATION_JSON));

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
AtomicBoolean found = new AtomicBoolean(false);
OpenSearchRestTestCase.waitUntil(
() -> {
try {
Response searchResponse = makeRequest(client(), "POST", SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX + "/_search", Map.of(),
new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON));
Map<String, Object> searchResponseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello"));
return found.get();
Expand Down

0 comments on commit dcf2fce

Please sign in to comment.