From 1937f5f06271cd70522c4d7391140b01070a16db Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Fri, 22 Nov 2024 13:29:39 -0800 Subject: [PATCH] Add SearchResponse proto fields for matchAll Signed-off-by: Finn Carroll --- .../action/search/SearchResponse.java | 8 + .../search/SearchRequestProtoHelper.java | 338 +++++++++++++++++- 2 files changed, 335 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 899c71e91e3ab..1293491194b9d 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -695,6 +695,14 @@ private PhaseTook(StreamInput in) throws IOException { this(in.readMap(StreamInput::readString, StreamInput::readLong)); } + /** + * @param phaseName search phase + * @return phase took in ms, -1 if no record + */ + public long getPhaseTook(SearchPhaseName phaseName) { + return phaseTookMap.getOrDefault(phaseName.getName(), -1L); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(phaseTookMap, StreamOutput::writeString, StreamOutput::writeLong); diff --git a/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java b/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java index 21615812855fb..f9776c75c2a14 100644 --- a/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java +++ b/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java @@ -8,17 +8,33 @@ package org.opensearch.grpc.services.search; +import com.google.protobuf.ListValue; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import opensearch.proto.Hit; +import opensearch.proto.HitsMetadata; +import opensearch.proto.NullValue; +import opensearch.proto.PhaseTook; +import opensearch.proto.ResponseBody; import opensearch.proto.SearchResponse; +import opensearch.proto.ShardStatistics; +import opensearch.proto.TotalHits; +import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchType; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.Scroll; +import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.SearchContext; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import static opensearch.proto.SearchRequest.ExpandWildcard.EXPAND_WILDCARD_OPEN; import static org.opensearch.action.search.SearchRequest.DEFAULT_INDICES_OPTIONS; import static org.opensearch.common.unit.TimeValue.parseTimeValue; import static org.opensearch.grpc.services.search.SearchRequestBodyProtoHelper.searchSourceBuilderFromProto; @@ -27,9 +43,10 @@ public class SearchRequestProtoHelper { public static SearchRequest searchRequestFromProto(opensearch.proto.SearchRequest proto) { SearchRequest searchReq = new SearchRequest(); - if (searchReq.source() == null) { - searchReq.source(new SearchSourceBuilder()); - } + + //[optional] Search Request body + //optional SearchRequestBody request_body = 47; + searchReq.source(searchSourceBuilderFromProto(proto, proto.getRequestBody())); //[optional] Whether to include the _source field in the response. //optional SourceConfigParam source = 1; @@ -62,9 +79,13 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques [optional] Specifies the type of index that wildcard expressions can match. Supports list of values. Default is open. repeated ExpandWildcard expand_wildcards = 14; */ + EnumSet indicesoOptions = EnumSet.noneOf(IndicesOptions.Option.class); EnumSet wildcardStates = EnumSet.noneOf(IndicesOptions.WildcardStates.class); + // Add default options + indicesoOptions.add(IndicesOptions.Option.FORBID_CLOSED_INDICES); + if (!proto.hasAllowNoIndices()) { // add option by default indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES); } else if (proto.getAllowNoIndices()) { @@ -101,11 +122,18 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques } } + //DEBUG + wildcardStates.add(IndicesOptions.WildcardStates.OPEN); + searchReq.indicesOptions(new IndicesOptions(indicesoOptions, wildcardStates)); //[optional] Whether to return partial results if the request runs into an error or times out. Default is true. //optional bool allow_partial_search_results = 5; - searchReq.allowPartialSearchResults(proto.getAllowPartialSearchResults()); + if(proto.hasAllowPartialSearchResults()){ + searchReq.allowPartialSearchResults(proto.getAllowPartialSearchResults()); + } else { + searchReq.allowPartialSearchResults(true); + } //[optional] Whether the update operation should include wildcard and prefix queries in the analysis. Default is false. //optional bool analyze_wildcard = 6; @@ -123,6 +151,8 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques //optional int32 batched_reduce_size = 8; if (proto.hasBatchedReduceSize()) { searchReq.setBatchedReduceSize(proto.getBatchedReduceSize()); + } else { + searchReq.setBatchedReduceSize(512); } //[optional] The time after which the search request will be canceled. Request-level parameter takes precedence over cancel_after_time_interval cluster setting. Default is -1. @@ -130,6 +160,8 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques if (proto.hasCancelAfterTimeInterval()) { TimeValue cancelAfter = new TimeValue(Long.parseLong(proto.getCancelAfterTimeInterval())); searchReq.setCancelAfterTimeInterval(cancelAfter); + } else { + searchReq.setCancelAfterTimeInterval(new TimeValue(-1)); } //[optional] Whether to minimize round-trips between a node and remote clusters. Default is true. @@ -137,6 +169,8 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques searchReq.setCcsMinimizeRoundtrips(true); if (proto.hasCcsMinimizeRoundtrips()) { searchReq.setCcsMinimizeRoundtrips(proto.getCcsMinimizeRoundtrips()); + } else { + searchReq.setCcsMinimizeRoundtrips(true); } //[optional] Indicates whether the default operator for a string query should be AND or OR. Default is OR. @@ -185,6 +219,8 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques //optional int32 max_concurrent_shard_requests = 21; if (proto.hasMaxConcurrentShardRequests()) { searchReq.setMaxConcurrentShardRequests(proto.getMaxConcurrentShardRequests()); + } else { + searchReq.setMaxConcurrentShardRequests(5); } //[optional] Whether to return phase-level took time values in the response. Default is false. @@ -195,11 +231,15 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques //optional int32 pre_filter_shard_size = 23; if (proto.hasPreFilterShardSize()) { searchReq.setPreFilterShardSize(proto.getPreFilterShardSize()); + } else { + searchReq.setPreFilterShardSize(128); } //[optional] Specifies the shards or nodes on which OpenSearch should perform the search. //optional string preference = 24; - searchReq.preference(proto.getPreference()); + if (!proto.getPreference().isEmpty()) { + searchReq.preference(proto.getPreference()); + } //[optional] Query in the Lucene query string syntax using query parameter search. //optional string q = 25; @@ -219,7 +259,9 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques //[optional] Value used to route the update by query operation to a specific shard. //repeated string routing = 28; - searchReq.routing(proto.getRoutingList().toArray(new String[0])); + if (!proto.getRoutingList().isEmpty()) { + searchReq.routing(proto.getRoutingList().toArray(new String[0])); + } //[optional] Period to keep the search context open. //optional string scroll = 29; @@ -337,15 +379,289 @@ public static SearchRequest searchRequestFromProto(opensearch.proto.SearchReques throw new UnsupportedOperationException("opensearch.proto.SearchRequest not supported"); } - //[optional] Search Request body - //optional SearchRequestBody request_body = 47; - searchSourceBuilderFromProto(proto, proto.getRequestBody()); - return searchReq; } public static SearchResponse searchResponseToProto(org.opensearch.action.search.SearchResponse response) { - return SearchResponse.newBuilder().build(); + SearchResponse.Builder searchRespProto = SearchResponse.newBuilder(); + ResponseBody.Builder respBodyProto = ResponseBody.newBuilder(); + + //[required] Milliseconds it took Elasticsearch to execute the request. + //optional int64 took = 1; + respBodyProto.setTook(response.getTook().getMillis()); + + //[required] If true, the request timed out before completion; returned results may be partial or empty. + //optional bool timed_out = 2; + respBodyProto.setTimedOut(response.isTimedOut()); + + //[required] Contains a count of shards used for the request. + //ShardStatistics shards = 3; + respBodyProto.setShards(getShardStats(response)); + + //[optional] Phase-level took time values in the response. + //optional PhaseTook phase_took = 4; + respBodyProto.setPhaseTook(getPhaseTook(response)); + + //[required] Contains returned documents and metadata. + //HitsMetadata hits = 5; + respBodyProto.setHits(getHitsMetadata(response)); + + //[optional] When you search one or more remote clusters, a `_clusters` section is included to provide information about the search on each cluster. + //optional ClusterStatistics clusters = 6; + + //[optional] Retrieved specific fields in the search response + //optional .google.protobuf.Struct fields = 7; + + //[optional] Highest returned document _score. + //optional float max_score = 8; + + //[optional] The number of times that the coordinating node aggregates results from batches of shard responses + //optional int32 num_reduce_phases = 9; + + //[optional] Contains profiling information. + //Profile profile = 10; + + //[optional] The PIT ID. + //optional string pit_id = 11; + + //[optional] Identifier for the search and its search context. + //optional string scroll_id = 12; + + //[optional] If the query was terminated early, the terminated_early flag will be set to true in the response + //optional bool terminated_early = 13; + + searchRespProto.setResponseBody(respBodyProto); + return searchRespProto.build(); + } + + private static ShardStatistics getShardStats(org.opensearch.action.search.SearchResponse response){ + ShardStatistics.Builder shardStats = ShardStatistics.newBuilder(); + + //[required] Number of shards that failed to execute the request. Note that shards that are not allocated will be considered neither successful nor failed. Having failed+successful less than total is thus an indication that some of the shards were not allocated. + //int32 failed = 1; + shardStats.setFailed(response.getFailedShards()); + + //[required] Number of shards that executed the request successfully. + //int32 successful = 2; + shardStats.setSuccessful(response.getSuccessfulShards()); + + //[required] Total number of shards that require querying, including unallocated shards. + //int32 total = 3; + shardStats.setTotal(response.getTotalShards()); + + //[optional] An array of any shard-specific failures that occurred during the search operation. + //repeated ShardFailure failures = 4; + if (response.getShardFailures().length > 0) { // TODO + throw new UnsupportedOperationException("ShardStatistics not supported"); + } + + //[optional] Number of shards that skipped the request because a lightweight check helped realize that no documents could possibly match on this shard. This typically happens when a search request includes a range filter and the shard only has values that fall outside of that range. + //optional int32 skipped = 5; + shardStats.setSkipped(response.getSkippedShards()); + + return shardStats.build(); + } + + private static PhaseTook getPhaseTook(org.opensearch.action.search.SearchResponse response) { + PhaseTook.Builder phaseTook = PhaseTook.newBuilder(); + + if(response.getPhaseTook() == null){ + return phaseTook.build(); + } + + //[required] Time taken in dfs_pre_query phase. + //int64 dfs_pre_query = 1; + phaseTook.setDfsPreQuery(response.getPhaseTook().getPhaseTook(SearchPhaseName.DFS_PRE_QUERY)); + + //[required] Time taken in query phase. + //int64 query = 2; + phaseTook.setQuery(response.getPhaseTook().getPhaseTook(SearchPhaseName.QUERY)); + + //[required] Time taken in fetch phase. + //int64 fetch = 3; + phaseTook.setFetch(response.getPhaseTook().getPhaseTook(SearchPhaseName.FETCH)); + + //[required] Time taken in dfs_query phase. + //int64 dfs_query = 4; + phaseTook.setDfsQuery(response.getPhaseTook().getPhaseTook(SearchPhaseName.DFS_QUERY)); + + //[required] Time taken in expand phase. + //int64 expand = 5; + phaseTook.setExpand(response.getPhaseTook().getPhaseTook(SearchPhaseName.EXPAND)); + + //[required] Time taken in can_match phase. + //int64 can_match = 6; + phaseTook.setCanMatch(response.getPhaseTook().getPhaseTook(SearchPhaseName.CAN_MATCH)); + + return phaseTook.build(); + } + + private static HitsMetadata getHitsMetadata(org.opensearch.action.search.SearchResponse response) { + HitsMetadata.Builder hitsMetaData = HitsMetadata.newBuilder(); + HitsMetadata.Total.Builder totalBuilder = HitsMetadata.Total.newBuilder(); + TotalHits.Builder totalHitsBuilder = TotalHits.newBuilder(); + + //[optional] Metadata about the number of matching documents. + //Total total = 1; + if (response.getHits() == null || response.getHits().getTotalHits() == null) { + throw new RuntimeException("SearchResponse hits are null"); + } + + if (response.getHits().getTotalHits().relation == org.apache.lucene.search.TotalHits.Relation.EQUAL_TO) { + totalHitsBuilder.setRelation(TotalHits.TotalHitsRelation.TOTAL_HITS_RELATION_EQ); + } else { + totalHitsBuilder.setRelation(TotalHits.TotalHitsRelation.TOTAL_HITS_RELATION_GTE); + } + + totalHitsBuilder.setValue(response.getHits().getTotalHits().value); + + //[required] Array of returned document objects. + //repeated Hit hits = 2; + hitsMetaData.addAllHits(getHits(response)); + + //[optional] Highest returned document _score. + //MaxScore max_score = 3; + HitsMetadata.MaxScore.Builder maxScore = HitsMetadata.MaxScore.newBuilder(); + if (Float.isNaN(response.getHits().getMaxScore())) { + maxScore.setNullValue(NullValue.NULL_VALUE_NULL); + } else { + maxScore.setDoubleValue(response.getHits().getMaxScore()); + } + + totalBuilder.setTotalHits(totalHitsBuilder); + hitsMetaData.setTotal(totalBuilder); + return hitsMetaData.build(); + } + + private static List getHits(org.opensearch.action.search.SearchResponse response) { + List hits = new ArrayList<>(); + + for (SearchHit hit : response.getHits()) { + Hit.Builder hitBuilder = Hit.newBuilder(); + + //[required] Name of the index containing the returned document. + //optional string type = 1; + hitBuilder.setType(hit.getIndex()); + + //[required] Name of the index containing the returned document. + //string index = 2; + hitBuilder.setType(hit.getIndex()); + + //[required] Unique identifier for the returned document. This ID is only unique within the returned index. + //string id = 3; + hitBuilder.setId(hit.getId()); + + //[optional] Relevance of the returned document. + //optional Score score = 4; + Hit.Score.Builder score = Hit.Score.newBuilder(); + if (Float.isNaN(hit.getScore())) { + score.setNullValue(NullValue.NULL_VALUE_NULL); + } else { + score.setDoubleValue(hit.getScore()); + } + hitBuilder.setScore(score); + + //[optional] Explanation of how the relevance score (_score) is calculated for every result. + //optional Explanation explanation = 5; + + //[optional] Contains field values for the documents. + //optional .google.protobuf.Struct fields = 6; + Value docFieldsVal = docFieldsToStruct(hit.getFields()); + if (!docFieldsVal.hasStructValue()) { + throw new IllegalArgumentException("Document fields do not parse to struct"); + } + hitBuilder.setFields(docFieldsVal.getStructValue()); + + //[optional] An additional highlight element for each search hit that includes the highlighted fields and the highlighted fragments. + //map highlight = 7; + + //[optional] An additional nested hits that caused a search hit to match in a different scope. + //map inner_hits = 8; + + //[optional] List of matched query names used in the search request. + //repeated string matched_queries = 9; + for (String matched : hit.getMatchedQueries()) { + hitBuilder.addMatchedQueries(matched); + } + + //[optional] Defines from what inner nested object this inner hit came from + //optional NestedIdentity nested = 10; + + //[optional] List of fields ignored. + //repeated string ignored = 11; + + //[optional] These values are retrieved from the document’s original JSON source and are raw so will not be formatted or treated in any way, unlike the successfully indexed fields which are returned in the fields section. + //map ignored_field_values = 12; + + //[optional] Shard from which this document was retrieved. + //optional string shard = 13; + if (hit.getShard() != null) { + hitBuilder.setShard(hit.getShard().getShardId().toString()); + } + + //[optional] Node from which this document was retrieved. + //optional string node = 14; + if (hit.getShard() != null) { + hitBuilder.setNode(hit.getShard().getNodeId()); + } + + //optional string routing = 15; + + //[optional] Source document. + //optional .google.protobuf.Struct source = 16; + Value sourceVal = docFieldsToStruct(hit.getSourceAsMap()); + if (!sourceVal.hasStructValue()) { + throw new IllegalArgumentException("Document fields do not parse to struct"); + } + hitBuilder.setFields(sourceVal.getStructValue()); + + //[optional] Counts the number of operations that happened on the index + //optional int64 seq_no = 17; + hitBuilder.setSeqNo(hit.getSeqNo()); + + //[optional] Counts the number of shard has changed. + //optional int64 primary_term = 18; + hitBuilder.setPrimaryTerm(hit.getPrimaryTerm()); + + //[optional] Version number of the document. + //optional int64 version = 19; + hitBuilder.setVersion(hit.getVersion()); + + //[optional] Sorted values + //repeated FieldValueResponse sort = 20; + + hits.add(hitBuilder.build()); + } + + return hits; + } + + private static com.google.protobuf.Value docFieldsToStruct(Object docFieldRoot) { + Value.Builder val = Value.newBuilder(); + + if (docFieldRoot instanceof Double) { + val.setNumberValue((double) docFieldRoot); + } else if (docFieldRoot instanceof String) { + val.setStringValue((String) docFieldRoot); + } else if (docFieldRoot instanceof Boolean) { + val.setBoolValue((Boolean) docFieldRoot); + } else if (docFieldRoot instanceof List) { + ListValue.Builder listBuilder = ListValue.newBuilder(); + for (Object listEntry : (List) docFieldRoot) { + listBuilder.addValues(docFieldsToStruct(listEntry)); + } + } else if (docFieldRoot instanceof Map) { + Struct.Builder structBuilder = Struct.newBuilder(); + Map fieldMap = (Map) docFieldRoot; + for (Map.Entry entry : fieldMap.entrySet()) { + structBuilder.putFields(entry.getKey(), docFieldsToStruct(entry.getValue())); + } + val.setStructValue(structBuilder); + } else { + throw new UnsupportedOperationException("Failed to parse document field [" + docFieldRoot.toString() + "]"); + } + + return val.build(); } // TODO: Refactor RestSearchAction::checkRestTotalHits to this