Refactor

Refactor the canOptimize logic: sort out the basic rule for how the aggregator provides data, and where common logic should live.

Signed-off-by: bowenlan-amzn <[email protected]>
bowenlan-amzn committed Jun 19, 2024
1 parent 3f43898 commit 7d9d57e
Showing 9 changed files with 140 additions and 135 deletions.
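The four file diffs shown below follow one shape: each aggregator now constructs its OptimizationContext with an aggregator-specific data provider, the eligibility checks that used to be passed through super-constructors move into the provider's canOptimize() override, and the common gating stays behind OptimizationContext.canOptimize(parent, subAggregators.length). The following is a minimal, self-contained sketch of that shape; the names are simplified and the guard inside OptimizationContext.canOptimize is an assumption for illustration, not the actual OpenSearch implementation.

// Hypothetical, simplified sketch of the provider pattern this commit refactors toward.
// The real classes live under org.opensearch.search.optimization.ranges; the exact
// common checks inside OptimizationContext.canOptimize are assumed here.
@FunctionalInterface
interface AggregatorDataProvider {
    // Per-aggregator eligibility checks (field type, missing values, script, ranges, ...)
    boolean canOptimize();
}

final class OptimizationContext {
    private final AggregatorDataProvider dataProvider;

    OptimizationContext(AggregatorDataProvider dataProvider) {
        this.dataProvider = dataProvider;
    }

    // Common gating logic lives in one place; aggregator-specific logic is delegated.
    boolean canOptimize(Object parentAggregator, int subAggregatorCount) {
        if (parentAggregator != null || subAggregatorCount > 0) {
            return false; // assumed guard: only top-level aggregators without sub-aggregations
        }
        return dataProvider.canOptimize();
    }
}

final class ProviderPatternDemo {
    public static void main(String[] args) {
        OptimizationContext ctx = new OptimizationContext(() -> true);
        System.out.println(ctx.canOptimize(null, 0)); // true: provider agrees, no parent, no sub-aggs
        System.out.println(ctx.canOptimize(null, 2)); // false: sub-aggregators present
    }
}

In the diff, CompositeAggAggregatorDataProvider, AutoHistogramAggAggregatorDataProvider, DateHistogramAggAggregatorDataProvider, and RangeAggregatorDataProvider each fill in canOptimize() with the checks specific to their aggregator, replacing the constructor-argument plumbing removed in each file.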
CompositeAggregator.java
@@ -73,12 +73,11 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregationFunctionProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;
import org.opensearch.search.optimization.ranges.Helper;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregatorDataProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;

@@ -167,35 +166,36 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

optimizationContext = new OptimizationContext(context);
if (!Helper.isCompositeAggRewriteable(sourceConfigs)) {
return;
}
optimizationContext.setAggregationType(new CompositeAggAggregationFunctionProvider());
if (optimizationContext.isRewriteable(parent, subAggregators.length)) {
optimizationContext = new OptimizationContext(context, new CompositeAggAggregatorDataProvider());
if (optimizationContext.canOptimize(parent, subAggregators.length)) {
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
preparedRounding = ((CompositeAggAggregationFunctionProvider) optimizationContext.getAggregationType()).getRoundingPrepared();
preparedRounding = ((CompositeAggAggregatorDataProvider) optimizationContext.getAggregationType()).getRoundingPrepared();
optimizationContext.buildRanges(sourceConfigs[0].fieldType());
}
}

/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggAggregationFunctionProvider extends AbstractDateHistogramAggAggregationFunctionProvider {
private final RoundingValuesSource valuesSource;
public class CompositeAggAggregatorDataProvider extends AbstractDateHistogramAggAggregatorDataProvider {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

public CompositeAggAggregationFunctionProvider() {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
@Override
public boolean canOptimize() {
if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
if (canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType())) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
return true;
}
return false;
}

public Rounding getRounding(final long low, final long high) {
@@ -207,14 +207,16 @@ public Rounding.Prepared getRoundingPrepared() {
}

@Override
protected void processAfterKey(long[] bound, long interval) {
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bound[0] = afterKey + interval;
bounds[0] = afterKey + interval;
}
return bounds;
}

@Override
public int getSize() {
return size;
}
AutoDateHistogramAggregator.java
@@ -42,7 +42,6 @@
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -53,14 +52,14 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.opensearch.search.aggregations.bucket.DeferringBucketCollector;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregationFunctionProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;
import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregatorDataProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;

import java.io.IOException;
import java.util.Collections;
@@ -137,6 +136,7 @@ static AutoDateHistogramAggregator build(
protected Rounding.Prepared preparedRounding;

private final OptimizationContext optimizationContext;
private final ValuesSourceConfig valuesSourceConfig;

private AutoDateHistogramAggregator(
String name,
@@ -154,28 +154,23 @@ private AutoDateHistogramAggregator(
this.targetBuckets = targetBuckets;
// TODO: Remove null usage here, by using a different aggregator for create
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
this.valuesSourceConfig = valuesSourceConfig;
this.formatter = valuesSourceConfig.format();
this.roundingInfos = roundingInfos;
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

optimizationContext = new OptimizationContext(
context,
new AutoHistogramAggAggregationFunctionProvider(
valuesSourceConfig.fieldType(),
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null
)
);
if (optimizationContext.isRewriteable(parent, subAggregators.length)) {
optimizationContext = new OptimizationContext(context, new AutoHistogramAggAggregatorDataProvider());
if (optimizationContext.canOptimize(parent, subAggregators.length)) {
optimizationContext.buildRanges(Objects.requireNonNull(valuesSourceConfig.fieldType()));
}
}

private class AutoHistogramAggAggregationFunctionProvider extends AbstractDateHistogramAggAggregationFunctionProvider {
private class AutoHistogramAggAggregatorDataProvider extends AbstractDateHistogramAggAggregatorDataProvider {

public AutoHistogramAggAggregationFunctionProvider(MappedFieldType fieldType, boolean missing, boolean hasScript) {
super(fieldType, missing, hasScript);
@Override
public boolean canOptimize() {
return canOptimize(valuesSourceConfig);
}

@Override
DateHistogramAggregator.java
@@ -39,7 +39,6 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -49,12 +48,12 @@
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregationFunctionProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregatorDataProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;

import java.io.IOException;
import java.util.Collections;
@@ -86,6 +85,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final LongKeyedBucketOrds bucketOrds;

private final OptimizationContext optimizationContext;
private final ValuesSourceConfig valuesSourceConfig;

DateHistogramAggregator(
String name,
@@ -114,28 +114,22 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
this.hardBounds = hardBounds;
// TODO: Stop using null here
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
this.valuesSourceConfig = valuesSourceConfig;
this.formatter = valuesSourceConfig.format();

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

optimizationContext = new OptimizationContext(
context,
new DateHistogramAggAggregationFunctionProvider(
valuesSourceConfig.fieldType(),
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null,
hardBounds
)
);
if (optimizationContext.isRewriteable(parent, subAggregators.length)) {
optimizationContext = new OptimizationContext(context, new DateHistogramAggAggregatorDataProvider());
if (optimizationContext.canOptimize(parent, subAggregators.length)) {
optimizationContext.buildRanges(Objects.requireNonNull(valuesSourceConfig.fieldType()));
}
}

private class DateHistogramAggAggregationFunctionProvider extends AbstractDateHistogramAggAggregationFunctionProvider {
private class DateHistogramAggAggregatorDataProvider extends AbstractDateHistogramAggAggregatorDataProvider {

public DateHistogramAggAggregationFunctionProvider(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) {
super(fieldType, missing, hasScript, hardBounds);
@Override
public boolean canOptimize() {
return canOptimize(valuesSourceConfig);
}

@Override
@@ -147,6 +141,11 @@ protected Rounding getRounding(long low, long high) {
protected Rounding.Prepared getRoundingPrepared() {
return preparedRounding;
}

@Override
protected long[] processHardBounds(long[] bounds) {
return super.processHardBounds(bounds, hardBounds);
}
}

@Override
RangeAggregator.java
@@ -45,6 +45,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -55,11 +56,11 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.NonCollectingAggregator;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.optimization.ranges.OptimizationContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.RangeAggregationFunctionProvider;
import org.opensearch.search.optimization.ranges.AbstractRangeAggregatorDataProvider;
import org.opensearch.search.optimization.ranges.OptimizationContext;

import java.io.IOException;
import java.util.ArrayList;
@@ -251,6 +252,7 @@ public boolean equals(Object obj) {
final double[] maxTo;

private final OptimizationContext optimizationContext;
private final ValuesSourceConfig valuesSourceConfig;

public RangeAggregator(
String name,
@@ -269,6 +271,7 @@ public RangeAggregator(
super(name, factories, context, parent, cardinality.multiply(ranges.length), metadata);
assert valuesSource != null;
this.valuesSource = valuesSource;
this.valuesSourceConfig = config;
this.format = format;
this.keyed = keyed;
this.rangeFactory = rangeFactory;
@@ -280,15 +283,24 @@ public RangeAggregator(
maxTo[i] = Math.max(this.ranges[i].to, maxTo[i - 1]);
}

optimizationContext = new OptimizationContext(
context,
new RangeAggregationFunctionProvider(config, ranges)
);
if (optimizationContext.isRewriteable(parent, subAggregators.length)) {
optimizationContext = new OptimizationContext(context, new RangeAggregatorDataProvider());
if (optimizationContext.canOptimize(parent, subAggregators.length)) {
optimizationContext.buildRanges(Objects.requireNonNull(config.fieldType()));
}
}

class RangeAggregatorDataProvider extends AbstractRangeAggregatorDataProvider {
@Override
public boolean canOptimize() {
return canOptimize(valuesSourceConfig, ranges);
}

@Override
public OptimizationContext.Ranges buildRanges(SearchContext ctx, MappedFieldType fieldType) {
return buildRanges(fieldType, ranges);
}
}

@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {