Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
inline class

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jun 21, 2024
1 parent a158f78 commit 8f10faf
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregatorBridge;
import org.opensearch.search.optimization.ranges.DateHistogramAggregatorBridge;
import org.opensearch.search.optimization.ranges.OptimizationContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;
Expand Down Expand Up @@ -166,74 +166,67 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

optimizationContext = new OptimizationContext(new CompositeAggAggregatorBridge());
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
}
}
optimizationContext = new OptimizationContext(new DateHistogramAggregatorBridge() {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

/**
* Currently the filter rewrite is only supported for date histograms
*/
private final class CompositeAggAggregatorBridge extends AbstractDateHistogramAggAggregatorBridge {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

@Override
protected boolean canOptimize() {
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
@Override
protected boolean canOptimize() {
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}

// bucketOrds is used for saving the date histogram results got from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
// bucketOrds is used for saving the date histogram results got from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
}
return false;
}
return false;
}



@Override
protected void buildRanges() throws IOException {
buildRanges(context);
}
@Override
protected void buildRanges() throws IOException {
buildRanges(context);
}

protected Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}
protected Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

protected Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
protected Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}

@Override
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bounds[0] = afterKey + interval;
@Override
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bounds[0] = afterKey + interval;
}
return bounds;
}
return bounds;
}

@Override
protected int getSize() {
return size;
}
@Override
protected int getSize() {
return size;
}

@Override
protected Function<Object, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}
@Override
protected Function<Object, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}

@Override
protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
return segmentMatchAll(context, leaf);
@Override
protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
return segmentMatchAll(context, leaf);
}
});
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregatorBridge;
import org.opensearch.search.optimization.ranges.DateHistogramAggregatorBridge;
import org.opensearch.search.optimization.ranges.OptimizationContext;

import java.io.IOException;
Expand Down Expand Up @@ -135,7 +135,6 @@ static AutoDateHistogramAggregator build(
protected Rounding.Prepared preparedRounding;

private final OptimizationContext optimizationContext;
private final ValuesSourceConfig valuesSourceConfig;

private AutoDateHistogramAggregator(
String name,
Expand All @@ -153,67 +152,63 @@ private AutoDateHistogramAggregator(
this.targetBuckets = targetBuckets;
// TODO: Remove null usage here, by using a different aggregator for create
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
this.valuesSourceConfig = valuesSourceConfig;
this.formatter = valuesSourceConfig.format();
this.roundingInfos = roundingInfos;
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

optimizationContext = new OptimizationContext(new AutoHistogramAggAggregatorBridge());
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
}
}

private final class AutoHistogramAggAggregatorBridge extends AbstractDateHistogramAggAggregatorBridge {

@Override
protected boolean canOptimize() {
return canOptimize(valuesSourceConfig);
}
optimizationContext = new OptimizationContext(new DateHistogramAggregatorBridge() {
@Override
protected boolean canOptimize() {
return canOptimize(valuesSourceConfig);
}

@Override
protected void buildRanges() throws IOException {
buildRanges(context);
}
@Override
protected void buildRanges() throws IOException {
buildRanges(context);
}

@Override
protected Rounding getRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
// reset so this function is idempotent
roundingIdx = 0;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
@Override
protected Rounding getRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
// reset so this function is idempotent
roundingIdx = 0;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
}
roundingIdx++;
}
roundingIdx++;
}

preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
}
preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
}

@Override
protected Prepared getRoundingPrepared() {
return preparedRounding;
}
@Override
protected Prepared getRoundingPrepared() {
return preparedRounding;
}

@Override
protected Function<Object, Long> bucketOrdProducer() {
return (key) -> getBucketOrds().add(0, preparedRounding.round((long) key));
}
@Override
protected Function<Object, Long> bucketOrdProducer() {
return (key) -> getBucketOrds().add(0, preparedRounding.round((long) key));
}

@Override
protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
return segmentMatchAll(context, leaf);
@Override
protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
return segmentMatchAll(context, leaf);
}
});
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.optimization.ranges.AbstractDateHistogramAggAggregatorBridge;
import org.opensearch.search.optimization.ranges.DateHistogramAggregatorBridge;
import org.opensearch.search.optimization.ranges.OptimizationContext;

import java.io.IOException;
Expand Down Expand Up @@ -85,7 +85,6 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final LongKeyedBucketOrds bucketOrds;

private final OptimizationContext optimizationContext;
private final ValuesSourceConfig valuesSourceConfig;

DateHistogramAggregator(
String name,
Expand Down Expand Up @@ -114,51 +113,48 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
this.hardBounds = hardBounds;
// TODO: Stop using null here
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
this.valuesSourceConfig = valuesSourceConfig;
this.formatter = valuesSourceConfig.format();

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

optimizationContext = new OptimizationContext(new DateHistogramAggAggregatorBridge());
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
}
}

private final class DateHistogramAggAggregatorBridge extends AbstractDateHistogramAggAggregatorBridge {
@Override
protected boolean canOptimize() {
return canOptimize(valuesSourceConfig);
}
optimizationContext = new OptimizationContext(new DateHistogramAggregatorBridge() {
@Override
protected boolean canOptimize() {
return canOptimize(valuesSourceConfig);
}

@Override
protected void buildRanges() throws IOException {
buildRanges(context);
}
@Override
protected void buildRanges() throws IOException {
buildRanges(context);
}

@Override
protected Rounding getRounding(long low, long high) {
return rounding;
}
@Override
protected Rounding getRounding(long low, long high) {
return rounding;
}

@Override
protected Rounding.Prepared getRoundingPrepared() {
return preparedRounding;
}
@Override
protected Rounding.Prepared getRoundingPrepared() {
return preparedRounding;
}

@Override
protected long[] processHardBounds(long[] bounds) {
return super.processHardBounds(bounds, hardBounds);
}
@Override
protected long[] processHardBounds(long[] bounds) {
return super.processHardBounds(bounds, hardBounds);
}

@Override
protected Function<Object, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, preparedRounding.round((long) key));
}
@Override
protected Function<Object, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, preparedRounding.round((long) key));
}

@Override
protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
return segmentMatchAll(context, leaf);
@Override
protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
return segmentMatchAll(context, leaf);
}
});
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
}
}

Expand Down
Loading

0 comments on commit 8f10faf

Please sign in to comment.