Adding filter based optimization logic to date histogram aggregation
Signed-off-by: Ankit Jain <[email protected]>
jainankitk committed Nov 2, 2023
1 parent 8673fa9 commit f9b0d5f
Showing 5 changed files with 150 additions and 18 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default.

30 changes: 15 additions & 15 deletions server/src/main/java/org/opensearch/common/Rounding.java
@@ -95,7 +95,7 @@ long roundFloor(long utcMillis) {
}

@Override
long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -106,7 +106,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundYear(utcMillis);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -117,7 +117,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundQuarterOfYear(utcMillis);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -128,7 +128,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundMonthOfYear(utcMillis);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -137,7 +137,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, this.ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
},
@@ -146,7 +146,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
},
@@ -161,7 +161,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
},
@@ -176,7 +176,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
};
@@ -213,7 +213,7 @@ long extraLocalOffsetLookup() {
* look up so that we can see transitions that we might have rounded
* down beyond.
*/
abstract long extraLocalOffsetLookup();
public abstract long extraLocalOffsetLookup();

public byte getId() {
return id;
@@ -462,11 +462,11 @@ protected Prepared maybeUseArray(long minUtcMillis, long maxUtcMillis, int max)
*
* @opensearch.internal
*/
static class TimeUnitRounding extends Rounding {
public static class TimeUnitRounding extends Rounding {
static final byte ID = 1;

private final DateTimeUnit unit;
private final ZoneId timeZone;
public final DateTimeUnit unit;
public final ZoneId timeZone;
private final boolean unitRoundsToMidnight;

TimeUnitRounding(DateTimeUnit unit, ZoneId timeZone) {
@@ -920,11 +920,11 @@ public final long nextRoundingValue(long utcMillis) {
*
* @opensearch.internal
*/
static class TimeIntervalRounding extends Rounding {
public static class TimeIntervalRounding extends Rounding {
static final byte ID = 2;

private final long interval;
private final ZoneId timeZone;
public final long interval;
public final ZoneId timeZone;

TimeIntervalRounding(long interval, ZoneId timeZone) {
if (interval < 1) throw new IllegalArgumentException("Zero or negative time interval not supported");
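
The visibility changes in Rounding.java above exist so that code outside org.opensearch.common can ask a rounding how wide one of its buckets is. A minimal sketch of such a caller, assuming the post-commit visibility; the class and method names are illustrative and not part of this commit:

import org.opensearch.common.Rounding;

// Illustrative helper, not part of this commit: mirrors the interval lookup
// performed later in createFilterForAggregations.
final class RoundingIntervals {
    static long intervalOf(Rounding rounding) {
        if (rounding instanceof Rounding.TimeUnitRounding) {
            // Calendar units expose their approximate width in milliseconds
            return ((Rounding.TimeUnitRounding) rounding).unit.extraLocalOffsetLookup();
        }
        if (rounding instanceof Rounding.TimeIntervalRounding) {
            // Fixed intervals expose their width directly
            return ((Rounding.TimeIntervalRounding) rounding).interval;
        }
        return Long.MAX_VALUE; // unknown rounding: callers skip the optimization
    }
}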
@@ -186,6 +186,12 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
return in.bulkScorer(context);
}

@Override
public int count(LeafReaderContext context) throws IOException {
shardKeyMap.add(context.reader());
return in.count(context);
}

@Override
public boolean isCacheable(LeafReaderContext ctx) {
return in.isCacheable(ctx);
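
The count override added above records the segment in shardKeyMap and then delegates, so the caching wrapper keeps exposing Lucene's Weight#count shortcut. A hedged sketch of how that shortcut is typically consumed, mirroring the per-segment check in tryFastFilterAggregation below; the class and method names are illustrative and not part of this commit:

import java.io.IOException;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;

// Illustrative helper, not part of this commit.
final class SegmentCounting {
    // Sums per-segment counts without collecting documents; returns -1 if any
    // segment cannot be counted cheaply (for example, because it has deletions).
    static long countOrFallback(IndexSearcher searcher, Query query) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
        long total = 0;
        for (LeafReaderContext leaf : searcher.getIndexReader().leaves()) {
            int count = weight.count(leaf);
            if (count == -1) {
                return -1; // caller must fall back to normal collection
            }
            total += count;
        }
        return total;
    }
}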
@@ -33,8 +33,13 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
@@ -53,7 +58,10 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.time.ZoneId;
import java.time.format.TextStyle;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;

@@ -81,6 +89,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final LongBounds extendedBounds;
private final LongBounds hardBounds;

private Weight[] filters = null;

private final LongKeyedBucketOrds bucketOrds;

DateHistogramAggregator(
@@ -99,7 +109,6 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {

super(name, factories, aggregationContext, parent, CardinalityUpperBound.MANY, metadata);
this.rounding = rounding;
this.preparedRounding = preparedRounding;
@@ -114,6 +123,25 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
this.formatter = valuesSourceConfig.format();

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

// Create the filters for fast aggregation only if the query is an instance
// of a point range query and there are no parent or sub-aggregations
if (parent() == null && subAggregators.length == 0) {
// TODO: Is there a better way of getting the aggregation field? Can this cause an NPE?
final String fieldName = (((ValuesSource.Numeric.FieldData) valuesSource).indexFieldData).getFieldName();
if (context.query() instanceof IndexOrDocValuesQuery) {
final IndexOrDocValuesQuery q = (IndexOrDocValuesQuery) context.query();
if (q.getIndexQuery() instanceof PointRangeQuery) {
final PointRangeQuery prq = (PointRangeQuery) q.getIndexQuery();
// Ensure that the query and aggregation are on the same field
if (valuesSource != null && prq.getField().equals(fieldName)) {
long low = NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0);
long high = NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0);
createFilterForAggregations(fieldName, low, high);
}
}
}
}
}

@Override
@@ -129,10 +157,22 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}

// Declared as a final single-element array so that it can be mutated
// from within the anonymous LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = true;

SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0] && filters != null) {
useOpt[0] = tryFastFilterAggregation(ctx, owningBucketOrd);
}

if (values.advanceExact(doc)) {
int valuesCount = values.docValueCount();

@@ -226,4 +266,86 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
return 1.0;
}
}

private boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException {
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
counts[i] = filters[i].count(ctx);
if (counts[i] == -1) {
// Cannot use the optimization if any of the counts
// is -1 indicating the segment might have deleted documents
return false;
}
}

for (i = 0; i < filters.length; i++) {
long bucketOrd = bucketOrds.add(
owningBucketOrd,
preparedRounding.round(NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0))
);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
}
incrementBucketDocCount(bucketOrd, counts[i]);
}
throw new CollectionTerminatedException();
}

private boolean isUTCTimeZone(final ZoneId zoneId) {
return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
}

private void createFilterForAggregations(String field, long low, long high) throws IOException {
long interval = Long.MAX_VALUE;
if (rounding instanceof Rounding.TimeUnitRounding) {
interval = (((Rounding.TimeUnitRounding) rounding).unit).extraLocalOffsetLookup();
if (!isUTCTimeZone(((Rounding.TimeUnitRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return;
}
} else if (rounding instanceof Rounding.TimeIntervalRounding) {
interval = ((Rounding.TimeIntervalRounding) rounding).interval;
if (!isUTCTimeZone(((Rounding.TimeIntervalRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return;
}
}

// Return if the interval could not be determined correctly
if (interval == Long.MAX_VALUE) return;

// Calculate the number of buckets using range and interval
long roundedLow = preparedRounding.round(low);
int bucketCount = 0;
while (roundedLow < high) {
bucketCount++;
roundedLow += interval;
// The rounding below is needed because adding the interval can produce
// non-rounded values for units like calendar month
roundedLow = preparedRounding.round(roundedLow);
}

if (bucketCount > 0) {
int i = 0;
filters = new Weight[bucketCount];
while (i < bucketCount) {
byte[] lower = new byte[8];
NumericUtils.longToSortableBytes(low, lower, 0);
byte[] upper = new byte[8];
// Calculate this bucket's upper bound: the start of the next bucket
roundedLow = preparedRounding.round(low);
roundedLow = preparedRounding.round(roundedLow + interval);
// Subtract 1 because roundedLow itself belongs to the next bucket
NumericUtils.longToSortableBytes(Math.min(roundedLow - 1, high), upper, 0);
filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
@Override
protected String toString(int dimension, byte[] value) {
return null;
}
}, ScoreMode.COMPLETE_NO_SCORES, 1);
}
}
}
}
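
Both the filter bounds and the bucket keys in the aggregator above go through Lucene's sortable-byte encoding: bounds are written with NumericUtils.longToSortableBytes, and the bucket key is recovered with NumericUtils.sortableBytesToLong before rounding. A tiny standalone round-trip check; the timestamp is only an assumed example value:

import org.apache.lucene.util.NumericUtils;

public final class SortableBytesRoundTrip {
    public static void main(String[] args) {
        long lowerMillis = 1698796800000L; // 2023-11-01T00:00:00Z in epoch milliseconds
        byte[] encoded = new byte[Long.BYTES];
        NumericUtils.longToSortableBytes(lowerMillis, encoded, 0);
        long decoded = NumericUtils.sortableBytesToLong(encoded, 0);
        System.out.println(decoded == lowerMillis); // prints: true
    }
}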
@@ -584,7 +584,7 @@ public boolean advanceExact(int target) throws IOException {
*/
public static class FieldData extends Numeric {

protected final IndexNumericFieldData indexFieldData;
public final IndexNumericFieldData indexFieldData;

public FieldData(IndexNumericFieldData indexFieldData) {
this.indexFieldData = indexFieldData;
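
Making indexFieldData public in the hunk above is what lets the aggregator recover the backing field name and compare it against the PointRangeQuery field. A minimal sketch of that lookup, assuming a field-data backed values source; the helper class and method are illustrative and not part of this commit:

import org.opensearch.index.fielddata.IndexNumericFieldData;
import org.opensearch.search.aggregations.support.ValuesSource;

// Illustrative helper, not part of this commit.
final class FieldNameLookup {
    static String fieldNameOf(ValuesSource.Numeric source) {
        if (source instanceof ValuesSource.Numeric.FieldData) {
            IndexNumericFieldData fieldData = ((ValuesSource.Numeric.FieldData) source).indexFieldData;
            return fieldData.getFieldName();
        }
        return null; // scripted or missing-value sources have no single backing field
    }
}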
