Skip to content

Commit

Permalink
Add primary first calls for different monitor types (#1205)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Agrawal <[email protected]>
  • Loading branch information
lezzago authored Oct 2, 2023
1 parent 3647e9c commit 2197d46
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
Expand Down Expand Up @@ -641,6 +642,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
Expand Down Expand Up @@ -673,7 +675,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type())
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -100,7 +101,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down Expand Up @@ -192,7 +195,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down

0 comments on commit 2197d46

Please sign in to comment.