Skip to content

Commit

Permalink
Implement request side of match all query
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <[email protected]>
  • Loading branch information
finnegancarroll committed Nov 20, 2024
1 parent ac1de89 commit 853b206
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@

package org.opensearch.grpc.services.search;

import opensearch.proto.SearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.Scroll;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.internal.SearchContext;

import java.util.EnumSet;

import static org.opensearch.action.search.SearchRequest.DEFAULT_INDICES_OPTIONS;
import static org.opensearch.common.unit.TimeValue.parseTimeValue;
import static org.opensearch.grpc.services.search.SearchRequestBodyProtoHelper.searchSourceBuilderFromProto;

public class SearchRequestProtoHelper {

public static org.opensearch.action.search.SearchRequest searchRequestFromProto(opensearch.proto.SearchRequest proto) {
org.opensearch.action.search.SearchRequest searchReq = new org.opensearch.action.search.SearchRequest();
public static SearchRequest searchRequestFromProto(opensearch.proto.SearchRequest proto) {
SearchRequest searchReq = new SearchRequest();
if (searchReq.source() == null) {
searchReq.source(new SearchSourceBuilder());
}
Expand All @@ -49,22 +53,55 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(
[optional] Whether to ignore wildcards that don't match any indexes. Default is true.
optional bool allow_no_indices = 4;
[optional] Specifies the type of index that wildcard expressions can match. Supports list of values. Default is open.
repeated ExpandWildcard expand_wildcards = 14;
[optional] Specifies whether to include missing or closed indexes in the response and ignores unavailable shards during the search request. Default is false.
optional bool ignore_unavailable = 18;
[optional] Whether to ignore concrete, expanded, or indexes with aliases if indexes are frozen. Default is true.
optional bool ignore_throttled = 17;
[optional] Specifies the type of index that wildcard expressions can match. Supports list of values. Default is open.
repeated ExpandWildcard expand_wildcards = 14;
*/
IndicesOptions indicesOptions = IndicesOptions.fromParameters(
proto.getExpandWildcardsList(),
proto.getIgnoreUnavailable(),
proto.getAllowNoIndices(),
proto.getIgnoreThrottled(),
DEFAULT_INDICES_OPTIONS);
searchReq.indicesOptions(indicesOptions);
EnumSet<IndicesOptions.Option> indicesoOptions = EnumSet.noneOf(IndicesOptions.Option.class);
EnumSet<IndicesOptions.WildcardStates> wildcardStates = EnumSet.noneOf(IndicesOptions.WildcardStates.class);

if (!proto.hasAllowNoIndices()) { // add option by default
indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES);
} else if (proto.getAllowNoIndices()) {
indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES);
}

if (proto.getIgnoreUnavailable()) {
indicesoOptions.add(IndicesOptions.Option.IGNORE_UNAVAILABLE);
}

if (!proto.hasIgnoreThrottled()) { // add option by default
indicesoOptions.add(IndicesOptions.Option.IGNORE_THROTTLED);
} else if (proto.getIgnoreThrottled()) {
indicesoOptions.add(IndicesOptions.Option.IGNORE_THROTTLED);
}

for (opensearch.proto.SearchRequest.ExpandWildcard wc : proto.getExpandWildcardsList()) {
switch (wc) {
case EXPAND_WILDCARD_OPEN:
wildcardStates.add(IndicesOptions.WildcardStates.OPEN);
break;
case EXPAND_WILDCARD_CLOSED:
wildcardStates.add(IndicesOptions.WildcardStates.CLOSED);
break;
case EXPAND_WILDCARD_HIDDEN:
wildcardStates.add(IndicesOptions.WildcardStates.HIDDEN);
break;
case EXPAND_WILDCARD_NONE:
wildcardStates.clear();
break;
case EXPAND_WILDCARD_ALL:
wildcardStates.addAll(EnumSet.allOf(IndicesOptions.WildcardStates.class));
break;
}
}

searchReq.indicesOptions(new IndicesOptions(indicesoOptions, wildcardStates));

//[optional] Whether to return partial results if the request runs into an error or times out. Default is true.
//optional bool allow_partial_search_results = 5;
Expand Down Expand Up @@ -97,7 +134,10 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(

//[optional] Whether to minimize round-trips between a node and remote clusters. Default is true.
//optional bool ccs_minimize_roundtrips = 10;
searchReq.setCcsMinimizeRoundtrips(proto.getCcsMinimizeRoundtrips());
searchReq.setCcsMinimizeRoundtrips(true);
if (proto.hasCcsMinimizeRoundtrips()) {
searchReq.setCcsMinimizeRoundtrips(proto.getCcsMinimizeRoundtrips());
}

//[optional] Indicates whether the default operator for a string query should be AND or OR. Default is OR.
//optional Operator default_operator = 11;
Expand Down Expand Up @@ -304,12 +344,12 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(
return searchReq;
}

public static opensearch.proto.SearchResponse searchResponseToProto(org.opensearch.action.search.SearchResponse response) {
return opensearch.proto.SearchResponse.newBuilder().build();
public static SearchResponse searchResponseToProto(org.opensearch.action.search.SearchResponse response) {
return SearchResponse.newBuilder().build();
}

// TODO: Refactor RestSearchAction::checkRestTotalHits to this
private static void checkRestTotalHits(boolean totalHitsAsInt, org.opensearch.action.search.SearchRequest searchRequest) {
private static void checkRestTotalHits(boolean totalHitsAsInt, SearchRequest searchRequest) {
if (totalHitsAsInt == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public void onResponse(org.opensearch.action.search.SearchResponse response) {

@Override
public void onFailure(Exception e) {
// DEBUG PRINT
System.out.println(e.getMessage());

respObserver.onError(
new RuntimeException("SearchRequest task failed", e)
);
Expand All @@ -61,23 +64,6 @@ public void onFailure(Exception e) {
public void search(opensearch.proto.SearchRequest searchRequestProto, StreamObserver<opensearch.proto.SearchResponse> responseObserver) {
org.opensearch.action.search.SearchRequest searchReq = searchRequestFromProto(searchRequestProto);
SearchRequestActionListener listener = new SearchRequestActionListener(responseObserver);

/*
When we execute a TransportSearchAction a SearchRequestOperationsListener is registered for the action to track resource usage for the request QueryGroup.
The QueryGroupTask.QUERY_GROUP_ID_HEADER uniquely identifies each QueryGroup
Typically the
*/

// WorkloadManagementTransportInterceptor
// if (isSearchWorkloadRequest(task)) {
// ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
// final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId();
// queryGroupService.rejectIfNeeded(queryGroupId);
// }

//
// client.threadPool().getThreadContext().putHeader(name, String.join(",", distinctHeaderValues));

client.execute(SearchAction.INSTANCE, searchReq, listener);
}
}

0 comments on commit 853b206

Please sign in to comment.