
[Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree #16674

Merged
merged 23 commits into from
Jan 27, 2025

Conversation

sandeshkr419
Contributor

@sandeshkr419 sandeshkr419 commented Nov 18, 2024

Description

Sharing the updated code design to resolve date histograms with metric sub-aggregators:

  1. Identify if the query can be resolved via star-tree and initialize StarTreeQueryContext in the search context.
  2. Start pre-computation in the parent aggregator class when getLeafCollector() is invoked, if the query can be resolved using the star-tree.
  3. Read the star-tree values.
  4. Filter the star-tree values based on dimensions & query, and store them in a FixedBitSet matchingDocsBitSet.
  5. Initialize StarTreeBucketCollector for all the aggregators, including sub-aggregators.
    The StarTreeBucketCollector interface exposes a collectStarTreeEntry() method which we run through all the StarTreeBucketCollectors corresponding to all the aggregators. It is also used to initialize the relevant metric iterators which each aggregator requires to pre-compute its particular aggregation.
  6. Execute collectStarTreeEntry() for all StarTreeBucketCollectors to collect all the buckets.
  7. For all the sub-aggregators other than the parent aggregator, return an early-termination leaf collector when getLeafCollector() is invoked. For the parent aggregator, getLeafCollector() pre-computes the aggregation using the above-mentioned steps and then returns an early-termination leaf collector.
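The collection contract in steps 5-6 can be sketched with a minimal, self-contained toy (illustrative names only; the real interface also deals with LeafReaderContext, star-tree iterators, and declares IOException):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for StarTreeBucketCollector: each level collects a star-tree
// entry into its bucket, then delegates to its sub-collectors (step 6).
abstract class StarTreeBucketCollectorSketch {
    final List<StarTreeBucketCollectorSketch> subCollectors = new ArrayList<>();

    abstract void collectStarTreeEntry(int starTreeEntry, long bucket);
}

// A toy "sum" collector over a pre-read metric column, standing in for the
// metric iterator each aggregator initializes in step 5.
class SumCollectorSketch extends StarTreeBucketCollectorSketch {
    final long[] metricValues; // values read from the star-tree metric iterator
    final long[] sums;         // one running sum per bucket ordinal

    SumCollectorSketch(long[] metricValues, int maxBuckets) {
        this.metricValues = metricValues;
        this.sums = new long[maxBuckets];
    }

    @Override
    void collectStarTreeEntry(int starTreeEntry, long bucket) {
        sums[(int) bucket] += metricValues[starTreeEntry];
        for (StarTreeBucketCollectorSketch sub : subCollectors) {
            sub.collectStarTreeEntry(starTreeEntry, bucket);
        }
    }
}
```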

This is a draft set of changes which I am using to discuss the solution approach here.

The primary challenge in resolving a date histogram with a metric aggregation was to figure out how sub-aggregators will get resolved. When resolving a query via star-tree, we lose the need for Lucene documents and don't utilize the collect() calls which are used internally to delegate the collection of sub-aggregations to sub-collectors.

To mitigate this challenge, I have introduced a wrapper class - StarTreeBucketCollector - to introduce a collectStarTreeEntry(int starTreeEntry, long bucket) method. This method is then overridden in the metric aggregators and invoked from the parent aggregators (here DateHistogramAggregator).

The benefit of this strategy is that it is easily extensible by other bucket aggregators in which metric aggregations will be nested. Also, other bucket-related utilities are reusable as-is, which saves the effort of having a separate set of utilities for star-tree buckets, since the existing buckets are utilized here.

Want to take early feedback on this approach.

Note: Things are hard-coded for one example query shape right now

{
    "size": 0,
    "aggs": {
        "by_hour": {
            "date_histogram": {
                "field": "@timestamp",
                "calendar_interval": "month"
            }, "aggs": {
                "sum_status": {
                    "sum": {
                        "field": "status"
                    }
                }
            }
        }
    }
}

{
    "took": 15785,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1000,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "by_hour": {
            "buckets": [
                {
                    "key_as_string": "2024-01-01T00:00:00.000Z",
                    "key": 1704067200000,
                    "doc_count": 208,
                    "sum_status": {
                        "value": 43527.0
                    }
                },
                {
                    "key_as_string": "2024-02-01T00:00:00.000Z",
                    "key": 1706745600000,
                    "doc_count": 783,
                    "sum_status": {
                        "value": 164741.0
                    }
                },
                {
                    "key_as_string": "2024-03-01T00:00:00.000Z",
                    "key": 1709251200000,
                    "doc_count": 9,
                    "sum_status": {
                        "value": 1894.0
                    }
                }
            ]
        }
    }
}

Related Issues

Resolves (#16552)

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added enhancement Enhancement or improvement to existing feature or request Search:Aggregations labels Nov 18, 2024

❌ Gradle check result for d7fbc39: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Collaborator

@msfroh msfroh left a comment


Overall, I don't like the deepening association with the Collector interface, given that we're not collecting documents from a Lucene query.

Instead, could we try to work at the Aggregator level more?

I think the place where you should hook more of the star tree stuff in would be the getLeafCollector(LeafReaderContext) method in AggregatorBase:

public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {

I think that's where you could (per-segment) make your choice about whether you are going to delegate to the aggregation logic to return a real LeafBucketCollector, or you're going to throw a CollectionTerminatedException. You can even pass the subAggregators to that logic so the parent has easy access to its children.

Essentially, in the star tree case, you should never need to return a collector. You can just read the values directly from the segment.
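That per-segment decision can be illustrated with a toy, self-contained simulation (stub types stand in for the real Lucene/OpenSearch classes; canUseStarTree and precomputeFromStarTree are hypothetical names, not actual APIs):

```java
// Stub for Lucene's CollectionTerminatedException, which signals that this
// leaf needs no per-document collection.
class TerminatedSketch extends RuntimeException {}

class AggregatorSketch {
    boolean precomputed = false;

    // Placeholder predicate for "this segment's query shape is star-tree resolvable".
    boolean canUseStarTree(Object leafCtx) { return leafCtx != null; }

    // Stands in for reading the values directly from the segment's star tree.
    void precomputeFromStarTree(Object leafCtx) { precomputed = true; }

    // Per-segment choice: precompute from the star tree and terminate, or
    // fall back to returning a real per-document collector.
    Object getLeafCollector(Object leafCtx) {
        if (canUseStarTree(leafCtx)) {
            precomputeFromStarTree(leafCtx);
            throw new TerminatedSketch();
        }
        return new Object(); // stands in for a real LeafBucketCollector
    }
}
```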

@bharath-techie
Contributor

Essentially, in the star tree case, you should never need to return a collector. You can just read the values directly from the segment.

Hi @msfroh @sandeshkr419 ,
Do we need a different interface/method specific to star-tree which will be returned by the aggregators in that case?

Currently, let's say we have a nested aggregation as follows:

{
     "date_histogram_aggs" : {
                "terms_aggs" : {
                          "sum_aggs"
                }
      }
}

It totally makes sense to throw CollectionTerminatedException from date_histogram_aggs [if we don't throw, then I noticed that if size > 0, we get into multiCollector.collectDoc() and similar other issues where we expect a traditional doc collector].

It works well if there are no nested collectors.

But I am just wondering how to handle nested aggs; for the above example, we need a collectDoc equivalent in terms_aggs and sum_aggs for star-tree entries.

Do we need to return a new interface/class specific to star-tree in that case for aggregation?

For Star tree aggregations :
REPLACE

    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException 

WITH 

    public StarTreeAggregator getStarAggregator(LeafReaderContext ctx, StarTreeAggregator sub) throws IOException 

// And all aggregators which support star tree will have something like

return new StarTreeAggregator() {
        @Override
        void aggregateStarTreeEntry(int starTreeEntryId, int bucket) {
                .....actual logic.....
        }
};


Because we can't throw CollectionTerminatedException in sub-aggregators, as we need the collection to happen in the aggregation-hierarchy order.

And if we don't have a new interface/class, then the subCollector is still of type LeafBucketCollector, where we had to implement some sort of collect specific to star-tree.

Please correct me if my understanding is wrong.

@sandeshkr419
Contributor Author

@msfroh @bharath-techie Refactored the changes to not extend LeafBucketCollector. However, I have introduced a new interface StarTreeCollector to expose a preCompute() method.

Now, introducing this new method can complement Froh's idea of unifying pre-computations; I'll leave comments on that PR itself to make it more friendly while pre-computing sub-aggregations.

Contributor

github-actions bot commented Dec 3, 2024

❌ Gradle check result for e1898f1: FAILURE


@bharath-techie
Contributor

@sandeshkr419
My only concern is that without the (starTreeEntry, bucket) pair, how will nested aggs get handled?

Can we please validate the dateHisto -> NumericTerms -> Sum chain, just to check that the proposal holds good.

@sandeshkr419
Contributor Author

@msfroh @bharath-techie

Comparing the 2 solutions proposed in 2 commits:

  1. (1st commit) Having an extension to LeafCollector (let's call it StarTreeCollector for now) with a companion method collectStarTreeEntry - so basically we filter out the star-tree entries into a bitset, each aggregator initializes a StarTreeCollector, and the collector/sub-collector just keeps iterating on dimensions/metrics to collect the buckets of that aggregator/sub-aggregator.
  2. (Pre-compute commit) Having a pre-compute method outside the LeafCollector code which iterates over the dimensions/metrics of one iterator and then moves to the sub-aggregator. The challenge here is that the sub-aggregator does not know which parent bucket the sub-bucket will have to reside in unless you iterate again over dimension values which were already iterated over in the parent aggregator. For cases like date-histogram with a metric aggregator, it is not a problem - you collect doc-count for the parent aggregator (date buckets), and then when you go to the sum/metric aggregator, you iterate over date buckets again to know which buckets to update the sum in. The challenge comes when the nesting deepens.
    For example,
{
    "size": 0,
    "aggs": {
        "unique_terms": {
            "date_histogram": {
                "field": "@timestamp",
                "calendar_interval": "month"
            },
            "aggs": {
                "terms_field": {
                    "terms": {
                        "field": "status",
                        "size": 10
                    },
                    "aggs": {
                        "sum_s": {
                            "sum": {
                                "field": "status"
                            }
                        }
                    }
                }
            }
        }
    }
}

Response (redacted):

"aggregations": {
        "unique_terms": {
            "buckets": [
                {
                    "key_as_string": "2024-01-01T00:00:00.000Z",
                    "key": 1704067200000,
                    "doc_count": 208,
                    "terms_field": {
                        "doc_count_error_upper_bound": 0,
                        "sum_other_doc_count": 82,
                        "buckets": [
                            {
                                "key": 209,
                                "doc_count": 16,
                                "sum_s": {
                                    "value": 3344.0
                                }
                            },
                            {
                                "key": 202,
                                "doc_count": 14,
                                "sum_s": {
                                    "value": 2828.0
                                }
                            },
                            {
                                "key": 216,
                                "doc_count": 14,
                                "sum_s": {
                                    "value": 3024.0
                                }
                            }

Now to update the sum bucket, we'd have to iterate over the date dimension & field dimension again, along with the sum metric. This complicates the extensibility of solution 2, as each nesting increment would require us to maintain multiple star-tree entry iterators. However, the initial solution 1 helps avoid keeping track of multiple iterators with each nesting. The only catch is extending LeafCollector and calling it a StarTreePreComputeCollector or something similar.

Basically, a collector-like interface which has hold of sub-collectors (where we recursively call collectStarTreeEntry - an analogous method to the collect/collectBucket we have for Lucene documents) might simplify the solution, as @bharath-techie also proposed earlier.
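A tiny self-contained check of that property - with a recursive collectStarTreeEntry chain, every level sees each star-tree entry exactly once, however deep the nesting, because each level forwards (entry, bucket) downward instead of re-iterating the parent's dimensions (illustrative names only):

```java
// Each level counts the entries it sees and forwards (entry, bucket) to its
// sub-collector, so no level re-iterates dimensions already resolved above it.
class CountingCollector {
    final CountingCollector sub;
    int entriesSeen = 0;

    CountingCollector(CountingCollector sub) { this.sub = sub; }

    void collectStarTreeEntry(int entry, long bucket) {
        entriesSeen++;
        if (sub != null) sub.collectStarTreeEntry(entry, bucket);
    }
}
```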

Let me know your thoughts on this.

@msfroh
Collaborator

msfroh commented Dec 18, 2024

Now to update the sum bucket, we'd have to iterate over the date dimension & field dimension again, along with the sum metric. This complicates the extensibility of solution 2, as each nesting increment would require us to maintain multiple star-tree entry iterators. However, the initial solution 1 helps avoid keeping track of multiple iterators with each nesting. The only catch is extending LeafCollector and calling it a StarTreePreComputeCollector or something similar.

I don't understand what benefit you get from extending LeafCollector to do this. LeafCollector is a simple interface with (effectively) one method (collect(int)). What's stopping you from creating your own class that manages the star tree iterators?

@bharath-techie
Contributor

bharath-techie commented Dec 19, 2024

Hi @sandeshkr419 ,
The separate interface which we have must have an implementation similar to LeafCollector.
We don't have to extend LeafCollector.

public interface StarTreeAggregator {
    // public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException;

    public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException;
}

to

public interface StarTreeAggregator {

    public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, StarTreeAggregator sub) throws IOException;

// collect equivalent method
  public void aggregate(int doc, long owningBucketOrd) throws IOException;
}

Will this work ?

Basically the parent must check if all subAggregators are of type StarTreeAggregator, and if that's the case, call the aggregate method similar to the current leaf collectors.

So all aggregators will have preCompute and aggregate implementations. We will call preCompute for the first level of parent aggregators which calls aggregate on all the subAggregators.

[ We can have better interfaces as well but the idea is to enable this within our interfaces and make it work similar to existing leafCollector interface ]

Edit : [ 25/12 ]
After further discussing with Sandesh, I think the bucket aggregators can implement and return a StarTreeAggregator similar to LeafBucketCollector. Instead of the scorer calling 'collector.collect', here we can probably check if a query shape can be solved by star-tree; then the precompute method can call aggregate for each starTreeEntry.

public class DateHistogramAggregator implements StarTreeAggregator
...
public StarTreeAggregator getStarTreeAggregator(LeafReaderContext ctx, StarTreeAggregator sub) {
    return new StarTreeAggregator(ctx, sub) {
        @Override
        public void aggregate(int starTreeEntry, long owningBucketOrd) throws IOException {
            ...
            sub.aggregate(starTreeEntry, owningBucketOrd);
            ...
        }
    };
}

public interface StarTreeAggregator {
    void aggregate(int starTreeEntry, long owningBucketOrd);
}

BucketsAggregator {
    public StarTreeAggregator getStarTreeAggregator(LeafReaderContext ctx, StarTreeAggregator sub) {
        // default: no impl
    }

    precompute() {
        if (/* can be solved by star tree */) {
            StarTreeAggregator st = getStarTreeAggregator(ctx, sub);
            for (int entry : starTreeEntries) {
                st.aggregate(entry, 0);
            }
        }
    }
}

Contributor

github-actions bot commented Jan 5, 2025

❌ Gradle check result for 75ad9a8: FAILURE


Contributor

github-actions bot commented Jan 6, 2025

❌ Gradle check result for 7fab6ad: FAILURE


@sandeshkr419
Contributor Author

@msfroh @bharath-techie Updated the PR description with high-level code design.

Basically, I introduced these two interfaces (bad terminology, I know - I'll revisit the naming once again):

  1. StarTreeBucketCollector - this is used to encapsulate the iterators & the collectStarTreeEntry() method I will require for a particular aggregation. The preCompute utility from the parent aggregator can then directly call its collectStarTreeEntry().
  2. StarTreePreComputeCollector - basically I needed a utility to call getStarTreeBucketCollector() on classes supporting star-tree, so I created this interface so that my aggregator classes could implement it.
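A hedged sketch of how these two pieces might wire together - the pre-compute step drives the filtered star-tree entries through the parent's bucket collector, which fans out to sub-collectors. Signatures here are paraphrased from this discussion, not the merged code:

```java
// Functional stand-in for StarTreeBucketCollector's entry-collection method.
interface EntryConsumer {
    void collectStarTreeEntry(int entry, long bucket);
}

class PreComputeDriverSketch {
    // `matchingEntries` stands in for the FixedBitSet of filtered star-tree
    // entries; bucket 0 is the top-level owning bucket.
    static void preCompute(EntryConsumer parent, int[] matchingEntries) {
        for (int entry : matchingEntries) {
            parent.collectStarTreeEntry(entry, 0);
        }
    }
}
```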

As next steps: 1/ I'll remove the major hard-coding in the code around identifying supported query shapes and fix other POC code; 2/ tune the above interfaces to get to a minimal state. Other than this, @bharath-techie, @msfroh, I think this design achieves what it intended to in the first place - 1/ it is easily extensible for other bucket aggregators and for introducing more nested aggregations, and 2/ it removes the unwanted dependency on LeafBucketCollector, as it is not required. Let me know if there are major design comments; I'll work on getting this PR out of draft state in parallel.


❌ Gradle check result for 7208b67: FAILURE



✅ Gradle check result for a819dfe: SUCCESS

@sandeshkr419
Contributor Author

@msfroh Thanks for checking in. Added more test cases and added changelog. Let me know if you have further comments.

@bharath-techie Please let me know if you also have any other comments.

@bharath-techie
Contributor

Thanks for the changes @sandeshkr419.
I was able to test the changes with the http_logs benchmark - date histogram aggs with/without a terms query, and with metric aggs [sum, avg, value_count] - and there were no correctness issues.

So the changes look good; I will approve once the minor comments are addressed.


❌ Gradle check result for 92415c4: FAILURE



✅ Gradle check result for c54f86d: SUCCESS

Signed-off-by: Sandesh Kumar <[email protected]>

❌ Gradle check result for 61644ca: FAILURE




❕ Gradle check result for 61644ca: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@msfroh msfroh merged commit b5234a5 into opensearch-project:main Jan 27, 2025
71 of 84 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Jan 27, 2025
…sing star-tree (#16674)

---------

Signed-off-by: Sandesh Kumar <[email protected]>
Co-authored-by: Sandesh Kumar <[email protected]>
(cherry picked from commit b5234a5)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
@sandeshkr419 sandeshkr419 deleted the dh-temp branch January 27, 2025 21:56
Labels
backport 2.x Backport to 2.x branch enhancement Enhancement or improvement to existing feature or request Search:Aggregations