[Concurrent Segment Search] support forceTermination for terminate_after parameter #8371
Comments
Some more background: With #8306 we will be properly supporting the … In the non-concurrent case today, if the … (OpenSearch/server/src/main/java/org/opensearch/search/query/QueryPhase.java, lines 225 to 231 in a11f98b)
then an … (OpenSearch/server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollector.java, lines 76 to 83 in 1118dcf)
In the concurrent search case, we will not throw the … (OpenSearch/server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollectorManager.java, lines 39 to 42 in 1118dcf)
which we can think of as a "soft" termination.
Some potential solutions:
1. Synchronize count across threads
We can use an … (OpenSearch/server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollector.java, lines 85 to 97 in 1118dcf)
We would need to do some benchmarking to see how this impacts performance, but intuitively this seems to introduce non-trivial overhead.
2. Keep the "soft" termination behavior that exists today
Continue to throw …
3. Change (2) to do a "hard" termination
Once one collector hits …
4. Disallow …
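Option 1 could look roughly like the sketch below, assuming a single `AtomicLong` shared by every slice's collector: each matching doc costs one atomic increment, and a slice stops as soon as the shared count passes the limit, so the shard never collects more than `terminate_after` docs in total. `SharedCountLimiter`, `tryCollect`, and `runSlices` are hypothetical names rather than OpenSearch APIs, and a real collector would throw a termination exception instead of returning `false`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of option 1: every slice shares one counter, so the limit is global.
class SharedCountLimiter {
    private final long maxHits;
    private final AtomicLong counter = new AtomicLong();

    SharedCountLimiter(long maxHits) {
        this.maxHits = maxHits;
    }

    // True if this doc may be collected. The atomic increment is the per-doc
    // synchronization cost discussed in this thread.
    boolean tryCollect() {
        return counter.incrementAndGet() <= maxHits;
    }

    // Runs `slices` threads with `docsPerSlice` matching docs each; returns docs collected.
    static long runSlices(int slices, int docsPerSlice, long maxHits) {
        SharedCountLimiter limiter = new SharedCountLimiter(maxHits);
        ExecutorService pool = Executors.newFixedThreadPool(slices);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int s = 0; s < slices; s++) {
                futures.add(pool.submit(() -> {
                    long sliceCount = 0;
                    // "Hard" termination: a slice stops as soon as the global limit is hit.
                    for (int doc = 0; doc < docsPerSlice && limiter.tryCollect(); doc++) {
                        sliceCount++;
                    }
                    return sliceCount;
                }));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

`SharedCountLimiter.runSlices(4, 1000, 10)` returns exactly 10 regardless of thread interleaving, because only ten increments can observe a value of at most 10.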
I agree with your analysis here. Any idea what the synchronization overhead might be? I think option 4 is okay because it can be temporary and … There is potentially an option 1a where we don't keep an exact count (perhaps by updating the synchronized count every 10, 100, or 1000 documents, or whatever) in order to reduce the synchronization burden.
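A minimal sketch of option 1a, assuming each slice counts locally and publishes to the shared counter only once every `batchSize` docs, checking the limit at that point. `BatchedCountLimiter` and its members are hypothetical. The trade-off shows up in the bound: with `s` slices that each have enough matching docs, the shard can overshoot `terminate_after` by up to `s * batchSize - 1` docs, in exchange for one atomic operation per batch instead of one per doc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of option 1a: slices publish their counts to a shared
// counter only every `batchSize` docs, so the limit is approximate.
class BatchedCountLimiter {
    private final long maxHits;
    private final int batchSize;
    private final AtomicLong published = new AtomicLong();

    BatchedCountLimiter(long maxHits, int batchSize) {
        this.maxHits = maxHits;
        this.batchSize = batchSize;
    }

    // Owned by a single slice (thread), so its fields need no synchronization.
    final class SliceCounter {
        private int pending;
        private boolean terminated;

        boolean tryCollect() {
            if (terminated) {
                return false;
            }
            pending++;
            if (pending == batchSize) {
                // One atomic update per batch instead of one per doc.
                long total = published.addAndGet(pending);
                pending = 0;
                terminated = total >= maxHits; // the limit is only checked here
            }
            return true;
        }
    }

    // Runs `slices` threads with `docsPerSlice` matching docs each; returns docs collected.
    static long runSlices(int slices, int docsPerSlice, long maxHits, int batchSize) {
        BatchedCountLimiter limiter = new BatchedCountLimiter(maxHits, batchSize);
        ExecutorService pool = Executors.newFixedThreadPool(slices);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int s = 0; s < slices; s++) {
                futures.add(pool.submit(() -> {
                    SliceCounter counter = limiter.new SliceCounter();
                    long sliceCount = 0;
                    for (int doc = 0; doc < docsPerSlice && counter.tryCollect(); doc++) {
                        sliceCount++;
                    }
                    return sliceCount;
                }));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With a single slice, `runSlices(1, 1000, 10, 4)` collects 12 docs rather than 10, since the limit is only checked at each flush of 4.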
The additional synchronization has to be avoided at all costs: it kills the whole concurrent part of the search. However, Apache Lucene does that using fe…
It looks like … I agree that synchronizing the doc count for each doc does kind of negate the concurrency part, but that would only be for the case where the … That being said, I'm definitely inclined towards solution 2 right now, and we can pick up this issue at a later date if necessary.
I think it makes sense; once we see concurrent search being used widely (hopefully), we could get back to this issue.
The potential problem with solution 2 is that we would be codifying a behavior for …
To be fair, I don't think there are any risks here: right now we just do more work, BUT we soft-limit the results. The outcome of the "hard" (to be implemented in the future) and "soft" flows should be exactly the same from the user-facing perspective.
@reta @jed326 There may be some change from the user perspective. For example: if the … The other cases, where a single collector hits the threshold, are probably where user-facing behavior will not change, but more work will be done in the backend and the query may take longer to complete compared to the sequential path. We can improve that later by using one of the options above, but we will probably need to at least document that with the concurrent path …
@sohami Good callout! I do think that if we are documenting that … In my opinion, if we say that … On the other hand, just looking at the code alone, the collectors aren't terminating early in that case, so it's also confusing from the developer perspective to add a check in the reduce phase that sets the terminatedEarly flag to true. In short, I think we would be adding some unintuitive and confusing behaviors here in order to make it seem like the same query is terminating early in the concurrent and non-concurrent search cases, and in that case I would prefer to go with solution 4 and return some sort of validation error if …
I realized the same thing just after commenting. Taking a step back, I think the …
Thanks @sohami and @jed326, I agree with the conclusion. Also, we should probably keep in mind that concurrent search would never produce the same end result sets (in general, though they could look consistent sometimes), even for the same query, when …
Exactly, this is why I see no risks here (if the implementation changes in the future): we will preserve the intent no matter what.
@reta @sohami @andrross thanks for the discussion on this! It looks like our consensus has landed on solution 2, which won't require any changes right now. To recap solution 2: in each leaf slice, after … (OpenSearch/server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollector.java, lines 85 to 96 in 1118dcf)
This exception is swallowed in ContextIndexSearcher (OpenSearch/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java, lines 314 to 324 in 31e67c1).
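The recap of solution 2 can be illustrated with a simplified, single-threaded model. It assumes one counting collector per slice that throws once its own count reaches `terminate_after`, with the searcher catching that exception per leaf and moving on; `SoftTerminationModel`, `SliceCollector`, and `SliceTerminated` (a stand-in for Lucene's `CollectionTerminatedException`) are hypothetical. Because no slice knows about the others, a shard can collect up to `slices * terminate_after` docs.

```java
// Hypothetical, single-threaded model of solution 2 ("soft" termination per slice).
class SoftTerminationModel {
    // Stand-in for Lucene's CollectionTerminatedException.
    static final class SliceTerminated extends RuntimeException {
    }

    // One collector per slice; it only knows about its own count.
    static final class SliceCollector {
        private final int maxHits;
        private int collected;

        SliceCollector(int maxHits) {
            this.maxHits = maxHits;
        }

        void collect() {
            if (collected >= maxHits) {
                throw new SliceTerminated();
            }
            collected++;
        }
    }

    // slices[s][l] is the number of matching docs in leaf l of slice s.
    // Returns the total number of docs collected across all slices of the shard.
    static int collectShard(int[][] slices, int terminateAfter) {
        int total = 0;
        for (int[] leaves : slices) {
            SliceCollector collector = new SliceCollector(terminateAfter);
            for (int leafDocs : leaves) {
                try {
                    for (int doc = 0; doc < leafDocs; doc++) {
                        collector.collect();
                    }
                } catch (SliceTerminated e) {
                    // Swallowed per leaf: the searcher simply moves on to the next leaf.
                }
            }
            total += collector.collected;
        }
        return total;
    }
}
```

With three slices and `terminate_after=2`, `collectShard(new int[][] {{3, 5}, {4}, {1, 1}}, 2)` collects 6 docs, while the same leaves in a single slice collect 2.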
I will take a note on opensearch-project/documentation-website#2662 for the behavior update to treating …
@reta Thanks, I missed the nuance that the results will be more or less the same, just that the system may do more work than is optimal. I agree that this can change in the future to be more efficient, and that this is a backward-compatible change as long as we preserve the intent. I'm on board with solution 2 as @jed326 has described.
@reta it is not only about over-fetching and doing more work in the backend but also about returning more hits to the users. In cases like aggs, the collected count will be different. Given the results will vary between the concurrent and non-concurrent paths and … We will definitely also get the results from the benchmark which @jed326 is planning to run, to see how large the overhead is. If performance is much worse than the non-concurrent flow, then we can decide to disable the concurrent flow for …
Hey @reta @sohami, wanted to give an update on my investigation into …
1. NPE when …
```java
// Since we cannot support early forced termination, we have to simulate it by
// artificially reducing the number of total hits and doc scores.
ScoreDoc[] scoreDocs = topDocs.scoreDocs;
if (terminatedAfter != null) {
    if (totalHits.value > terminatedAfter) {
        totalHits = new TotalHits(terminatedAfter, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
    }
    if (scoreDocs != null && scoreDocs.length > terminatedAfter) {
        scoreDocs = Arrays.copyOf(scoreDocs, terminatedAfter);
    }
}
```
There is an edge case this doesn't cover, though: if each slice has fewer than `terminate_after` documents, then we won't enter this code block during reduce. See the example below:
curl -X GET "localhost:9200/my-index-000001/_search?pretty&terminate_after=1"
{
"took" : 3,
"timed_out" : false,
"terminated_early" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index-000001",
"_id" : "4yhLr4oBXBfDW6cmIi_l",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "kimchy"
}
}
},
{
"_index" : "my-index-000001",
"_id" : "5ChLr4oBXBfDW6cmSy8F",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2099-11-15T13:12:00",
"message" : "GET /search HTTP/1.1 200 1070000",
"user" : {
"id" : "dengjay"
}
}
}
]
}
}
There is actually an assertion in `SearchPhaseController::getTotalHits` that is supposed to cover this scenario, but given that we haven't seen it fire under the concurrent search test parameterization, we're most likely missing coverage here.
OpenSearch/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java, lines 785 to 788 in aca2e9d:
```java
} else if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_ACCURATE) {
    assert totalHitsRelation == Relation.EQUAL_TO;
    return new TotalHits(totalHits, totalHitsRelation);
} else {
```
3. `illegal_argument_exception` when `size=0` and `track_total_hits=true` are used for concurrent aggs.
curl -X GET "opens-clust-1gj8zaf4fng1b-d6fa7bd00441ed0d.elb.us-east-1.amazonaws.com/_search?track_total_hits=true&pretty" -H 'Content-Type: application/json' -d'
{
"size": 0,
"query": {
"bool": {
"filter": {
"range": {
"trip_distance": {
"lt": 50,
"gte": 0
}
}
}
}
},
"aggs": {
"distance_histo": {
"histogram": {
"field": "trip_distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}'
{
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null"
}
],
"type" : "search_phase_execution_exception",
"reason" : "all shards failed",
"phase" : "query",
"grouped" : true,
"failed_shards" : [
{
"shard" : 0,
"index" : "nyc_taxis",
"node" : "K7DzKxU4Tyin-01SQBj-7A",
"reason" : {
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null"
}
}
],
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null",
"caused_by" : {
"type" : "illegal_argument_exception",
"reason" : "Collector managers should all be non-null"
}
}
},
"status" : 400
}
This is the same issue as [1] above; here the null collector manager is rejected by Lucene's `MultiCollectorManager` (hence the `illegal_argument_exception`) instead of failing as an NPE.
4. `terminate_after` has no effect for concurrent aggs
Concurrent Search Enabled:
curl -X GET "---/_search?track_total_hits=true&terminate_after=1&pretty" -H 'Content-Type: application/json' -d'
{
"size": 1,
"query": {
"bool": {
"filter": {
"range": {
"trip_distance": {
"lt": 50,
"gte": 0
}
}
}
}
},
"aggs": {
"distance_histo": {
"histogram": {
"field": "trip_distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}'
{
"took" : 7345,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "gte"
},
"max_score" : 0.0,
"hits" : [
{
"_index" : "nyc_taxis",
"_id" : "47sxkYoBYW-wfCAOkuIQ",
"_score" : 0.0,
"_source" : {
"payment_type" : "1",
"rate_code_id" : "1",
"tip_amount" : 1.5,
"tolls_amount" : 0.0,
"extra" : 0.5,
"passenger_count" : 1,
"pickup_location" : [
-74.00631713867188,
40.733638763427734
],
"dropoff_datetime" : "2015-01-07 21:15:13",
"trip_distance" : 0.43,
"store_and_fwd_flag" : "N",
"total_amount" : 8.3,
"fare_amount" : 5.5,
"pickup_datetime" : "2015-01-07 21:08:53",
"dropoff_location" : [
-74.00151062011719,
40.73076248168945
],
"mta_tax" : 0.5,
"vendor_id" : "2",
"improvement_surcharge" : 0.3
}
}
]
},
"aggregations" : {
"distance_histo" : {
"buckets" : [
{
"key" : 0.0,
"doc_count" : 37826898,
"total_amount_stats" : {
"count" : 37826898,
"min" : -499.0,
"max" : 989970.39,
"avg" : 7.954326743102223,
"sum" : 3.0088750637E8
}
},
{
"key" : 1.0,
"doc_count" : 54261042,
"total_amount_stats" : {
"count" : 54261042,
"min" : -69.7,
"max" : 650262.85,
"avg" : 10.610401890365468,
"sum" : 5.7573146261E8
}
},
Concurrent Search Disabled:
curl -X GET "---/_search?track_total_hits=true&terminate_after=1&pretty" -H 'Content-Type: application/json' -d'
{
"size": 1,
"query": {
"bool": {
"filter": {
"range": {
"trip_distance": {
"lt": 50,
"gte": 0
}
}
}
}
},
"aggs": {
"distance_histo": {
"histogram": {
"field": "trip_distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}'
{
"took" : 2,
"timed_out" : false,
"terminated_early" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 0.0,
"hits" : [
{
"_index" : "nyc_taxis",
"_id" : "47sxkYoBYW-wfCAOkuIQ",
"_score" : 0.0,
"_source" : {
"payment_type" : "1",
"rate_code_id" : "1",
"tip_amount" : 1.5,
"tolls_amount" : 0.0,
"extra" : 0.5,
"passenger_count" : 1,
"pickup_location" : [
-74.00631713867188,
40.733638763427734
],
"dropoff_datetime" : "2015-01-07 21:15:13",
"trip_distance" : 0.43,
"store_and_fwd_flag" : "N",
"total_amount" : 8.3,
"fare_amount" : 5.5,
"pickup_datetime" : "2015-01-07 21:08:53",
"dropoff_location" : [
-74.00151062011719,
40.73076248168945
],
"mta_tax" : 0.5,
"vendor_id" : "2",
"improvement_surcharge" : 0.3
}
}
]
},
"aggregations" : {
"distance_histo" : {
"buckets" : [
{
"key" : 0.0,
"doc_count" : 1,
"total_amount_stats" : {
"count" : 1,
"min" : 8.3,
"max" : 8.3,
"avg" : 8.3,
"sum" : 8.3
}
}
]
}
}
}
`terminate_after` depends on `forceTermination` being true to terminate Aggregators. This is because `MultiCollector` swallows the soft-termination `CollectionTerminatedException` and only rethrows it once all collectors have terminated, so during soft termination, after the `EarlyTerminatingCollector` terminates, the `Aggregator` keeps on collecting. See https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java#L214-L233.
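The `MultiCollector` behavior described above can be reproduced with a small Lucene-free model: the wrapper catches a child's termination exception, drops that child, and only rethrows once every child has terminated. `MultiCollectorModel`, `LimitCollector`, `CountingCollector`, and `Multi` are hypothetical, and `Terminated` stands in for `CollectionTerminatedException`. With a 1-doc limit on the "top docs" child, the "aggregator" child still sees every doc.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of soft termination inside a multi-collector.
class MultiCollectorModel {
    // Stand-in for Lucene's CollectionTerminatedException.
    static final class Terminated extends RuntimeException {
    }

    interface DocCollector {
        void collect(int doc);
    }

    // Plays the role of EarlyTerminatingCollector in soft-termination mode.
    static final class LimitCollector implements DocCollector {
        private final int maxHits;
        int collected;

        LimitCollector(int maxHits) {
            this.maxHits = maxHits;
        }

        public void collect(int doc) {
            if (collected >= maxHits) {
                throw new Terminated();
            }
            collected++;
        }
    }

    // Plays the role of an Aggregator: it never terminates on its own.
    static final class CountingCollector implements DocCollector {
        int collected;

        public void collect(int doc) {
            collected++;
        }
    }

    // Swallows a child's Terminated and only rethrows once all children are done.
    static final class Multi implements DocCollector {
        private final List<DocCollector> active;

        Multi(List<DocCollector> children) {
            this.active = new ArrayList<>(children);
        }

        public void collect(int doc) {
            for (int i = 0; i < active.size(); ) {
                try {
                    active.get(i).collect(doc);
                    i++;
                } catch (Terminated e) {
                    active.remove(i); // stop calling this child, keep the others going
                }
            }
            if (active.isEmpty()) {
                throw new Terminated();
            }
        }
    }

    // Feeds `numDocs` docs; returns {top-docs count, aggregator count}.
    static int[] run(int numDocs, int terminateAfter) {
        LimitCollector topDocs = new LimitCollector(terminateAfter);
        CountingCollector agg = new CountingCollector();
        Multi multi = new Multi(List.<DocCollector>of(topDocs, agg));
        try {
            for (int doc = 0; doc < numDocs; doc++) {
                multi.collect(doc);
            }
        } catch (Terminated e) {
            // Only reached if every child has terminated.
        }
        return new int[] {topDocs.collected, agg.collected};
    }
}
```

`MultiCollectorModel.run(5, 1)` returns `{1, 5}`: the limit applies to the top docs but not to the aggregation, which mirrors problem [4].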
Solutions:
- [1] and [3] are tracked in #10054.
- [2] and [4] are both consequences of how `terminate_after`'s soft termination is implemented today and would be fixed by switching over to force termination.
For the following reasons, I think the best solution is to force-terminate concurrent search by default and, if necessary, provide an option for the user to soft-terminate:
- The behavior from problem [2], as is, means `track_total_hits=true` does not always give an `eq` relation, which is a behavior change. We certainly could make the total hits still accurate in the soft termination case, but in general I don't like the idea of fudging the hit count to make it consistent. Moreover, this fix only applies to the TopDocs collector; the doc counts for all other collectors (i.e. Aggregators) would still be wrong.
- As seen in problem [4], the way soft termination exists today does not support early terminating aggs (regardless of whether it's concurrent search or not).
- The `GlobalHitsThresholdChecker` used by `TopScoreDocCollector` is already synchronizing the doc count across threads. Moreover, any contention only happens when matching docs are found at the same time, and even then only the doc count check is synchronized; the rest of the doc collection process can still be parallelized.
- It keeps our goal of maintaining exactly the same search results for both the concurrent and non-concurrent search cases.
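Problem [2] can be reduced to a few lines, assuming simplified stand-ins for Lucene's `TotalHits` and its `Relation` enum: the reduce-time trimming quoted earlier caps the count and weakens the relation to `GREATER_THAN_OR_EQUAL_TO`, which is exactly what the `SearchPhaseController` assertion rules out when total hits are tracked accurately. `TotalHitsModel` and its methods are hypothetical.

```java
// Hypothetical model of the reduce-time trimming versus accurate hit tracking.
class TotalHitsModel {
    enum Relation { EQUAL_TO, GREATER_THAN_OR_EQUAL_TO }

    static final class TotalHits {
        final long value;
        final Relation relation;

        TotalHits(long value, Relation relation) {
            this.value = value;
            this.relation = relation;
        }
    }

    // Mirrors the soft-termination simulation: cap the count and weaken the relation.
    static TotalHits simulateSoftTermination(TotalHits merged, Integer terminatedAfter) {
        if (terminatedAfter != null && merged.value > terminatedAfter) {
            return new TotalHits(terminatedAfter, Relation.GREATER_THAN_OR_EQUAL_TO);
        }
        return merged;
    }

    // The invariant asserted for track_total_hits=true (accurate tracking).
    static boolean satisfiesAccurateTracking(TotalHits hits) {
        return hits.relation == Relation.EQUAL_TO;
    }
}
```

Trimming a merged count of 7 with `terminate_after=1` yields `(1, GREATER_THAN_OR_EQUAL_TO)`, which fails the accurate-tracking check; an untrimmed count passes it.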
That being said, I will still follow up with the benchmarking results, since I don't think any of the issues identified here will have an impact on result accuracy. From those results we can determine whether we should disable concurrent search for the `terminate_after` workflow and/or whether a soft termination option is necessary.
I have some benchmark numbers to discuss now, but before jumping into that I want to clarify some terminology we're using so that we're all on the same page. There are 2 things we need to look at here, which I will call force termination and soft/hard termination.
That being said, we can call the current behavior of … All benchmarking was performed on the …
Force Hard Termination changes can be found here: https://github.com/jed326/OpenSearch/tree/force-terminate_after
First, let's look at a common use case where there is filtering in our query and we use … With …
[Table: 50th Percentile Service Times in ms]
In this case our index has around 160m docs and the range query filters it down to around 10m, which we then perform the aggregation on. With …
[Table: 50th Percentile Service Times in ms]
On the other hand, we can also look at the worst-case scenario: a query/agg that matches all the docs in the index.
[Table: 50th Percentile Service Times in ms]
From this we can see that as … Full results: … Main takeaways: …
With the benchmarking data in mind, I see a few possible solutions:
I do not think we should default to Force Soft Termination because the behavior could then be dramatically different between the concurrent and non-concurrent cases. For example, we would collect up to … My preference is solution 5: most immediately, I think we can support Force Hard Termination as the default case, and as follow-ups we can do the optimization to introduce a …
@jed326 Thanks for the detailed writeup and sharing the perf run numbers.
I agree Force Soft Termination seems to cause a lot more issues, as the behavior with …
I agree with Option 5, with the exception of the default being using …
Thanks @jed326 and @sohami, it is hard to disagree with the conclusions (based on the results). +1 to @sohami that the default path should be non-concurrent search (for …
I have probably referred to this a few times, but I believe we could make the choice driven by search requests, for example something like this:
Using settings for thresholds is (in my opinion) very difficult to get right, even on a per-index basis: a setting either applies to all queries or none of them, so basically we ask the user to either make a guess or profile her queries extensively. With per-search-request tuning, the user could implement a heuristic herself by running the same query in concurrent and non-concurrent fashion and returning the first results (ideally, that is what we could do ourselves, but doing more work in the background could impact cluster operations).
@reta @sohami thanks! Defaulting to the non-concurrent search case sounds good to me. For the settings part, today we have cluster settings, index settings, and request parameters, each one taking higher priority and overriding the previous one. Just like how we have provided both cluster and index settings for enabling/disabling concurrent search, I think it would make sense to introduce a cluster- or index-level setting like @sohami is suggesting and, as a follow-up, provide request-level options to override it. This would be similar to how the request cache index setting is overridden by the request cache request parameter. How does this sound to you, @reta?
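The override order described here (request parameter over index setting over cluster setting) can be expressed in a few lines, assuming each level is modeled as an `Optional` that is empty when unset. `SettingPrecedence.resolve` is a hypothetical helper, not an OpenSearch settings API.

```java
import java.util.Optional;

// Hypothetical helper: resolves one setting across the three levels described above.
class SettingPrecedence {
    // The most specific level that is set wins: request > index > cluster > default.
    static <T> T resolve(Optional<T> request, Optional<T> index, Optional<T> cluster, T defaultValue) {
        return request.or(() -> index).or(() -> cluster).orElse(defaultValue);
    }
}
```

For instance, a cluster-level value of 1000 combined with a request-level value of 10 resolves to 10, and with nothing set the default is used.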
@reta Picking the right values for thresholds will definitely require some profiling of the workload to come up with the right defaults. But in the homogeneous workload case that should be doable. The intention of the cluster setting (I don't think an index setting will make sense here) was to cover such homogeneous cases where concurrent search could be used even with …
A request-level parameter will help with mixed workloads where coming up with a default is not easy. But that will be the case for all search request types (not only limited to …
Thanks @jed326
I am not against settings, but I would like to understand what guidance should be provided to users for configuring them. Do you have a clear set of steps or procedure (we will have to provide the documentation anyway) for how non-expert users should pick the value for each of them? And since we are dealing with changing indices, how could users be sure that the settings they picked yesterday are still relevant today? (figuratively speaking)
We will definitely provide documentation, but as @sohami mentioned above, the main intention of a setting is to cover a broad spectrum of cases and give the user an additional control to revert back to the non-concurrent search case if they see performance regressions in their workload. Broadly speaking, if the user is using …
Keeping this issue open for now since we still want to follow up to support concurrent search for …
Summarizing the remaining issues below.
Based on the discussion above in #8371 (comment), we want to go forward with implementing a threshold setting above which we can use concurrent segment search for the …
Summary of the issues: …
A POC that addresses many of these can be found here: 3bd8fe1
With the use of … Are 2 & 3 caused by concurrent search? I would focus on behavior changes caused by concurrent search.
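A minimal sketch of that routing decision, assuming `terminate_after == 0` means the parameter was not set and that the threshold has already been resolved from settings. `ConcurrentSearchDecider` is hypothetical, and the strict `>` comparison simply follows the "above which" wording.

```java
// Hypothetical decision: use concurrent segment search for terminate_after
// only when its value is above a configured threshold.
class ConcurrentSearchDecider {
    // Assumption: 0 means terminate_after was not set on the request.
    static final int TERMINATE_AFTER_UNSET = 0;

    static boolean useConcurrentSearch(boolean concurrentEnabled, int terminateAfter, int threshold) {
        if (!concurrentEnabled) {
            return false; // concurrent search is turned off entirely
        }
        if (terminateAfter == TERMINATE_AFTER_UNSET) {
            return true; // no terminate_after: the threshold does not apply
        }
        return terminateAfter > threshold; // concurrent only above the threshold
    }
}
```

With a threshold of 100, `terminate_after=10` stays on the non-concurrent path while `terminate_after=1000` can use concurrent search.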
Late to the party, but just wanted to quickly chime in.
So, this threshold is actually some kind of ratio, right? As the search space (the size of indexed data) grows, this threshold can grow, too? In general, if there are at least three segments to process, I think concurrent search will do better than non-concurrent. Maybe even two. If …
I think we don't know yet for sure: there are many variables (number of segments, slices, number of documents, thread pool saturation, etc.), and we probably need many more experiments to get insights here.
I think the decision which path to take is made way before and it also depends on sorting and index type (data streams), but again, more experiments could help.
We do, I believe, but not by size; specifically for data streams, depending on the sort criteria, we could reverse the traversal.
As a follow-up to #8306, we need to add support for forceTermination in the concurrent search case. Creating a new issue to track this, as we need to track the number of docs across threads and the implementation will require some discussion.
Describe the solution you'd like
forceTermination should be properly supported in concurrent segment search.