From 7c491b94f57a143584d8f25fedb94d469e185348 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 20 Jun 2024 11:58:44 -0700 Subject: [PATCH] Refactor extract segment match all logic Signed-off-by: bowenlan-amzn --- .../bucket/composite/CompositeAggregator.java | 7 ++- .../AutoDateHistogramAggregator.java | 7 ++- .../histogram/DateHistogramAggregator.java | 7 ++- .../bucket/range/RangeAggregator.java | 2 +- ...tractDateHistogramAggAggregatorBridge.java | 7 +++ .../optimization/ranges/AggregatorBridge.java | 4 ++ .../ranges/OptimizationContext.java | 45 +++++++++---------- 7 files changed, 50 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index e2c31a3902896..b89154b927629 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -231,6 +231,11 @@ protected int getSize() { protected Function bucketOrdProducer() { return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key)); } + + @Override + protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException { + return segmentMatchAll(context, leaf); + } } @Override @@ -565,7 +570,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount, context); + boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount); if (optimized) throw new CollectionTerminatedException(); finishLeaf(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index fab5e6e3526f4..d09ac90e8ae01 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -210,6 +210,11 @@ protected Prepared getRoundingPrepared() { protected Function bucketOrdProducer() { return (key) -> getBucketOrds().add(0, preparedRounding.round((long) key)); } + + @Override + protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException { + return segmentMatchAll(context, leaf); + } } protected abstract LongKeyedBucketOrds getBucketOrds(); @@ -241,7 +246,7 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount, context); + boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount); if (optimized) throw new CollectionTerminatedException(); final SortedNumericDocValues values = valuesSource.longValues(ctx); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 570320b1347f5..a4da778c59271 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -155,6 +155,11 @@ protected long[] processHardBounds(long[] bounds) { protected Function bucketOrdProducer() { return (key) -> bucketOrds.add(0, preparedRounding.round((long) key)); } + + @Override + protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException { + return segmentMatchAll(context, leaf); + } } @Override @@ -171,7 +176,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount, context); + boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java index 15e66394fe6c1..dad99e8e63a46 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregator.java @@ -316,7 +316,7 @@ public ScoreMode scoreMode() { @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { - boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount, context); + boolean optimized = optimizationContext.tryFastFilterAggregation(ctx, this::incrementBucketDocCount); if (optimized) throw new CollectionTerminatedException(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); diff --git a/server/src/main/java/org/opensearch/search/optimization/ranges/AbstractDateHistogramAggAggregatorBridge.java b/server/src/main/java/org/opensearch/search/optimization/ranges/AbstractDateHistogramAggAggregatorBridge.java index 8662e573e42ab..85967e6ab153c 100644 --- a/server/src/main/java/org/opensearch/search/optimization/ranges/AbstractDateHistogramAggAggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/optimization/ranges/AbstractDateHistogramAggAggregatorBridge.java @@ -11,6 +11,8 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; import org.opensearch.common.Rounding; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.MappedFieldType; @@ -154,4 +156,9 @@ private static long getBucketOrd(long bucketOrd) { return bucketOrd; } + + protected boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { + Weight weight = ctx.query().rewrite(ctx.searcher()).createWeight(ctx.searcher(), ScoreMode.COMPLETE_NO_SCORES, 1f); + return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + } } diff --git a/server/src/main/java/org/opensearch/search/optimization/ranges/AggregatorBridge.java b/server/src/main/java/org/opensearch/search/optimization/ranges/AggregatorBridge.java index a270f5c5b1741..a3d42de2ebb12 100644 --- a/server/src/main/java/org/opensearch/search/optimization/ranges/AggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/optimization/ranges/AggregatorBridge.java @@ -49,4 +49,8 @@ void setOptimizationContext(OptimizationContext optimizationContext) { abstract void tryFastFilterAggregation(PointValues values, BiConsumer incrementDocCount, Ranges ranges) throws IOException; protected abstract Function bucketOrdProducer(); + + protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException { + return false; + } } diff --git a/server/src/main/java/org/opensearch/search/optimization/ranges/OptimizationContext.java b/server/src/main/java/org/opensearch/search/optimization/ranges/OptimizationContext.java index d04b53bf35c8f..9e33f4325b34d 100644 --- a/server/src/main/java/org/opensearch/search/optimization/ranges/OptimizationContext.java +++ b/server/src/main/java/org/opensearch/search/optimization/ranges/OptimizationContext.java @@ -16,8 +16,6 @@ import org.apache.lucene.index.PointValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.ArrayUtil; import org.opensearch.common.CheckedRunnable; import org.opensearch.index.mapper.DocCountFieldMapper; @@ -65,13 +63,13 @@ public boolean canOptimize(final Object parent, final int subAggLength, SearchCo if (parent != null || subAggLength != 0) return false; boolean rewriteable = aggregatorBridge.canOptimize(); - logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); this.rewriteable = rewriteable; if (rewriteable) { aggregatorBridge.setOptimizationContext(this); this.maxAggRewriteFilters = context.maxAggRewriteFilters(); this.shardId = context.indexShard().shardId().toString(); } + logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, shardId); return rewriteable; } @@ -98,11 +96,8 @@ void setRanges(Ranges ranges) { * * @param incrementDocCount consume the doc_count results for certain ordinal */ - public boolean tryFastFilterAggregation( - final LeafReaderContext leafCtx, - final BiConsumer incrementDocCount, - SearchContext context - ) throws IOException { + public boolean tryFastFilterAggregation(final LeafReaderContext leafCtx, final BiConsumer incrementDocCount) + throws IOException { segments++; if (!rewriteable) { return false; @@ -125,20 +120,8 @@ public boolean tryFastFilterAggregation( return false; } - // even if no ranges built at shard level, we can still perform the optimization - // when functionally match-all at segment level - if (!rangesBuiltAtShardLevel && !segmentMatchAll(context, leafCtx)) { - return false; - } - - Ranges ranges = this.ranges; - if (ranges == null) { // not built at shard level but segment match all - logger.debug("Shard {} segment {} functionally match all documents. Build the fast filter", shardId, leafCtx.ord); - ranges = buildRanges(leafCtx); - if (ranges == null) { - return false; - } - } + Ranges ranges = tryGetRangesFromSegment(leafCtx); + if (ranges == null) return false; aggregatorBridge.tryFastFilterAggregation(values, incrementDocCount, ranges); @@ -148,9 +131,21 @@ public boolean tryFastFilterAggregation( return true; } - public static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { - Weight weight = ctx.query().rewrite(ctx.searcher()).createWeight(ctx.searcher(), ScoreMode.COMPLETE_NO_SCORES, 1f); - return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs(); + /** + * Even when ranges cannot be built at shard level, we can still build ranges + * at segment level when it's functionally match-all at segment level + */ + private Ranges tryGetRangesFromSegment(LeafReaderContext leafCtx) throws IOException { + if (!rangesBuiltAtShardLevel && !aggregatorBridge.segmentMatchAll(leafCtx)) { + return null; + } + + Ranges ranges = this.ranges; + if (ranges == null) { // not built at shard level but segment match all + logger.debug("Shard {} segment {} functionally match all documents. Build the fast filter", shardId, leafCtx.ord); + ranges = buildRanges(leafCtx); + } + return ranges; } /**