Datehistogram improvement #170

Closed
@@ -395,6 +395,7 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
"Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"
);
}

long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
int b = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
@@ -403,6 +404,7 @@
bucketOrdsToCollect[b++] = ordsEnum.ord();
}
}

InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);

InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
@@ -180,6 +180,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
// incrementBucketDocCount()
boolean matched = false;
for (int i = 0; i < bits.length; i++) {
if (bits[i].get(doc)) {
@@ -33,8 +33,9 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.*;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Rounding;
import org.opensearch.common.Rounding.Prepared;
import org.opensearch.common.lease.Releasables;
@@ -59,7 +60,11 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.time.ZoneId;
import java.time.format.TextStyle;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -169,14 +174,14 @@ public final DeferringBucketCollector getDeferringCollector() {
return deferringCollector;
}

protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException;
protected abstract LeafBucketCollector getLeafCollector2(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException;

@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
return getLeafCollector(valuesSource.longValues(ctx), sub);
return getLeafCollector2(ctx, sub);
}

protected final InternalAggregation[] buildAggregations(
@@ -263,6 +268,9 @@ private static class FromSingle extends AutoDateHistogramAggregator {
private long min = Long.MAX_VALUE;
private long max = Long.MIN_VALUE;

private Weight[] filters = null;
private final ValuesSource.Numeric valuesSource;

FromSingle(
String name,
AggregatorFactories factories,
@@ -288,13 +296,132 @@ private static class FromSingle extends AutoDateHistogramAggregator {

preparedRounding = prepareRounding(0);
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());

this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
// Create the filters for fast aggregation only if the query is an instance
// of a point range query and there aren't any parent/sub aggregations
if (parent() == null && subAggregators.length == 1) {
final String fieldName = valuesSourceConfig.fieldContext().field();
if (context.query() instanceof IndexOrDocValuesQuery) {
final IndexOrDocValuesQuery q = (IndexOrDocValuesQuery) context.query();
if (q.getIndexQuery() instanceof PointRangeQuery) {
final PointRangeQuery prq = (PointRangeQuery) q.getIndexQuery();
// Ensure that the query and aggregation are on the same field
if (prq.getField().equals(fieldName)) {
createFilterForAggregations(fieldName,
NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0),
NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0));
}
}
} else if (context.query() instanceof ConstantScoreQuery) {
final ConstantScoreQuery csq = (ConstantScoreQuery) context.query();
// Ensure that the constant score query wraps a match-all query
if (csq.getQuery() instanceof MatchAllDocsQuery) {
findBoundsAndCreateFilters(fieldName, context);
}
} else if (context.query() instanceof MatchAllDocsQuery) {
findBoundsAndCreateFilters(fieldName, context);
}
}
}

private void findBoundsAndCreateFilters(final String fieldName, final SearchContext context) throws IOException {
final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
// Since the query does not specify bounds for aggregation, we can
// build the global min/max from local min/max within each segment
for (LeafReaderContext leaf : leaves) {
min = Math.min(min, NumericUtils.sortableBytesToLong(
leaf.reader().getPointValues(fieldName).getMinPackedValue(), 0));
max = Math.max(max, NumericUtils.sortableBytesToLong(
leaf.reader().getPointValues(fieldName).getMaxPackedValue(), 0));
}
createFilterForAggregations(fieldName, min, max);
}

private void createFilterForAggregations(final String field, final long low, final long high) throws IOException {
// (max - min) / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
long interval = 0;
roundingInfosLoop: do {
RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
for (int curInnerInterval: curRoundingInfo.innerIntervals) {
if (bestDuration <= curInnerInterval * curRoundingInfo.roughEstimateDurationMillis) {
interval = curInnerInterval * curRoundingInfo.roughEstimateDurationMillis;
break roundingInfosLoop;
}
}
roundingIdx++;
Owner commented on lines +349 to +357:
Probably we need to move preparedRounding to the next index as well? Otherwise it always stays at 0 (second level)

} while (
roundingIdx < roundingInfos.length
);
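
A minimal sketch of the change the review comment above suggests, assuming the same fields used elsewhere in this diff (roundingIdx, preparedRounding, roundingInfos) and the prepareRounding(int) helper already called in the constructor; the prepared rounding is advanced together with roundingIdx so that later rounding calls match the interval that was actually selected:

// Hypothetical variant of the interval-selection loop above (not part of this diff):
// keep preparedRounding in sync with roundingIdx while searching for an interval.
long bestDuration = (high - low) / targetBuckets;
long interval = 0;
roundingInfosLoop: do {
    RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
    preparedRounding = prepareRounding(roundingIdx); // advance the rounding together with the index
    for (int curInnerInterval : curRoundingInfo.innerIntervals) {
        if (bestDuration <= curInnerInterval * curRoundingInfo.roughEstimateDurationMillis) {
            interval = curInnerInterval * curRoundingInfo.roughEstimateDurationMillis;
            break roundingInfosLoop;
        }
    }
    roundingIdx++;
} while (roundingIdx < roundingInfos.length);

With a variant like this, the preparedRounding used to build the filter bounds below would correspond to the rounding level that produced the chosen interval, rather than always staying at the level-0 rounding.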

// interval > 0
int i = 0;
filters = new Weight[targetBuckets];
long roundedLow = preparedRounding.round(low);
while (i < targetBuckets) {
// Calculate the lower bucket bound
final byte[] lower = new byte[8];
NumericUtils.longToSortableBytes(Math.max(roundedLow, low), lower, 0);
// Calculate the upper bucket bound
final byte[] upper = new byte[8];
roundedLow = preparedRounding.round(roundedLow + interval);
// Subtract 1 if the minimum is roundedLow, as roundedLow itself
// is included in the next bucket
NumericUtils.longToSortableBytes(Math.min(roundedLow - 1, high), upper, 0);

filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
Owner commented on lines +366 to +377:
This logic has a problem: we are focused on creating the required number of target buckets, when instead we should limit the buckets to the query's upper and lower bounds (a bounded variant is sketched after this method). For example, I see the following response:

% curl -s -X GET "localhost:9200/nyc_taxis/_search?pretty" -H 'Content-Type: application/json' -d'{"size": 0,"query": {"range": {"dropoff_datetime": {"gte": "2015-01-01 01:04:06","lt": "2016-01-01 00:00:00"}}},"aggs": {"dropoffs_over_time": {"auto_date_histogram": {"field": "dropoff_datetime","buckets": "4"}}}}'
{
  "took" : 4295,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 100,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "dropoffs_over_time" : {
      "buckets" : [
        {
          "key_as_string" : "2015-01-01 00:00:00",
          "key" : 1420070400000,
          "doc_count" : 100
        },
        {
          "key_as_string" : "2016-01-01 00:00:00",
          "key" : 1451606400000,
          "doc_count" : 0
        },
        {
          "key_as_string" : "2017-01-01 00:00:00",
          "key" : 1483228800000,
          "doc_count" : 0
        }
      ],
      "interval" : "1y"
    }
  }
}

@Override
protected String toString(int dimension, byte[] value) {
return null;
}
}, ScoreMode.COMPLETE_NO_SCORES, 1);
}
}
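
Regarding the review comment above about trailing empty buckets: a minimal sketch of a bounded variant of the filter-building loop, assuming the surrounding fields (filters, preparedRounding, targetBuckets) are those declared in this diff and that java.util.Arrays is imported; it stops once the rounded lower bound passes the query's upper bound, so filters are only created inside the [low, high] range:

// Hypothetical bounded variant of the loop in createFilterForAggregations (not part of this diff):
// stop once roundedLow moves past the query's upper bound and keep only the filters actually created.
int i = 0;
final Weight[] candidates = new Weight[targetBuckets];
long roundedLow = preparedRounding.round(low);
while (i < targetBuckets && roundedLow <= high) {
    final byte[] lower = new byte[8];
    NumericUtils.longToSortableBytes(Math.max(roundedLow, low), lower, 0);
    final byte[] upper = new byte[8];
    roundedLow = preparedRounding.round(roundedLow + interval);
    // roundedLow itself belongs to the next bucket, hence the -1
    NumericUtils.longToSortableBytes(Math.min(roundedLow - 1, high), upper, 0);
    candidates[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
        @Override
        protected String toString(int dimension, byte[] value) {
            return null;
        }
    }, ScoreMode.COMPLETE_NO_SCORES, 1);
}
filters = Arrays.copyOf(candidates, i); // drop the unused tail instead of keeping trailing empty buckets

In the nyc_taxis example above, a variant like this would stop after the 2015 bucket instead of also emitting the empty 2016 and 2017 buckets.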

boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException {
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
counts[i] = filters[i].count(ctx);
if (counts[i] == -1) {
// Cannot use the optimization if any of the counts
// is -1 indicating the segment might have deleted documents
return false;
}
}

for (i = 0; i < filters.length; i++) {
long bucketOrd = bucketOrds.add(
owningBucketOrd,
preparedRounding.round(NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0))
);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
}
incrementBucketDocCount(bucketOrd, counts[i]);
}
throw new CollectionTerminatedException();
Owner commented on lines +398 to +408:
We can be aggressive during bucket creation, and invoke the logic to merge buckets if their number exceeds the target bucket count:

do {
    try (LongKeyedBucketOrds oldOrds = bucketOrds) {
        preparedRounding = prepareRounding(++roundingIdx);
        long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
        bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
        LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0);
        while (ordsEnum.next()) {
            long oldKey = ordsEnum.value();
            long newKey = preparedRounding.round(oldKey);
            long newBucketOrd = bucketOrds.add(0, newKey);
            mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd;
        }
        merge(mergeMap, bucketOrds.size());
    }
} while (roundingIdx < roundingInfos.length - 1
    && (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
        || max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()));

}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
protected LeafBucketCollector getLeafCollector2(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedNumericDocValues values = valuesSource.longValues(ctx);

final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (useOpt[0]) {
useOpt[0] = tryFastFilterAggregation(ctx, owningBucketOrd);
}

assert owningBucketOrd == 0;
if (false == values.advanceExact(doc)) {
return;
@@ -471,6 +598,8 @@ private static class FromMany extends AutoDateHistogramAggregator {
*/
private int rebucketCount = 0;

private final ValuesSource.Numeric valuesSource;

FromMany(
String name,
AggregatorFactories factories,
@@ -505,10 +634,12 @@ private static class FromMany extends AutoDateHistogramAggregator {
preparedRoundings[0] = roundingPreparer.apply(roundingInfos[0].rounding);
bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays());
liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
protected LeafBucketCollector getLeafCollector2(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
@@ -45,6 +45,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -115,7 +116,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
) throws IOException {
super(name, factories, aggregationContext, parent, CardinalityUpperBound.MANY, metadata);
this.rounding = rounding;
this.preparedRounding = preparedRounding;
this.preparedRounding = preparedRounding; // TODO reading TimeIntervalRounding$FixedRounding
this.order = order;
order.validate(this);
this.keyed = keyed;
@@ -188,7 +189,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
int valuesCount = values.docValueCount();

long previousRounded = Long.MIN_VALUE;
for (int i = 0; i < valuesCount; ++i) {
for (int i = 0; i < valuesCount; ++i) { // TODO ? Why support multiple values here?
long value = values.nextValue();
long rounded = preparedRounding.round(value);
assert rounded >= previousRounded;
@@ -213,29 +214,33 @@

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
}, (owningBucketOrd, buckets) -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
return buildAggregationsForVariableBuckets(
owningBucketOrds,
bucketOrds,
(bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
},
(owningBucketOrd, buckets) -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(
name,
buckets,
order,
minDocCount,
rounding.offset(),
emptyBucketInfo,
formatter,
keyed,
metadata()
);
});
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(
name,
buckets,
order,
minDocCount,
rounding.offset(),
emptyBucketInfo,
formatter,
keyed,
metadata()
);
});
}

@Override
@@ -189,11 +189,14 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher) throws QueryPhaseExecutionException {
final ContextIndexSearcher searcher = searchContext.searcher();
final IndexReader reader = searcher.getIndexReader();

QuerySearchResult queryResult = searchContext.queryResult();
queryResult.searchTimedOut(false);

try {
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());

Query query = searchContext.query();
assert query == searcher.rewrite(query); // already rewritten
