-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Datehistogram improvement #170
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,8 +33,9 @@ | |
|
||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.SortedNumericDocValues; | ||
import org.apache.lucene.search.ScoreMode; | ||
import org.apache.lucene.search.*; | ||
import org.apache.lucene.util.CollectionUtil; | ||
import org.apache.lucene.util.NumericUtils; | ||
import org.opensearch.common.Rounding; | ||
import org.opensearch.common.Rounding.Prepared; | ||
import org.opensearch.common.lease.Releasables; | ||
|
@@ -59,7 +60,11 @@ | |
import org.opensearch.search.internal.SearchContext; | ||
|
||
import java.io.IOException; | ||
import java.time.ZoneId; | ||
import java.time.format.TextStyle; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Function; | ||
|
@@ -169,14 +174,14 @@ public final DeferringBucketCollector getDeferringCollector() { | |
return deferringCollector; | ||
} | ||
|
||
protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException; | ||
protected abstract LeafBucketCollector getLeafCollector2(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException; | ||
|
||
@Override | ||
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { | ||
if (valuesSource == null) { | ||
return LeafBucketCollector.NO_OP_COLLECTOR; | ||
} | ||
return getLeafCollector(valuesSource.longValues(ctx), sub); | ||
return getLeafCollector2(ctx, sub); | ||
} | ||
|
||
protected final InternalAggregation[] buildAggregations( | ||
|
@@ -263,6 +268,9 @@ private static class FromSingle extends AutoDateHistogramAggregator { | |
private long min = Long.MAX_VALUE; | ||
private long max = Long.MIN_VALUE; | ||
|
||
private Weight[] filters = null; | ||
private final ValuesSource.Numeric valuesSource; | ||
|
||
FromSingle( | ||
String name, | ||
AggregatorFactories factories, | ||
|
@@ -288,13 +296,132 @@ private static class FromSingle extends AutoDateHistogramAggregator { | |
|
||
preparedRounding = prepareRounding(0); | ||
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); | ||
|
||
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null; | ||
// Create the filters for fast aggregation only if the query is instance | ||
// of point range query and there aren't any parent/sub aggregations | ||
if (parent() == null && subAggregators.length == 1) { | ||
final String fieldName = valuesSourceConfig.fieldContext().field(); | ||
if (context.query() instanceof IndexOrDocValuesQuery) { | ||
final IndexOrDocValuesQuery q = (IndexOrDocValuesQuery) context.query(); | ||
if (q.getIndexQuery() instanceof PointRangeQuery) { | ||
final PointRangeQuery prq = (PointRangeQuery) q.getIndexQuery(); | ||
// Ensure that the query and aggregation are on the same field | ||
if (prq.getField().equals(fieldName)) { | ||
createFilterForAggregations(fieldName, | ||
NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), | ||
NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0)); | ||
} | ||
} | ||
} else if (context.query() instanceof ConstantScoreQuery) { | ||
final ConstantScoreQuery csq = (ConstantScoreQuery) context.query(); | ||
// Ensure that the constant score query is instance of match all query | ||
if (csq.getQuery() instanceof MatchAllDocsQuery) { | ||
findBoundsAndCreateFilters(fieldName, context); | ||
} | ||
} else if (context.query() instanceof MatchAllDocsQuery) { | ||
findBoundsAndCreateFilters(fieldName, context); | ||
} | ||
} | ||
} | ||
|
||
// Computes the global minimum and maximum value of the given point field across | ||
// every leaf (segment) of the searcher's index reader, then hands those bounds to | ||
// createFilterForAggregations(fieldName, min, max). | ||
// In the visible constructor code this is invoked when the query is a | ||
// MatchAllDocsQuery, or a ConstantScoreQuery wrapping one. | ||
// fieldName: name of the field whose point values are read | ||
// context: search context supplying the searcher / index reader | ||
// throws IOException: propagated from reading the point values or building the filters | ||
private void findBoundsAndCreateFilters(final String fieldName, final SearchContext context) throws IOException { | ||
final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves(); | ||
// Start from inverted extremes so the first segment always narrows them; if there | ||
// are no leaves, these sentinel values are passed on unchanged. | ||
long min = Long.MAX_VALUE, max = Long.MIN_VALUE; | ||
// Since the query does not specify bounds for aggregation, we can | ||
// build the global min/max from local min/max within each segment | ||
// NOTE(review): getPointValues(fieldName) is looked up twice per leaf and is | ||
// dereferenced without a null check; presumably it can be null for a segment | ||
// that has no points indexed for this field -- confirm and guard if so. | ||
for (LeafReaderContext leaf : leaves) { | ||
min = Math.min(min, NumericUtils.sortableBytesToLong( | ||
leaf.reader().getPointValues(fieldName).getMinPackedValue(), 0)); | ||
max = Math.max(max, NumericUtils.sortableBytesToLong( | ||
leaf.reader().getPointValues(fieldName).getMaxPackedValue(), 0)); | ||
} | ||
createFilterForAggregations(fieldName, min, max); | ||
} | ||
|
||
private void createFilterForAggregations(final String field, final long low, final long high) throws IOException { | ||
// max - min / targetBuckets = bestDuration | ||
// find the right innerInterval this bestDuration belongs to | ||
// since we cannot exceed targetBuckets, bestDuration should go up, | ||
// so the right innerInterval should be an upper bound | ||
long bestDuration = (high - low) / targetBuckets; | ||
long interval = 0; | ||
roundingInfosLoop: do { | ||
RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; | ||
for (int curInnerInterval: curRoundingInfo.innerIntervals) { | ||
if (bestDuration <= curInnerInterval * curRoundingInfo.roughEstimateDurationMillis) { | ||
interval = curInnerInterval * curRoundingInfo.roughEstimateDurationMillis; | ||
break roundingInfosLoop; | ||
} | ||
} | ||
roundingIdx++; | ||
} while ( | ||
roundingIdx < roundingInfos.length | ||
); | ||
|
||
// interval > 0 | ||
int i = 0; | ||
filters = new Weight[targetBuckets]; | ||
long roundedLow = preparedRounding.round(low); | ||
while (i < targetBuckets) { | ||
// Calculate the lower bucket bound | ||
final byte[] lower = new byte[8]; | ||
NumericUtils.longToSortableBytes(Math.max(roundedLow, low), lower, 0); | ||
// Calculate the upper bucket bound | ||
final byte[] upper = new byte[8]; | ||
roundedLow = preparedRounding.round(roundedLow + interval); | ||
// Subtract -1 if the minimum is roundedLow as roundedLow itself | ||
// is included in the next bucket | ||
NumericUtils.longToSortableBytes(Math.min(roundedLow - 1, high), upper, 0); | ||
|
||
filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { | ||
Comment on lines
+366
to
+377
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This logic has a problem: it focuses on creating the required number of target buckets, whereas we should instead limit the buckets to the upper and lower bounds. For example, I see the following response:
|
||
@Override | ||
protected String toString(int dimension, byte[] value) { | ||
return null; | ||
} | ||
}, ScoreMode.COMPLETE_NO_SCORES, 1); | ||
} | ||
} | ||
|
||
boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException { | ||
final int[] counts = new int[filters.length]; | ||
int i; | ||
for (i = 0; i < filters.length; i++) { | ||
counts[i] = filters[i].count(ctx); | ||
if (counts[i] == -1) { | ||
// Cannot use the optimization if any of the counts | ||
// is -1 indicating the segment might have deleted documents | ||
return false; | ||
} | ||
} | ||
|
||
for (i = 0; i < filters.length; i++) { | ||
long bucketOrd = bucketOrds.add( | ||
owningBucketOrd, | ||
preparedRounding.round(NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)) | ||
); | ||
if (bucketOrd < 0) { // already seen | ||
bucketOrd = -1 - bucketOrd; | ||
} | ||
incrementBucketDocCount(bucketOrd, counts[i]); | ||
} | ||
throw new CollectionTerminatedException(); | ||
Comment on lines
+398
to
+408
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can be aggressive during bucket creation and invoke the bucket-merging logic if the number of buckets exceeds the target bucket count:
|
||
} | ||
|
||
@Override | ||
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { | ||
protected LeafBucketCollector getLeafCollector2(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { | ||
SortedNumericDocValues values = valuesSource.longValues(ctx); | ||
|
||
final boolean[] useOpt = new boolean[1]; | ||
useOpt[0] = filters != null; | ||
|
||
return new LeafBucketCollectorBase(sub, values) { | ||
@Override | ||
public void collect(int doc, long owningBucketOrd) throws IOException { | ||
if (useOpt[0]) { | ||
useOpt[0] = tryFastFilterAggregation(ctx, owningBucketOrd); | ||
} | ||
|
||
assert owningBucketOrd == 0; | ||
if (false == values.advanceExact(doc)) { | ||
return; | ||
|
@@ -471,6 +598,8 @@ private static class FromMany extends AutoDateHistogramAggregator { | |
*/ | ||
private int rebucketCount = 0; | ||
|
||
private final ValuesSource.Numeric valuesSource; | ||
|
||
FromMany( | ||
String name, | ||
AggregatorFactories factories, | ||
|
@@ -505,10 +634,12 @@ private static class FromMany extends AutoDateHistogramAggregator { | |
preparedRoundings[0] = roundingPreparer.apply(roundingInfos[0].rounding); | ||
bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays()); | ||
liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true); | ||
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null; | ||
} | ||
|
||
@Override | ||
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { | ||
protected LeafBucketCollector getLeafCollector2(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { | ||
SortedNumericDocValues values = valuesSource.longValues(ctx); | ||
return new LeafBucketCollectorBase(sub, values) { | ||
@Override | ||
public void collect(int doc, long owningBucketOrd) throws IOException { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to move the preparedRounding to the next index as well; otherwise it always stays at index 0 (second level).