diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 3cc9bdac..3ce81671 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -4,6 +4,7 @@ */ package org.opensearch.commons.alerting +import org.opensearch.action.search.SearchResponse import org.opensearch.client.node.NodeClient import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse @@ -28,6 +29,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SearchMonitorRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.notifications.action.BaseResponse import org.opensearch.commons.utils.recreateObject @@ -314,6 +316,27 @@ object AlertingPluginInterface { ) } + /** + * Search Monitors interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun searchMonitors( + client: NodeClient, + request: SearchMonitorRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.SEARCH_MONITORS_ACTION_TYPE, + request, + // we do not use the wrapActionListener in this case since there is no need + // to recreate any object or specially handle onResponse / onFailure. It is + // simply returning a SearchResponse. + listener + ) + } + @Suppress("UNCHECKED_CAST") private fun wrapActionListener( listener: ActionListener, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index ac192985..f4e68d73 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionType +import org.opensearch.action.search.SearchResponse object AlertingActions { const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write" @@ -19,6 +20,7 @@ object AlertingActions { const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack" const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe" const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get" + const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search" @JvmField val INDEX_MONITOR_ACTION_TYPE = @@ -65,4 +67,8 @@ object AlertingActions { @JvmField val GET_MONITOR_ACTION_TYPE = ActionType(GET_MONITOR_ACTION_NAME, ::GetMonitorResponse) + + @JvmField + val SEARCH_MONITORS_ACTION_TYPE = + ActionType(SEARCH_MONITORS_ACTION_NAME, ::SearchResponse) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt new file mode 100644 index 00000000..003d3316 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.search.SearchRequest +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import java.io.IOException + +class SearchMonitorRequest : ActionRequest { + + val searchRequest: SearchRequest + + constructor( + searchRequest: SearchRequest + ) : super() { + this.searchRequest = searchRequest + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + searchRequest = SearchRequest(sin) + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + searchRequest.writeTo(out) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 553be177..3ce4f6bb 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -10,6 +10,7 @@ import org.mockito.Mockito import org.mockito.Mockito.mock import org.mockito.junit.jupiter.MockitoExtension import org.opensearch.action.ActionType +import org.opensearch.action.search.SearchResponse import org.opensearch.client.node.NodeClient import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest @@ -34,6 +35,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SearchMonitorRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs @@ -285,4 +287,18 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.getMonitor(client, request, listener) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + + @Test + fun searchMonitors() { + val request = mock(SearchMonitorRequest::class.java) + val response = mock(SearchResponse::class.java) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.searchMonitors(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt new file mode 100644 index 00000000..169814ea --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.search.SearchRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.unit.TimeValue +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.rest.OpenSearchRestTestCase +import java.util.concurrent.TimeUnit + +class SearchMonitorRequestTests : OpenSearchTestCase() { + + fun `test search monitors request`() { + val searchSourceBuilder = SearchSourceBuilder().from(0).size(100).timeout(TimeValue(60, TimeUnit.SECONDS)) + val searchRequest = SearchRequest().indices(OpenSearchRestTestCase.randomAlphaOfLength(10)).source(searchSourceBuilder) + val searchMonitorRequest = SearchMonitorRequest(searchRequest) + assertNotNull(searchMonitorRequest) + + val out = BytesStreamOutput() + searchMonitorRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = SearchMonitorRequest(sin) + + assertNotNull(newReq.searchRequest) + assertEquals(1, newReq.searchRequest.indices().size) + } +}