Skip to content

Commit

Permalink
Don't set optional proto fields
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <[email protected]>
  • Loading branch information
finnegancarroll committed Nov 20, 2024
1 parent e6cb66e commit ac1de89
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.grpc.services.search;

import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.Scroll;
Expand Down Expand Up @@ -83,12 +84,16 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(

//[optional] How many shard results to reduce on a node. Default is 512.
//optional int32 batched_reduce_size = 8;
searchReq.setBatchedReduceSize(proto.getBatchedReduceSize());
if (proto.hasBatchedReduceSize()) {
searchReq.setBatchedReduceSize(proto.getBatchedReduceSize());
}

//[optional] The time after which the search request will be canceled. Request-level parameter takes precedence over cancel_after_time_interval cluster setting. Default is -1.
//optional string cancel_after_time_interval = 9;
TimeValue cancelAfter = new TimeValue(Long.parseLong(proto.getCancelAfterTimeInterval()));
searchReq.setCancelAfterTimeInterval(cancelAfter);
if (proto.hasCancelAfterTimeInterval()) {
TimeValue cancelAfter = new TimeValue(Long.parseLong(proto.getCancelAfterTimeInterval()));
searchReq.setCancelAfterTimeInterval(cancelAfter);
}

//[optional] Whether to minimize round-trips between a node and remote clusters. Default is true.
//optional bool ccs_minimize_roundtrips = 10;
Expand Down Expand Up @@ -138,15 +143,19 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(

//[optional] Numbers of concurrent shard requests this request should execute on each node. Default is 5.
//optional int32 max_concurrent_shard_requests = 21;
searchReq.setMaxConcurrentShardRequests(proto.getMaxConcurrentShardRequests());
if (proto.hasMaxConcurrentShardRequests()) {
searchReq.setMaxConcurrentShardRequests(proto.getMaxConcurrentShardRequests());
}

//[optional] Whether to return phase-level took time values in the response. Default is false.
//optional bool phase_took = 22;
searchReq.setPhaseTook(proto.getPhaseTook());

//[optional] A prefilter size threshold that triggers a prefilter operation if the request exceeds the threshold. Default is 128 shards.
//optional int32 pre_filter_shard_size = 23;
searchReq.setPreFilterShardSize(proto.getPreFilterShardSize());
if (proto.hasPreFilterShardSize()) {
searchReq.setPreFilterShardSize(proto.getPreFilterShardSize());
}

//[optional] Specifies the shards or nodes on which OpenSearch should perform the search.
//optional string preference = 24;
Expand All @@ -160,7 +169,9 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(

//[optional] Specifies whether OpenSearch should use the request cache. Default is whether it's enabled in the index's settings.
//optional bool request_cache = 26;
searchReq.requestCache(proto.getRequestCache());
if (proto.hasRequestCache()) {
searchReq.requestCache(proto.getRequestCache());
}

//[optional] Indicates whether to return hits.total as an integer. Returns an object otherwise. Default is false.
//optional bool rest_total_hits_as_int = 27;
Expand All @@ -172,20 +183,29 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(

//[optional] Period to keep the search context open.
//optional string scroll = 29;
Scroll scroll = new Scroll(parseTimeValue(proto.getScroll(), null, "scroll"));
searchReq.scroll(scroll);
if (proto.hasScroll()) {
Scroll scroll = new Scroll(parseTimeValue(proto.getScroll(), null, "scroll"));
searchReq.scroll(scroll);
}

//[optional] Customizable sequence of processing stages applied to search queries.
//optional string search_pipeline = 30;
searchReq.pipeline(proto.getSearchPipeline());
if (proto.hasSearchPipeline()) {
searchReq.pipeline(proto.getSearchPipeline());
}

//[optional] Whether OpenSearch should use global term and document frequencies when calculating relevance scores. Default is SEARCH_TYPE_QUERY_THEN_FETCH.
//optional SearchType search_type = 31;
String searchType = proto.getSearchType().name();
if ("query_and_fetch".equals(searchType) || "dfs_query_and_fetch".equals(searchType)) {
throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
switch (proto.getSearchType()) {
case SEARCH_TYPE_QUERY_THEN_FETCH:
searchReq.searchType(SearchType.QUERY_THEN_FETCH);
break;
case SEARCH_TYPE_DFS_QUERY_THEN_FETCH:
searchReq.searchType(SearchType.DFS_QUERY_THEN_FETCH);
break;
default:
searchReq.searchType(SearchType.DEFAULT);
}
searchReq.searchType(searchType);

//[optional] Whether to return sequence number and primary term of the last operation of each document hit.
//optional bool seq_no_primary_term = 32;
Expand All @@ -195,8 +215,8 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto(

//[optional] Number of results to include in the response.
//optional int32 size = 33;
if (proto.hasSize()) { // TODO
throw new UnsupportedOperationException("opensearch.proto.SearchRequest <size> not supported");
if (proto.hasSize()) {
searchReq.source().size(proto.getSize());
}

//[optional] A list of <field> : <direction> pairs to sort by.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.core.action.ActionListener;

import opensearch.proto.services.SearchServiceGrpc;
import org.opensearch.wlm.QueryGroupTask;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;

import static org.opensearch.grpc.services.search.SearchRequestProtoHelper.searchRequestFromProto;
import static org.opensearch.grpc.services.search.SearchRequestProtoHelper.searchResponseToProto;
Expand Down Expand Up @@ -59,6 +61,23 @@ public void onFailure(Exception e) {
public void search(opensearch.proto.SearchRequest searchRequestProto, StreamObserver<opensearch.proto.SearchResponse> responseObserver) {
org.opensearch.action.search.SearchRequest searchReq = searchRequestFromProto(searchRequestProto);
SearchRequestActionListener listener = new SearchRequestActionListener(responseObserver);

/*
When we execute a TransportSearchAction a SearchRequestOperationsListener is registered for the action to track resource usage for the request QueryGroup.
The QueryGroupTask.QUERY_GROUP_ID_HEADER uniquely identifies each QueryGroup
Typically the
*/

// WorkloadManagementTransportInterceptor
// if (isSearchWorkloadRequest(task)) {
// ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
// final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId();
// queryGroupService.rejectIfNeeded(queryGroupId);
// }

//
// client.threadPool().getThreadContext().putHeader(name, String.join(",", distinctHeaderValues));

client.execute(SearchAction.INSTANCE, searchReq, listener);
}
}
1 change: 1 addition & 0 deletions server/src/main/proto/spec/pub
Submodule pub added at ad83bf

0 comments on commit ac1de89

Please sign in to comment.