Skip to content

Commit

Permalink
Refactor the ranges representation
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed May 21, 2024
1 parent 4fbcedd commit 53cb70f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 47 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
Expand Down Expand Up @@ -37,12 +38,12 @@
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.aggregations.bucket.range.RangeAggregator.Range;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
Expand Down Expand Up @@ -188,7 +189,7 @@ public static class FastFilterContext {
private final SearchContext context;

private String fieldName;
private long[][] ranges;
private Ranges ranges;

// debug info related fields
public int leaf;
Expand Down Expand Up @@ -230,8 +231,8 @@ public void buildRanges() throws IOException {
}
}

public long[][] buildRanges(LeafReaderContext leaf) throws IOException {
long[][] ranges = this.aggregationType.buildRanges(leaf, context);
public Ranges buildRanges(LeafReaderContext leaf) throws IOException {
Ranges ranges = this.aggregationType.buildRanges(leaf, context);
if (ranges != null) {
logger.debug("Ranges built for shard {} segment {}", context.indexShard().shardId(), leaf.ord);
}
Expand All @@ -250,11 +251,13 @@ private void consumeDebugInfo(DebugInfo debug) {
interface AggregationType {
boolean isRewriteable(Object parent, int subAggLength);

long[][] buildRanges(SearchContext ctx) throws IOException;
Ranges buildRanges(SearchContext ctx) throws IOException;

long[][] buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException;
Ranges buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException;
}



/**
* For date histogram aggregation
*/
Expand Down Expand Up @@ -286,13 +289,13 @@ public boolean isRewriteable(Object parent, int subAggLength) {
}

@Override
public long[][] buildRanges(SearchContext context) throws IOException {
public Ranges buildRanges(SearchContext context) throws IOException {
long[] bounds = getDateHistoAggBounds(context, fieldType.name());
logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId());
return buildRanges(context, bounds);
}

private long[][] buildRanges(SearchContext context, long[] bounds) throws IOException {
private Ranges buildRanges(SearchContext context, long[] bounds) throws IOException {
bounds = processHardBounds(bounds);
if (bounds == null) {
return null;
Expand Down Expand Up @@ -320,7 +323,7 @@ private long[][] buildRanges(SearchContext context, long[] bounds) throws IOExce
}

@Override
public long[][] buildRanges(LeafReaderContext leaf, SearchContext context) throws IOException {
public Ranges buildRanges(LeafReaderContext leaf, SearchContext context) throws IOException {
long[] bounds = getSegmentBounds(leaf, fieldType.name());
logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord);
return buildRanges(context, bounds);
Expand Down Expand Up @@ -356,6 +359,47 @@ public DateFieldMapper.DateFieldType getFieldType() {
}
}

/**
 * {@link FastFilterRewriteHelper.AggregationType} for a numeric range aggregation.
 *
 * Holds the numeric values source and the requested ranges, and decides whether
 * the aggregation may be rewritten by the fast filter optimization.
 */
public static class RangeAggregationType implements FastFilterRewriteHelper.AggregationType {

    private final ValuesSource.Numeric source;
    private final Range[] ranges;

    /**
     * @param source numeric values source the range aggregation reads from
     * @param ranges ranges requested by the aggregation
     */
    public RangeAggregationType(ValuesSource.Numeric source, Range[] ranges) {
        this.source = source;
        this.ranges = ranges;
    }

    /**
     * Returns true only when there is no parent aggregator, no sub aggregation,
     * the values source is plain field data, and no range starts before the
     * previous range ends.
     *
     * @param parent       parent aggregator, must be null for the rewrite to apply
     * @param subAggLength number of sub aggregations, must be 0 for the rewrite to apply
     * @return whether the aggregation can be rewritten
     */
    @Override
    public boolean isRewriteable(Object parent, int subAggLength) {
        if (parent != null || subAggLength != 0) {
            return false;
        }
        // don't accept values source with scripts
        if (!(source instanceof ValuesSource.Numeric.FieldData)) {
            return false;
        }
        // nothing to rewrite without ranges; this also protects the ranges[0] access below
        if (ranges == null || ranges.length == 0) {
            return false;
        }
        // ranges are already sorted by from and then to
        // we want ranges not overlapping with each other
        double prevTo = ranges[0].getTo();
        for (int i = 1; i < ranges.length; i++) {
            if (prevTo > ranges[i].getFrom()) {
                return false;
            }
            prevTo = ranges[i].getTo();
        }
        return true;
    }

    /**
     * Builds the shard level ranges.
     *
     * NOTE(review): currently returns a {@link Ranges} with no entries, presumably a
     * placeholder until the requested ranges are encoded here; confirm before relying on it.
     */
    @Override
    public Ranges buildRanges(SearchContext ctx) throws IOException {
        return new Ranges(new byte[][] {}, new byte[][] {});
    }

    /**
     * Builds the segment level ranges.
     *
     * NOTE(review): currently returns a {@link Ranges} with no entries, presumably a
     * placeholder until the requested ranges are encoded here; confirm before relying on it.
     */
    @Override
    public Ranges buildRanges(LeafReaderContext leaf, SearchContext ctx) throws IOException {
        return new Ranges(new byte[][] {}, new byte[][] {});
    }
}

/**
 * Tells whether a composite aggregation qualifies for the rewrite: it must have
 * exactly one source, and that source's values source must be a
 * {@link RoundingValuesSource}.
 *
 * @param sourceConfigs source configurations of the composite aggregation
 * @return true when there is a single source backed by a RoundingValuesSource
 */
public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) {
    if (sourceConfigs.length != 1) {
        return false;
    }
    final CompositeValuesSourceConfig onlySource = sourceConfigs[0];
    return onlySource.valuesSource() instanceof RoundingValuesSource;
}
Expand Down Expand Up @@ -407,7 +451,7 @@ public static boolean tryFastFilterAggregation(
if (!fastFilterContext.rangesBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) {
return false;
}
long[][] ranges = fastFilterContext.ranges;
Ranges ranges = fastFilterContext.ranges;
if (ranges == null) {
logger.debug(
"Shard {} segment {} functionally match all documents. Build the fast filter",
Expand Down Expand Up @@ -445,7 +489,7 @@ private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leaf
* Creates the date ranges from date histo aggregations using its interval,
* and min/max boundaries
*/
private static long[][] createRangesFromAgg(
private static Ranges createRangesFromAgg(
final SearchContext context,
final DateFieldMapper.DateFieldType fieldType,
final long interval,
Expand Down Expand Up @@ -491,43 +535,61 @@ private static long[][] createRangesFromAgg(
}
}

return ranges;
byte[][] mins = new byte[ranges.length][];
byte[][] maxs = new byte[ranges.length][];
for (int i = 0; i < ranges.length; i++) {
byte[] min = new byte[8];
byte[] max = new byte[8];
LongPoint.encodeDimension(ranges[i][0], min, 0);
LongPoint.encodeDimension(ranges[i][1], max, 0);
mins[i] = min;
maxs[i] = max;
}

return new Ranges(mins, maxs);
}

/**
* @param maxNumNonZeroRanges the number of non-zero ranges to collect
*/
private static DebugInfo multiRangesTraverse(
final PointValues.PointTree tree,
final long[][] ranges,
final Ranges ranges,
final BiConsumer<Long, Integer> incrementDocCount,
final DateFieldMapper.DateFieldType fieldType,
final int maxNumNonZeroRanges
) throws IOException {
// ranges are connected and in ascending order
Iterator<long[]> rangeIter = Arrays.stream(ranges).iterator();
long[] activeRange = rangeIter.next();
// Iterator<long[]> rangeIter = Arrays.stream(ranges).iterator();
// long[] activeRange = rangeIter.next();
//
// // make sure the first range at least crosses the min value of the tree
// DebugInfo debugInfo = new DebugInfo();
// if (activeRange[0] > NumericUtils.sortableBytesToLong(tree.getMaxPackedValue(), 0)) {
// logger.debug("No ranges match the query, skip the fast filter optimization");
// return debugInfo;
// }
// while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) {
// if (!rangeIter.hasNext()) {
// logger.debug("No ranges match the query, skip the fast filter optimization");
// return debugInfo;
// }
// activeRange = rangeIter.next();
// }

// make sure the first range at least crosses the min value of the tree
DebugInfo debugInfo = new DebugInfo();
if (activeRange[0] > NumericUtils.sortableBytesToLong(tree.getMaxPackedValue(), 0)) {
int activeIndex = ranges.firstRange(tree.getMinPackedValue(), tree.getMaxPackedValue());
if (activeIndex < 0) {
logger.debug("No ranges match the query, skip the fast filter optimization");
return debugInfo;
}
while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) {
if (!rangeIter.hasNext()) {
logger.debug("No ranges match the query, skip the fast filter optimization");
return debugInfo;
}
activeRange = rangeIter.next();
}

RangeCollectorForPointTree collector = new RangeCollectorForPointTree(
incrementDocCount,
fieldType,
rangeIter,
maxNumNonZeroRanges,
activeRange
ranges,
activeIndex
);

final ArrayUtil.ByteArrayComparator comparator = ArrayUtil.getUnsignedComparator(8);
Expand All @@ -542,6 +604,40 @@ private static DebugInfo multiRangesTraverse(
return debugInfo;
}

private static final ArrayUtil.ByteArrayComparator comparator = ArrayUtil.getUnsignedComparator(8);

/**
 * Compares two byte array values with the shared unsigned comparator, reading
 * both arrays from offset 0.
 *
 * @param value1 left hand value
 * @param value2 right hand value
 * @return the comparator result: negative, zero or positive when value1 is
 *         respectively lower than, equal to or greater than value2
 */
private static int compareByteValue(byte[] value1, byte[] value2) {
    final int ordering = comparator.compare(value1, 0, value2, 0);
    return ordering;
}

/**
 * A list of ranges stored as two parallel arrays of byte array values:
 * {@code min[i]} is the lower bound and {@code max[i]} the upper bound of range {@code i}.
 * Bounds are compared with {@code compareByteValue}.
 */
public static class Ranges {
    byte[][] min;
    byte[][] max;
    int size;

    /**
     * @param min lower bounds, one entry per range
     * @param max upper bounds, one entry per range; must have the same length as {@code min}
     */
    Ranges(byte[][] min, byte[][] max) {
        assert min.length == max.length;
        this.min = min;
        this.max = max;
        this.size = min.length;
    }

    /**
     * Finds the first range that can contain values within the given global bounds.
     *
     * @param globalMin lowest value present in the data
     * @param globalMax highest value present in the data
     * @return the index of the first range whose upper bound is not below {@code globalMin},
     *         or -1 when there are no ranges, when the first range starts above
     *         {@code globalMax}, or when every range ends below {@code globalMin}
     */
    public int firstRange(byte[] globalMin, byte[] globalMax) {
        // an empty instance has no min[0] to inspect
        if (size == 0) {
            return -1;
        }
        // NOTE(review): checking only min[0] assumes ranges are in ascending order - confirm with callers
        if (compareByteValue(min[0], globalMax) > 0) {
            return -1;
        }
        for (int i = 0; i < size; i++) {
            if (compareByteValue(max[i], globalMin) >= 0) {
                return i;
            }
        }
        return -1;
    }
}

private static void intersectWithRanges(
PointValues.IntersectVisitor visitor,
PointValues.PointTree pointTree,
Expand Down Expand Up @@ -651,25 +747,28 @@ private static class RangeCollectorForPointTree {
private final DateFieldMapper.DateFieldType fieldType;
private int counter = 0;

private long[] activeRange;
// private long[] activeRange;
private byte[][] activeRangeAsByteArray;
private final Iterator<long[]> rangeIter;
// private final Iterator<long[]> rangeIter;

private Ranges ranges;
private int activeIndex;

private int visitedRange = 0;
private final int maxNumNonZeroRange;

public RangeCollectorForPointTree(
BiConsumer<Long, Integer> incrementDocCount,
DateFieldMapper.DateFieldType fieldType,
Iterator<long[]> rangeIter,
int maxNumNonZeroRange,
long[] activeRange
Ranges ranges,
int activeIndex
) {
this.incrementDocCount = incrementDocCount;
this.fieldType = fieldType;
this.rangeIter = rangeIter;
this.maxNumNonZeroRange = maxNumNonZeroRange;
this.activeRange = activeRange;
this.ranges = ranges;
this.activeIndex = activeIndex;
this.activeRangeAsByteArray = activeRangeAsByteArray();
}

Expand All @@ -683,9 +782,10 @@ private void countNode(int count) {

private void finalizePreviousRange() {
if (counter > 0) {
logger.debug("finalize previous range: {}", activeRange[0]);
long range = LongPoint.decodeDimension(ranges.min[activeIndex],0);
logger.debug("finalize previous range: {}", range);
logger.debug("counter: {}", counter);
incrementDocCount.accept(fieldType.convertNanosToMillis(activeRange[0]), counter);
incrementDocCount.accept(fieldType.convertNanosToMillis(range), counter);
counter = 0;
}
}
Expand All @@ -697,22 +797,26 @@ private boolean iterateRangeEnd(byte[] value, BiFunction<byte[], byte[], Integer
// the new value may not be contiguous to the previous one
// so try to find the first next range that cross the new value
while (comparator.apply(activeRangeAsByteArray[1], value) < 0) {
if (!rangeIter.hasNext()) {
// if (!rangeIter.hasNext()) {
// return true;
// }
// activeRange = rangeIter.next();
if (++activeIndex >= ranges.size) {
return true;
}
activeRange = rangeIter.next();
activeRangeAsByteArray = activeRangeAsByteArray();
}
visitedRange++;
return visitedRange > maxNumNonZeroRange;
}

private byte[][] activeRangeAsByteArray() {
byte[] lower = new byte[8];
byte[] upper = new byte[8];
NumericUtils.longToSortableBytes(activeRange[0], lower, 0);
NumericUtils.longToSortableBytes(activeRange[1], upper, 0);
return new byte[][] { lower, upper };
// byte[] lower = new byte[8];
// byte[] upper = new byte[8];
// NumericUtils.longToSortableBytes(activeRange[0], lower, 0);
// NumericUtils.longToSortableBytes(activeRange[1], upper, 0);
// return new byte[][] { lower, upper };
return new byte[][] { ranges.min[activeIndex], ranges.max[activeIndex] };
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper.AbstractDateHistogramAggregationType;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
Expand Down Expand Up @@ -180,7 +181,7 @@ public final class CompositeAggregator extends BucketsAggregator {
/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
public class CompositeAggregationType extends AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;

Expand Down
Loading

0 comments on commit 53cb70f

Please sign in to comment.