Commit c19f785

Fixing bugs and passing unit tests for date histogram

Signed-off-by: Ankit Jain <[email protected]>
jainankitk committed Nov 20, 2023
1 parent 0c27add commit c19f785

Showing 4 changed files with 84 additions and 30 deletions.
org/opensearch/index/mapper/DateFieldMapper.java
@@ -409,6 +409,16 @@ public long parse(String value) {
return resolution.convert(DateFormatters.from(dateTimeFormatter().parse(value), dateTimeFormatter().locale()).toInstant());
}

+public long convertNanosToMillis(long nanoSecondsSinceEpoch) {
+    if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toMilliSeconds(nanoSecondsSinceEpoch);
+    return nanoSecondsSinceEpoch;
+}
+
+public long convertRoundedMillisToNanos(long milliSecondsSinceEpoch) {
+    if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toNanoSeconds(milliSecondsSinceEpoch);
+    return milliSecondsSinceEpoch;
+}

@Override
public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
DateFormatter defaultFormatter = dateTimeFormatter();
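The two helpers added above bridge a unit mismatch: a date_nanos field indexes point values as nanoseconds since the epoch, while Rounding and the histogram bucket keys operate on milliseconds. For millisecond-resolution fields both helpers return their input unchanged, so callers can apply them unconditionally. A minimal standalone sketch of the round-trip contract (plain Java; toMillis/toNanos are stand-ins for the DateUtils conversions, not the OpenSearch code):

public class NanosMillisRoundTrip {
    static final long NANOS_PER_MILLI = 1_000_000L;

    // Stand-in for DateUtils.toMilliSeconds: nanos -> millis, dropping
    // sub-millisecond precision (safe before rounding, which is coarser).
    static long toMillis(long nanosSinceEpoch) {
        return nanosSinceEpoch / NANOS_PER_MILLI;
    }

    // Stand-in for DateUtils.toNanoSeconds: applied only to already-rounded
    // millisecond values, so no false precision is introduced.
    static long toNanos(long millisSinceEpoch) {
        return millisSinceEpoch * NANOS_PER_MILLI;
    }

    public static void main(String[] args) {
        long nanos = 1_700_000_000_123_456_789L;     // raw point value (ns)
        long millis = toMillis(nanos);               // 1700000000123 ms
        long roundedMillis = (millis / 1000) * 1000; // second-level rounding
        long boundNanos = toNanos(roundedMillis);    // back to ns for point bounds
        System.out.println(roundedMillis + " ms -> " + boundNanos + " ns");
    }
}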
org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -45,6 +45,7 @@
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
+import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -58,6 +59,7 @@
import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
+import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
@@ -269,6 +271,7 @@ private static class FromSingle extends AutoDateHistogramAggregator {

private Weight[] filters = null;
private final ValuesSource.Numeric valuesSource;
+private DateFieldMapper.DateFieldType fieldType;

FromSingle(
String name,
@@ -300,11 +303,25 @@ private static class FromSingle extends AutoDateHistogramAggregator {
// Create the filters for fast aggregation only if the query is an instance
// of a point range query and there aren't any parent/sub-aggregations
if (parent() == null && subAggregators.length == 0) {
-final String fieldName = valuesSourceConfig.fieldContext().field();
-final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldName);
-if (bounds != null) {
-    final Rounding rounding = getMinimumRounding(bounds[0], bounds[1]);
-    filters = FilterRewriteHelper.createFilterForAggregations(context, rounding, preparedRounding, fieldName, bounds[0], bounds[1]);
+final FieldContext fieldContext = valuesSourceConfig.fieldContext();
+if (fieldContext != null) {
+    final String fieldName = fieldContext.field();
+    final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldName);
+    if (bounds != null) {
+        assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
+        fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
+        // TODO: Use millis bound?
+        final Rounding rounding = getMinimumRounding(bounds[0], bounds[1]);
+        filters = FilterRewriteHelper.createFilterForAggregations(
+            context,
+            rounding,
+            preparedRounding,
+            fieldName,
+            fieldType,
+            bounds[0],
+            bounds[1]
+        );
+    }
+}
}
}
@@ -317,7 +334,7 @@ private Rounding getMinimumRounding(final long low, final long high) {
long bestDuration = (high - low) / targetBuckets;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
-final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length-1];
+final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
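getMinimumRounding walks the preconfigured rounding levels (second, minute, hour, and so on) and selects the first one whose widest inner interval covers the ideal bucket width, (high - low) / targetBuckets. A self-contained sketch of that selection loop, with assumed names and made-up rounding levels rather than the real RoundingInfo configuration:

public class MinimumRoundingSketch {
    record RoundingInfo(long roughEstimateDurationMillis, int[] innerIntervals) {}

    static int pickRoundingIndex(long low, long high, int targetBuckets, RoundingInfo[] infos) {
        long bestDuration = (high - low) / targetBuckets; // ideal bucket width in ms
        int idx = 0;
        while (idx < infos.length - 1) {
            RoundingInfo info = infos[idx];
            int maxInner = info.innerIntervals()[info.innerIntervals().length - 1];
            // Stop at the first level whose widest inner interval covers the ideal width.
            if (bestDuration <= maxInner * info.roughEstimateDurationMillis()) break;
            idx++;
        }
        return idx;
    }

    public static void main(String[] args) {
        RoundingInfo[] infos = {
            new RoundingInfo(1_000L, new int[] { 1, 5, 10, 30 }),   // seconds
            new RoundingInfo(60_000L, new int[] { 1, 5, 10, 30 }),  // minutes
            new RoundingInfo(3_600_000L, new int[] { 1, 3, 12 }),   // hours
        };
        // 25 hours of data in at most 10 buckets selects the hour level (index 2).
        System.out.println(pickRoundingIndex(0, 90_000_000L, 10, infos));
    }
}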
@@ -345,7 +362,11 @@ boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException {
for (i = 0; i < filters.length; i++) {
long bucketOrd = bucketOrds.add(
owningBucketOrd,
-preparedRounding.round(NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0))
+preparedRounding.round(
+    fieldType.convertNanosToMillis(
+        NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
+    )
+)
);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
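In tryFastFilterAggregation, each filter's lower point is decoded back to a long, converted from nanos to millis where needed, rounded, and used as the bucket key. The negative-ordinal handling relies on the encoding convention of LongKeyedBucketOrds: add() returns a fresh ordinal for an unseen key and -1 - existingOrdinal for a repeat. A tiny standalone model of that convention (a HashMap stand-in, not the real off-heap structure):

import java.util.HashMap;
import java.util.Map;

public class BucketOrdsSketch {
    private final Map<Long, Long> ords = new HashMap<>();

    long add(long key) {
        Long existing = ords.get(key);
        if (existing != null) return -1 - existing; // already seen, encoded negatively
        long ord = ords.size();
        ords.put(key, ord);
        return ord;
    }

    public static void main(String[] args) {
        BucketOrdsSketch ords = new BucketOrdsSketch();
        long first = ords.add(42);                        // 0: newly created
        long repeat = ords.add(42);                       // -1: key already present
        long decoded = repeat < 0 ? -1 - repeat : repeat; // recover ordinal 0
        System.out.println(first + " " + repeat + " " + decoded);
    }
}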
org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -42,6 +42,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
+import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -52,6 +53,7 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
+import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
@@ -84,6 +86,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
private final LongBounds hardBounds;
private Weight[] filters = null;
private final LongKeyedBucketOrds bucketOrds;
+private DateFieldMapper.DateFieldType fieldType;

DateHistogramAggregator(
String name,
@@ -119,17 +122,23 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
// Create the filters for fast aggregation only if the query is an instance
// of a point range query and there aren't any parent/sub-aggregations
if (parent() == null && subAggregators.length == 0) {
-final String fieldName = valuesSourceConfig.fieldContext().field();
-final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldName);
-if (bounds != null) {
-    filters = FilterRewriteHelper.createFilterForAggregations(
-        context,
-        rounding,
-        preparedRounding,
-        fieldName,
-        bounds[0],
-        bounds[1]
-    );
+final FieldContext fieldContext = valuesSourceConfig.fieldContext();
+if (fieldContext != null) {
+    final String fieldName = fieldContext.field();
+    final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldName);
+    if (bounds != null) {
+        assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
+        fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
+        filters = FilterRewriteHelper.createFilterForAggregations(
+            context,
+            rounding,
+            preparedRounding,
+            fieldName,
+            fieldType,
+            bounds[0],
+            bounds[1]
+        );
+    }
+}
}
}
@@ -272,7 +281,11 @@ private boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException {
for (i = 0; i < filters.length; i++) {
long bucketOrd = bucketOrds.add(
owningBucketOrd,
-preparedRounding.round(NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0))
+preparedRounding.round(
+    fieldType.convertNanosToMillis(
+        NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
+    )
+)
);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
@@ -9,6 +9,7 @@
package org.opensearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
@@ -19,6 +20,7 @@
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Rounding;
import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
+import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.query.DateRangeIncludingNowQuery;
import org.opensearch.search.internal.SearchContext;

@@ -71,10 +73,15 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException {
// Since the query does not specify bounds for aggregation, we can
// build the global min/max from local min/max within each segment
for (LeafReaderContext leaf : leaves) {
-min = Math.min(min, NumericUtils.sortableBytesToLong(leaf.reader().getPointValues(fieldName).getMinPackedValue(), 0));
-max = Math.max(max, NumericUtils.sortableBytesToLong(leaf.reader().getPointValues(fieldName).getMaxPackedValue(), 0));
+final PointValues values = leaf.reader().getPointValues(fieldName);
+if (values != null) {
+    min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
+    max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
+}
}

+if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null;

return new long[] { min, max };
}
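The null check matters because a segment that contains no points for the field returns null from getPointValues, and the new sentinel guard covers the case where no segment had points at all. A runnable plain-Lucene sketch of both cases (assumed two-segment setup; the field name and values are illustrative):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class IndexBoundsSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            writer.addDocument(new Document()); // segment 1: no "timestamp" points
            writer.commit();
            Document doc = new Document();
            doc.add(new LongPoint("timestamp", 1_700_000_000_000L));
            writer.addDocument(doc);            // segment 2: has points
            writer.commit();
            try (IndexReader reader = DirectoryReader.open(writer)) {
                long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
                for (LeafReaderContext leaf : reader.leaves()) {
                    PointValues values = leaf.reader().getPointValues("timestamp");
                    if (values == null) continue; // would have thrown NPE before the fix
                    min = Math.min(min, LongPoint.decodeDimension(values.getMinPackedValue(), 0));
                    max = Math.max(max, LongPoint.decodeDimension(values.getMaxPackedValue(), 0));
                }
                System.out.println(min == Long.MAX_VALUE ? "no bounds" : min + ".." + max);
            }
        }
    }
}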

@@ -89,8 +96,7 @@ public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
// Minimum bound for aggregation is the max between query and global
Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]),
// Maximum bound for aggregation is the min between query and global
-Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1])
-};
+Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) };
}
} else if (cq instanceof MatchAllDocsQuery) {
return indexBounds;
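getAggregationBounds intersects the point-range query's bounds with those index-wide bounds, so filters are only built over the range where matching documents can actually exist. The intersection itself is simple; a sketch with assumed names (the real method also unwraps query wrappers such as ConstantScoreQuery, as the imports above suggest, and does not return null for disjoint ranges, which this sketch adds for illustration):

public class BoundsIntersection {
    static long[] intersect(long queryLower, long queryUpper, long indexMin, long indexMax) {
        long low = Math.max(queryLower, indexMin);  // tightest usable lower bound
        long high = Math.min(queryUpper, indexMax); // tightest usable upper bound
        return low <= high ? new long[] { low, high } : null; // null: ranges disjoint
    }

    public static void main(String[] args) {
        long[] bounds = intersect(10, 100, 0, 50); // query [10,100] vs index [0,50]
        System.out.println(bounds[0] + ".." + bounds[1]); // prints 10..50
    }
}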
@@ -116,6 +122,7 @@ public static Weight[] createFilterForAggregations(
final Rounding rounding,
final Rounding.Prepared preparedRounding,
final String field,
+final DateFieldMapper.DateFieldType fieldType,
final long low,
final long high
) throws IOException {
@@ -138,34 +145,37 @@
}

// Calculate the number of buckets using range and interval
-long roundedLow = preparedRounding.round(low);
+long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
long prevRounded = roundedLow;
int bucketCount = 0;
-while (roundedLow < high) {
+while (roundedLow <= fieldType.convertNanosToMillis(high)) {
bucketCount++;
// The rounding below is needed because adding the interval can produce
// non-rounded values for calendar intervals such as months
roundedLow = preparedRounding.round(roundedLow + interval);
-if (prevRounded == roundedLow)
-    break;
+if (prevRounded == roundedLow) break;
prevRounded = roundedLow;
}

Weight[] filters = null;
if (bucketCount > 0 && bucketCount <= MAX_NUM_FILTER_BUCKETS) {
int i = 0;
filters = new Weight[bucketCount];
-roundedLow = preparedRounding.round(low);
+roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
while (i < bucketCount) {
// Calculate the lower bucket bound
final byte[] lower = new byte[8];
-NumericUtils.longToSortableBytes(i==0 ? low : roundedLow, lower, 0);
+NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0);
// Calculate the upper bucket bound
final byte[] upper = new byte[8];
roundedLow = preparedRounding.round(roundedLow + interval);
// Subtract 1 from roundedLow for the upper bound, as roundedLow itself
// is included in the next bucket
-NumericUtils.longToSortableBytes(i+1==bucketCount ? high : roundedLow - 1, upper, 0);
+NumericUtils.longToSortableBytes(
+    i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1,
+    upper,
+    0
+);
filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
@Override
protected String toString(int dimension, byte[] value) {
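Taken together, the loop above tiles the aggregation range with point-range filters: each bucket spans [roundedLow, nextRoundedLow - 1], except that the first bucket starts at the raw low and the last ends at the raw high, with the nanos conversions applied only for date_nanos fields. A fixed-width, millisecond-only sketch of the tiling (assumed values; preparedRounding replaced by simple truncation):

public class BucketBoundsSketch {
    public static void main(String[] args) {
        long low = 7, high = 95, interval = 25;
        long roundedLow = (low / interval) * interval; // truncation as a rounding stand-in
        int bucketCount = 4;                           // what the counting loop yields here
        for (int i = 0; i < bucketCount; i++) {
            long lower = (i == 0) ? low : roundedLow;  // first bucket starts at raw low
            roundedLow = ((roundedLow + interval) / interval) * interval;
            long upper = (i + 1 == bucketCount) ? high : roundedLow - 1; // last ends at raw high
            System.out.println("bucket " + i + ": [" + lower + ", " + upper + "]");
        }
    }
}

Running it prints [7, 24], [25, 49], [50, 74], [75, 95]: adjacent, non-overlapping ranges that exactly cover [low, high].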
