diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java index ba8859625faa3..095e0cd8cf75b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * This interface provides a bridge between an aggregator and the optimization context, allowing @@ -31,22 +32,21 @@ */ public abstract class AggregatorBridge { - /** - * The optimization context associated with this aggregator bridge. - */ - FilterRewriteOptimizationContext filterRewriteOptimizationContext; - /** * The field type associated with this aggregator bridge. */ MappedFieldType fieldType; - void setOptimizationContext(FilterRewriteOptimizationContext context) { - this.filterRewriteOptimizationContext = context; + Consumer setRanges; + + void setRangesConsumer(Consumer setRanges) { + this.setRanges = setRanges; } /** * Checks whether the aggregator can be optimized. + *
<p>
+ * This method is supposed to be implemented in a specific aggregator to take in fields from there * * @return {@code true} if the aggregator can be optimized, {@code false} otherwise. * The result will be saved in the optimization context. @@ -57,6 +57,8 @@ void setOptimizationContext(FilterRewriteOptimizationContext context) { * Prepares the optimization at shard level after checking aggregator is optimizable. *
<p>
* For example, figure out what are the ranges from the aggregation to do the optimization later + *
<p>
+ * This method is supposed to be implemented in a specific aggregator to take in fields from there */ protected abstract void prepare() throws IOException; @@ -65,14 +67,18 @@ void setOptimizationContext(FilterRewriteOptimizationContext context) { * * @param leaf the leaf reader context for the segment */ - protected abstract void prepareFromSegment(LeafReaderContext leaf) throws IOException; + abstract Ranges prepareFromSegment(LeafReaderContext leaf) throws IOException; /** * Attempts to build aggregation results for a segment * * @param values the point values (index structure for numeric values) for a segment * @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket - * @param leafOrd + * @param ranges */ - protected abstract void tryOptimize(PointValues values, BiConsumer incrementDocCount, int leafOrd) throws IOException; + abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize( + PointValues values, + BiConsumer incrementDocCount, + Ranges ranges + ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java index ca3df11b3596c..22206d4dcb4bc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java @@ -32,6 +32,8 @@ */ public abstract class DateHistogramAggregatorBridge extends AggregatorBridge { + int maxRewriteFilters; + protected boolean canOptimize(ValuesSourceConfig config) { if (config.script() == null && config.missing() == null) { MappedFieldType fieldType = config.fieldType(); @@ -47,16 +49,17 @@ protected boolean canOptimize(ValuesSourceConfig config) { 
protected void buildRanges(SearchContext context) throws IOException { long[] bounds = Helper.getDateHistoAggBounds(context, fieldType.name()); - filterRewriteOptimizationContext.setRanges(buildRanges(bounds)); + this.maxRewriteFilters = context.maxAggRewriteFilters(); + setRanges.accept(buildRanges(bounds, maxRewriteFilters)); } @Override - protected void prepareFromSegment(LeafReaderContext leaf) throws IOException { + final Ranges prepareFromSegment(LeafReaderContext leaf) throws IOException { long[] bounds = Helper.getSegmentBounds(leaf, fieldType.name()); - filterRewriteOptimizationContext.setRangesFromSegment(leaf.ord, buildRanges(bounds)); + return buildRanges(bounds, maxRewriteFilters); } - private Ranges buildRanges(long[] bounds) { + private Ranges buildRanges(long[] bounds, int maxRewriteFilters) { bounds = processHardBounds(bounds); if (bounds == null) { return null; @@ -79,7 +82,7 @@ private Ranges buildRanges(long[] bounds) { getRoundingPrepared(), bounds[0], bounds[1], - filterRewriteOptimizationContext.maxAggRewriteFilters + maxRewriteFilters ); } @@ -123,11 +126,14 @@ protected int getSize() { } @Override - protected final void tryOptimize(PointValues values, BiConsumer incrementDocCount, int leafOrd) throws IOException { + final FilterRewriteOptimizationContext.DebugInfo tryOptimize( + PointValues values, + BiConsumer incrementDocCount, + Ranges ranges + ) throws IOException { int size = getSize(); DateFieldMapper.DateFieldType fieldType = getFieldType(); - Ranges ranges = filterRewriteOptimizationContext.getRanges(leafOrd); BiConsumer incrementFunc = (activeIndex, docCount) -> { long rangeStart = LongPoint.decodeDimension(ranges.lowers[activeIndex], 0); rangeStart = fieldType.convertNanosToMillis(rangeStart); @@ -135,9 +141,7 @@ protected final void tryOptimize(PointValues values, BiConsumer incr incrementDocCount.accept(bucketOrd, (long) docCount); }; - filterRewriteOptimizationContext.consumeDebugInfo( - 
multiRangesTraverse(values.getPointTree(), filterRewriteOptimizationContext.getRanges(leafOrd), incrementFunc, size) - ); + return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size); } private static long getBucketOrd(long bucketOrd) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java index 8811346b24b77..a1670035b7909 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java @@ -39,7 +39,6 @@ public final class FilterRewriteOptimizationContext { private boolean preparedAtShardLevel = false; private final AggregatorBridge aggregatorBridge; - int maxAggRewriteFilters; private String shardId; private Ranges ranges; @@ -72,8 +71,8 @@ private boolean canOptimize(final Object parent, final int subAggLength, SearchC boolean canOptimize = aggregatorBridge.canOptimize(); if (canOptimize) { - aggregatorBridge.setOptimizationContext(this); - this.maxAggRewriteFilters = context.maxAggRewriteFilters(); + aggregatorBridge.setRangesConsumer(this::setRanges); + this.shardId = context.indexShard().shardId().toString(); assert ranges == null : "Ranges should only be built once at shard level, but they are already built"; @@ -139,7 +138,7 @@ public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer add) { if (optimizedSegments > 0) { add.accept("optimized_segments", optimizedSegments); add.accept("unoptimized_segments", segments - optimizedSegments); - add.accept("leaf_node_visited", leafNodeVisited); - add.accept("inner_node_visited", innerNodeVisited); + add.accept("leaf_visited", leafNodeVisited); + add.accept("inner_visited", innerNodeVisited); } } } diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java index ab3e6e6b89a69..8496f7969d882 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java @@ -65,16 +65,20 @@ protected void buildRanges(RangeAggregator.Range[] ranges) { uppers[i] = upper; } - filterRewriteOptimizationContext.setRanges(new Ranges(lowers, uppers)); + setRanges.accept(new Ranges(lowers, uppers)); } @Override - public void prepareFromSegment(LeafReaderContext leaf) { + final Ranges prepareFromSegment(LeafReaderContext leaf) { throw new UnsupportedOperationException("Range aggregation should not build ranges at segment level"); } @Override - protected final void tryOptimize(PointValues values, BiConsumer incrementDocCount, int leafOrd) throws IOException { + final FilterRewriteOptimizationContext.DebugInfo tryOptimize( + PointValues values, + BiConsumer incrementDocCount, + Ranges ranges + ) throws IOException { int size = Integer.MAX_VALUE; BiConsumer incrementFunc = (activeIndex, docCount) -> { @@ -82,9 +86,7 @@ protected final void tryOptimize(PointValues values, BiConsumer incr incrementDocCount.accept(bucketOrd, (long) docCount); }; - filterRewriteOptimizationContext.consumeDebugInfo( - multiRangesTraverse(values.getPointTree(), filterRewriteOptimizationContext.getRanges(leafOrd), incrementFunc, size) - ); + return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size); } /**