From 6cd843d3a3e0b40af63a1075dd58ac20aa14ee48 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Mon, 20 Nov 2023 16:38:15 +0000 Subject: [PATCH 1/4] Add get monitor request/response Signed-off-by: Tyler Ohlsen --- .../alerting/action/AlertingActions.kt | 5 + .../alerting/action/GetMonitorRequest.kt | 56 ++++++++ .../alerting/action/GetMonitorResponse.kt | 133 ++++++++++++++++++ .../alerting/action/GetMonitorRequestTests.kt | 60 ++++++++ .../action/GetMonitorResponseTests.kt | 66 +++++++++ 5 files changed, 320 insertions(+) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index c2bae396..ac192985 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -18,6 +18,7 @@ object AlertingActions { const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack" const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack" const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe" + const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get" @JvmField val INDEX_MONITOR_ACTION_TYPE = @@ -60,4 +61,8 @@ object AlertingActions { @JvmField val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE = ActionType(ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse) + + @JvmField + val GET_MONITOR_ACTION_TYPE = + ActionType(GET_MONITOR_ACTION_NAME, ::GetMonitorResponse) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt new file mode 100644 index 00000000..6c1df281 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.rest.RestRequest +import org.opensearch.search.fetch.subphase.FetchSourceContext +import java.io.IOException + +class GetMonitorRequest : ActionRequest { + val monitorId: String + val version: Long + val method: RestRequest.Method + val srcContext: FetchSourceContext? + + constructor( + monitorId: String, + version: Long, + method: RestRequest.Method, + srcContext: FetchSourceContext? + ) : super() { + this.monitorId = monitorId + this.version = version + this.method = method + this.srcContext = srcContext + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // monitorId + sin.readLong(), // version + sin.readEnum(RestRequest.Method::class.java), // method + if (sin.readBoolean()) { + FetchSourceContext(sin) // srcContext + } else null + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorId) + out.writeLong(version) + out.writeEnum(method) + out.writeBoolean(srcContext != null) + srcContext?.writeTo(out) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt new file mode 100644 index 00000000..22549ec7 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM +import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentFragment +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class GetMonitorResponse : ActionResponse, ToXContentObject { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + var status: RestStatus + var monitor: Monitor? + var associatedWorkflows: List? + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + status: RestStatus, + monitor: Monitor?, + associatedCompositeMonitors: List?, + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.status = status + this.monitor = monitor + this.associatedWorkflows = associatedCompositeMonitors ?: emptyList() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), // id + version = sin.readLong(), // version + seqNo = sin.readLong(), // seqNo + primaryTerm = sin.readLong(), // primaryTerm + status = sin.readEnum(RestStatus::class.java), // RestStatus + monitor = if (sin.readBoolean()) { + Monitor.readFrom(sin) // monitor + } else null, + associatedCompositeMonitors = sin.readList((AssociatedWorkflow)::readFrom), + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + out.writeEnum(status) + if (monitor != null) { + out.writeBoolean(true) + monitor?.writeTo(out) + } else { + out.writeBoolean(false) + } + associatedWorkflows?.forEach { + it.writeTo(out) + } + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + if (monitor != null) { + builder.field("monitor", monitor) + } + if (associatedWorkflows != null) { + builder.field("associated_workflows", associatedWorkflows!!.toTypedArray()) + } + return builder.endObject() + } + + class AssociatedWorkflow : ToXContentFragment { + val id: String + val name: String + + constructor(id: String, name: String) { + this.id = id + this.name = name + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("id", id) + .field("name", name) + .endObject() + return builder + } + + fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString() + ) + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): AssociatedWorkflow { + return AssociatedWorkflow(sin) + } + } + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt new file mode 100644 index 00000000..c916b993 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.rest.RestRequest +import org.opensearch.search.fetch.subphase.FetchSourceContext +import org.opensearch.test.OpenSearchTestCase + +class GetMonitorRequestTests : OpenSearchTestCase() { + + fun `test get monitor request`() { + + val req = GetMonitorRequest("1234", 1L, RestRequest.Method.GET, FetchSourceContext.FETCH_SOURCE) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorRequest(sin) + assertEquals("1234", newReq.monitorId) + assertEquals(1L, newReq.version) + assertEquals(RestRequest.Method.GET, newReq.method) + assertEquals(FetchSourceContext.FETCH_SOURCE, newReq.srcContext) + } + + fun `test get monitor request without src context`() { + + val req = GetMonitorRequest("1234", 1L, RestRequest.Method.GET, null) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorRequest(sin) + assertEquals("1234", newReq.monitorId) + assertEquals(1L, newReq.version) + assertEquals(RestRequest.Method.GET, newReq.method) + assertEquals(null, newReq.srcContext) + } + + fun `test head monitor request`() { + + val req = GetMonitorRequest("1234", 2L, RestRequest.Method.HEAD, FetchSourceContext.FETCH_SOURCE) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorRequest(sin) + assertEquals("1234", newReq.monitorId) + assertEquals(2L, newReq.version) + assertEquals(RestRequest.Method.HEAD, newReq.method) + assertEquals(FetchSourceContext.FETCH_SOURCE, newReq.srcContext) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt new file mode 100644 index 00000000..d27ca6ee --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.commons.alerting.model.CronSchedule +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.randomUser +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.rest.RestStatus +import org.opensearch.test.OpenSearchTestCase +import java.time.Instant +import java.time.ZoneId + +class GetMonitorResponseTests : OpenSearchTestCase() { + + fun `test get monitor response`() { + val req = GetMonitorResponse("1234", 1L, 2L, 0L, RestStatus.OK, null, null) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorResponse(sin) + assertEquals("1234", newReq.id) + assertEquals(1L, newReq.version) + assertEquals(RestStatus.OK, newReq.status) + assertEquals(null, newReq.monitor) + } + + fun `test get monitor response with monitor`() { + val cronExpression = "31 * * * *" // Run at minute 31. + val testInstance = Instant.ofEpochSecond(1538164858L) + + val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) + val monitor = Monitor( + id = "123", + version = 0L, + name = "test-monitor", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + triggers = mutableListOf(), + uiMetadata = mutableMapOf() + ) + val req = GetMonitorResponse("1234", 1L, 2L, 0L, RestStatus.OK, monitor, null) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorResponse(sin) + assertEquals("1234", newReq.id) + assertEquals(1L, newReq.version) + assertEquals(RestStatus.OK, newReq.status) + assertNotNull(newReq.monitor) + } +} From cc4b32d6ef74894f266b123f912aac89b3e63f5e Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Mon, 20 Nov 2023 17:47:59 +0000 Subject: [PATCH 2/4] Remove status from response; add to interface Signed-off-by: Tyler Ohlsen --- .../alerting/AlertingPluginInterface.kt | 26 +++++++++++++++++++ .../alerting/action/GetMonitorResponse.kt | 11 ++------ .../action/GetMonitorResponseTests.kt | 7 ++--- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 5949582b..3cc9bdac 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -17,6 +17,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetMonitorRequest +import org.opensearch.commons.alerting.action.GetMonitorResponse import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse import org.opensearch.commons.alerting.action.GetWorkflowRequest @@ -288,6 +290,30 @@ object AlertingPluginInterface { ) } + /** + * Get Monitor interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun getMonitor( + client: NodeClient, + request: GetMonitorRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.GET_MONITOR_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + GetMonitorResponse( + it + ) + } + } + ) + } + @Suppress("UNCHECKED_CAST") private fun wrapActionListener( listener: ActionListener, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt index 22549ec7..49903853 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt @@ -10,22 +10,19 @@ import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION -import org.opensearch.core.action.ActionResponse +import org.opensearch.commons.notifications.action.BaseResponse import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentFragment -import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.core.xcontent.XContentBuilder import java.io.IOException -class GetMonitorResponse : ActionResponse, ToXContentObject { +class GetMonitorResponse : BaseResponse { var id: String var version: Long var seqNo: Long var primaryTerm: Long - var status: RestStatus var monitor: Monitor? var associatedWorkflows: List? @@ -34,7 +31,6 @@ class GetMonitorResponse : ActionResponse, ToXContentObject { version: Long, seqNo: Long, primaryTerm: Long, - status: RestStatus, monitor: Monitor?, associatedCompositeMonitors: List?, ) : super() { @@ -42,7 +38,6 @@ class GetMonitorResponse : ActionResponse, ToXContentObject { this.version = version this.seqNo = seqNo this.primaryTerm = primaryTerm - this.status = status this.monitor = monitor this.associatedWorkflows = associatedCompositeMonitors ?: emptyList() } @@ -53,7 +48,6 @@ class GetMonitorResponse : ActionResponse, ToXContentObject { version = sin.readLong(), // version seqNo = sin.readLong(), // seqNo primaryTerm = sin.readLong(), // primaryTerm - status = sin.readEnum(RestStatus::class.java), // RestStatus monitor = if (sin.readBoolean()) { Monitor.readFrom(sin) // monitor } else null, @@ -66,7 +60,6 @@ class GetMonitorResponse : ActionResponse, ToXContentObject { out.writeLong(version) out.writeLong(seqNo) out.writeLong(primaryTerm) - out.writeEnum(status) if (monitor != null) { out.writeBoolean(true) monitor?.writeTo(out) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt index d27ca6ee..d91c7471 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt @@ -10,7 +10,6 @@ import org.opensearch.commons.alerting.model.CronSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.randomUser import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.rest.RestStatus import org.opensearch.test.OpenSearchTestCase import java.time.Instant import java.time.ZoneId @@ -18,7 +17,7 @@ import java.time.ZoneId class GetMonitorResponseTests : OpenSearchTestCase() { fun `test get monitor response`() { - val req = GetMonitorResponse("1234", 1L, 2L, 0L, RestStatus.OK, null, null) + val req = GetMonitorResponse("1234", 1L, 2L, 0L, null, null) assertNotNull(req) val out = BytesStreamOutput() @@ -27,7 +26,6 @@ class GetMonitorResponseTests : OpenSearchTestCase() { val newReq = GetMonitorResponse(sin) assertEquals("1234", newReq.id) assertEquals(1L, newReq.version) - assertEquals(RestStatus.OK, newReq.status) assertEquals(null, newReq.monitor) } @@ -51,7 +49,7 @@ class GetMonitorResponseTests : OpenSearchTestCase() { triggers = mutableListOf(), uiMetadata = mutableMapOf() ) - val req = GetMonitorResponse("1234", 1L, 2L, 0L, RestStatus.OK, monitor, null) + val req = GetMonitorResponse("1234", 1L, 2L, 0L, monitor, null) assertNotNull(req) val out = BytesStreamOutput() @@ -60,7 +58,6 @@ class GetMonitorResponseTests : OpenSearchTestCase() { val newReq = GetMonitorResponse(sin) assertEquals("1234", newReq.id) assertEquals(1L, newReq.version) - assertEquals(RestStatus.OK, newReq.status) assertNotNull(newReq.monitor) } } From 394e726c95ef748d6707bc9d71f5dc1be68bddc7 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 21 Nov 2023 17:03:10 +0000 Subject: [PATCH 3/4] Add UT Signed-off-by: Tyler Ohlsen --- .../alerting/AlertingPluginInterfaceTests.kt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index b4c39766..553be177 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -23,6 +23,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetMonitorRequest +import org.opensearch.commons.alerting.action.GetMonitorResponse import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse import org.opensearch.commons.alerting.action.GetWorkflowRequest @@ -269,4 +271,18 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.acknowledgeChainedAlerts(client, request, listener) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + + @Test + fun getMonitor() { + val request = mock(GetMonitorRequest::class.java) + val response = GetMonitorResponse("test-id", 1, 1, 1, null, null) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.getMonitor(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } } From 9b3f1beef770d36299f27f80483092b9b46e64cf Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 21 Nov 2023 18:48:27 +0000 Subject: [PATCH 4/4] Repeat for search monitor action Signed-off-by: Tyler Ohlsen --- .../alerting/AlertingPluginInterface.kt | 23 +++++++++++ .../alerting/action/AlertingActions.kt | 6 +++ .../alerting/action/SearchMonitorRequest.kt | 38 +++++++++++++++++++ .../alerting/AlertingPluginInterfaceTests.kt | 16 ++++++++ .../action/SearchMonitorRequestTests.kt | 33 ++++++++++++++++ 5 files changed, 116 insertions(+) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 3cc9bdac..3ce81671 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -4,6 +4,7 @@ */ package org.opensearch.commons.alerting +import org.opensearch.action.search.SearchResponse import org.opensearch.client.node.NodeClient import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse @@ -28,6 +29,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SearchMonitorRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.notifications.action.BaseResponse import org.opensearch.commons.utils.recreateObject @@ -314,6 +316,27 @@ object AlertingPluginInterface { ) } + /** + * Search Monitors interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun searchMonitors( + client: NodeClient, + request: SearchMonitorRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.SEARCH_MONITORS_ACTION_TYPE, + request, + // we do not use the wrapActionListener in this case since there is no need + // to recreate any object or specially handle onResponse / onFailure. It is + // simply returning a SearchResponse. + listener + ) + } + @Suppress("UNCHECKED_CAST") private fun wrapActionListener( listener: ActionListener, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index ac192985..f4e68d73 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionType +import org.opensearch.action.search.SearchResponse object AlertingActions { const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write" @@ -19,6 +20,7 @@ object AlertingActions { const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack" const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe" const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get" + const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search" @JvmField val INDEX_MONITOR_ACTION_TYPE = @@ -65,4 +67,8 @@ object AlertingActions { @JvmField val GET_MONITOR_ACTION_TYPE = ActionType(GET_MONITOR_ACTION_NAME, ::GetMonitorResponse) + + @JvmField + val SEARCH_MONITORS_ACTION_TYPE = + ActionType(SEARCH_MONITORS_ACTION_NAME, ::SearchResponse) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt new file mode 100644 index 00000000..003d3316 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.search.SearchRequest +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import java.io.IOException + +class SearchMonitorRequest : ActionRequest { + + val searchRequest: SearchRequest + + constructor( + searchRequest: SearchRequest + ) : super() { + this.searchRequest = searchRequest + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + searchRequest = SearchRequest(sin) + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + searchRequest.writeTo(out) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 553be177..3ce4f6bb 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -10,6 +10,7 @@ import org.mockito.Mockito import org.mockito.Mockito.mock import org.mockito.junit.jupiter.MockitoExtension import org.opensearch.action.ActionType +import org.opensearch.action.search.SearchResponse import org.opensearch.client.node.NodeClient import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest @@ -34,6 +35,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SearchMonitorRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs @@ -285,4 +287,18 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.getMonitor(client, request, listener) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + + @Test + fun searchMonitors() { + val request = mock(SearchMonitorRequest::class.java) + val response = mock(SearchResponse::class.java) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.searchMonitors(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt new file mode 100644 index 00000000..169814ea --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.search.SearchRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.unit.TimeValue +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.rest.OpenSearchRestTestCase +import java.util.concurrent.TimeUnit + +class SearchMonitorRequestTests : OpenSearchTestCase() { + + fun `test search monitors request`() { + val searchSourceBuilder = SearchSourceBuilder().from(0).size(100).timeout(TimeValue(60, TimeUnit.SECONDS)) + val searchRequest = SearchRequest().indices(OpenSearchRestTestCase.randomAlphaOfLength(10)).source(searchSourceBuilder) + val searchMonitorRequest = SearchMonitorRequest(searchRequest) + assertNotNull(searchMonitorRequest) + + val out = BytesStreamOutput() + searchMonitorRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = SearchMonitorRequest(sin) + + assertNotNull(newReq.searchRequest) + assertEquals(1, newReq.searchRequest.indices().size) + } +}