Skip to content

Commit

Permalink
Changes to support unmapped fields in metric aggregation (opensearch-…
Browse files Browse the repository at this point in the history
…project#16481)

Avoids exception when querying unmapped field when star tree experimental
feature is enables.

---------

Signed-off-by: expani <[email protected]>
  • Loading branch information
expani authored Jan 7, 2025
1 parent 0b36599 commit e7e19f7
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private static MetricStat validateStarTreeMetricSupport(
MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
field = ((MetricAggregatorFactory) aggregatorFactory).getField();

if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
if (field != null && supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
return metricStat;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,6 @@ public String getStatsSubtype() {
}

public String getField() {
return config.fieldContext().field();
return config.fieldContext() != null ? config.fieldContext().field() : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,27 @@
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite.CompositeIndexReader;
import org.opensearch.index.codec.composite.composite912.Composite912Codec;
import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.NumericDimension;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.AggregatorTestCase;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder;
Expand All @@ -49,14 +58,17 @@
import org.opensearch.search.aggregations.metrics.InternalSum;
import org.opensearch.search.aggregations.metrics.InternalValueCount;
import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.metrics.MinAggregationBuilder;
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
Expand All @@ -69,6 +81,8 @@
import static org.opensearch.search.aggregations.AggregationBuilders.min;
import static org.opensearch.search.aggregations.AggregationBuilders.sum;
import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MetricAggregatorTests extends AggregatorTestCase {

Expand Down Expand Up @@ -267,6 +281,110 @@ public void testStarTreeDocValues() throws IOException {
);
}

CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();

QueryShardContext queryShardContext = queryShardContextMock(
indexSearcher,
mapperServiceMock(),
createIndexSettings(),
circuitBreakerService,
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), circuitBreakerService).withCircuitBreaking()
);

MetricAggregatorFactory aggregatorFactory = mock(MetricAggregatorFactory.class);
when(aggregatorFactory.getSubFactories()).thenReturn(AggregatorFactories.EMPTY);
when(aggregatorFactory.getField()).thenReturn(FIELD_NAME);
when(aggregatorFactory.getMetricStat()).thenReturn(MetricStat.SUM);

// Case when field and metric type in aggregation are fully supported by star tree.
testCase(
indexSearcher,
query,
queryBuilder,
sumAggregationBuilder,
starTree,
supportedDimensions,
List.of(new Metric(FIELD_NAME, List.of(MetricStat.SUM, MetricStat.MAX, MetricStat.MIN, MetricStat.AVG))),
verifyAggregation(InternalSum::getValue),
aggregatorFactory,
true
);

// Case when the field is not supported by star tree
SumAggregationBuilder invalidFieldSumAggBuilder = sum("_name").field("hello");
testCase(
indexSearcher,
query,
queryBuilder,
invalidFieldSumAggBuilder,
starTree,
supportedDimensions,
Collections.emptyList(),
verifyAggregation(InternalSum::getValue),
invalidFieldSumAggBuilder.build(queryShardContext, null),
false // Invalid fields will return null StarTreeQueryContext which will not cause early termination by leaf collector
);

// Case when metric type in aggregation is not supported by star tree but the field is supported.
testCase(
indexSearcher,
query,
queryBuilder,
sumAggregationBuilder,
starTree,
supportedDimensions,
List.of(new Metric(FIELD_NAME, List.of(MetricStat.MAX, MetricStat.MIN, MetricStat.AVG))),
verifyAggregation(InternalSum::getValue),
aggregatorFactory,
false
);

// Case when field is not present in supported metrics
testCase(
indexSearcher,
query,
queryBuilder,
sumAggregationBuilder,
starTree,
supportedDimensions,
List.of(new Metric("hello", List.of(MetricStat.MAX, MetricStat.MIN, MetricStat.AVG))),
verifyAggregation(InternalSum::getValue),
aggregatorFactory,
false
);

AggregatorFactories aggregatorFactories = mock(AggregatorFactories.class);
when(aggregatorFactories.getFactories()).thenReturn(new AggregatorFactory[] { mock(MetricAggregatorFactory.class) });
when(aggregatorFactory.getSubFactories()).thenReturn(aggregatorFactories);

// Case when sub aggregations are present
testCase(
indexSearcher,
query,
queryBuilder,
sumAggregationBuilder,
starTree,
supportedDimensions,
List.of(new Metric("hello", List.of(MetricStat.MAX, MetricStat.MIN, MetricStat.AVG))),
verifyAggregation(InternalSum::getValue),
aggregatorFactory,
false
);

// Case when aggregation factory is not metric aggregation
testCase(
indexSearcher,
query,
queryBuilder,
sumAggregationBuilder,
starTree,
supportedDimensions,
List.of(new Metric("hello", List.of(MetricStat.MAX, MetricStat.MIN, MetricStat.AVG))),
verifyAggregation(InternalSum::getValue),
mock(ValuesSourceAggregatorFactory.class),
false
);

ir.close();
directory.close();
}
Expand All @@ -287,6 +405,21 @@ private <T extends AggregationBuilder, V extends InternalAggregation> void testC
CompositeIndexFieldInfo starTree,
List<Dimension> supportedDimensions,
BiConsumer<V, V> verify
) throws IOException {
testCase(searcher, query, queryBuilder, aggBuilder, starTree, supportedDimensions, Collections.emptyList(), verify, null, true);
}

private <T extends AggregationBuilder, V extends InternalAggregation> void testCase(
IndexSearcher searcher,
Query query,
QueryBuilder queryBuilder,
T aggBuilder,
CompositeIndexFieldInfo starTree,
List<Dimension> supportedDimensions,
List<Metric> supportedMetrics,
BiConsumer<V, V> verify,
AggregatorFactory aggregatorFactory,
boolean assertCollectorEarlyTermination
) throws IOException {
V starTreeAggregation = searchAndReduceStarTree(
createIndexSettings(),
Expand All @@ -296,8 +429,11 @@ private <T extends AggregationBuilder, V extends InternalAggregation> void testC
aggBuilder,
starTree,
supportedDimensions,
supportedMetrics,
DEFAULT_MAX_BUCKETS,
false,
aggregatorFactory,
assertCollectorEarlyTermination,
DEFAULT_MAPPED_FIELD
);
V expectedAggregation = searchAndReduceStarTree(
Expand All @@ -308,8 +444,11 @@ private <T extends AggregationBuilder, V extends InternalAggregation> void testC
aggBuilder,
null,
null,
null,
DEFAULT_MAX_BUCKETS,
false,
aggregatorFactory,
assertCollectorEarlyTermination,
DEFAULT_MAPPED_FIELD
);
verify.accept(expectedAggregation, starTreeAggregation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ public void testStarTreeFilterWithDocsInSVDFieldButNoStarNode() throws IOExcepti
testStarTreeFilter(10, false);
}

private void testStarTreeFilter(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension) throws IOException {
private Directory createStarTreeIndex(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension, List<Document> docs)
throws IOException {
Directory directory = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig(null);
conf.setCodec(getCodec(maxLeafDoc, skipStarNodeCreationForSDVDimension));
conf.setMergePolicy(newLogMergePolicy());
RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf);
int totalDocs = 100;

List<Document> docs = new ArrayList<>();
for (int i = 0; i < totalDocs; i++) {
Document doc = new Document();
doc.add(new SortedNumericDocValuesField(SNDV, i));
Expand All @@ -110,6 +110,15 @@ private void testStarTreeFilter(int maxLeafDoc, boolean skipStarNodeCreationForS
}
iw.forceMerge(1);
iw.close();
return directory;
}

private void testStarTreeFilter(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension) throws IOException {
List<Document> docs = new ArrayList<>();

Directory directory = createStarTreeIndex(maxLeafDoc, skipStarNodeCreationForSDVDimension, docs);

int totalDocs = docs.size();

DirectoryReader ir = DirectoryReader.open(directory);
initValuesSourceRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.opensearch.index.cache.query.DisabledQueryCache;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper;
import org.opensearch.index.fielddata.IndexFieldData;
import org.opensearch.index.fielddata.IndexFieldDataCache;
Expand Down Expand Up @@ -348,7 +349,9 @@ protected CountingAggregator createCountingAggregator(
IndexSettings indexSettings,
CompositeIndexFieldInfo starTree,
List<Dimension> supportedDimensions,
List<Metric> supportedMetrics,
MultiBucketConsumer bucketConsumer,
AggregatorFactory aggregatorFactory,
MappedFieldType... fieldTypes
) throws IOException {
SearchContext searchContext;
Expand All @@ -360,7 +363,9 @@ protected CountingAggregator createCountingAggregator(
queryBuilder,
starTree,
supportedDimensions,
supportedMetrics,
bucketConsumer,
aggregatorFactory,
fieldTypes
);
} else {
Expand Down Expand Up @@ -389,7 +394,9 @@ protected SearchContext createSearchContextWithStarTreeContext(
QueryBuilder queryBuilder,
CompositeIndexFieldInfo starTree,
List<Dimension> supportedDimensions,
List<Metric> supportedMetrics,
MultiBucketConsumer bucketConsumer,
AggregatorFactory aggregatorFactory,
MappedFieldType... fieldTypes
) throws IOException {
SearchContext searchContext = createSearchContext(
Expand All @@ -406,14 +413,20 @@ protected SearchContext createSearchContextWithStarTreeContext(
AggregatorFactories aggregatorFactories = mock(AggregatorFactories.class);
when(searchContext.aggregations()).thenReturn(searchContextAggregations);
when(searchContextAggregations.factories()).thenReturn(aggregatorFactories);
when(aggregatorFactories.getFactories()).thenReturn(new AggregatorFactory[] {});

if (aggregatorFactory != null) {
when(aggregatorFactories.getFactories()).thenReturn(new AggregatorFactory[] { aggregatorFactory });
} else {
when(aggregatorFactories.getFactories()).thenReturn(new AggregatorFactory[] {});
}

CompositeDataCubeFieldType compositeMappedFieldType = mock(CompositeDataCubeFieldType.class);
when(compositeMappedFieldType.name()).thenReturn(starTree.getField());
when(compositeMappedFieldType.getCompositeIndexType()).thenReturn(starTree.getType());
Set<CompositeMappedFieldType> compositeFieldTypes = Set.of(compositeMappedFieldType);

when((compositeMappedFieldType).getDimensions()).thenReturn(supportedDimensions);
when((compositeMappedFieldType).getMetrics()).thenReturn(supportedMetrics);
MapperService mapperService = mock(MapperService.class);
when(mapperService.getCompositeFieldTypes()).thenReturn(compositeFieldTypes);
when(searchContext.mapperService()).thenReturn(mapperService);
Expand Down Expand Up @@ -740,8 +753,11 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
AggregationBuilder builder,
CompositeIndexFieldInfo compositeIndexFieldInfo,
List<Dimension> supportedDimensions,
List<Metric> supportedMetrics,
int maxBucket,
boolean hasNested,
AggregatorFactory aggregatorFactory,
boolean assertCollectorEarlyTermination,
MappedFieldType... fieldTypes
) throws IOException {
query = query.rewrite(searcher);
Expand All @@ -764,15 +780,17 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
indexSettings,
compositeIndexFieldInfo,
supportedDimensions,
supportedMetrics,
bucketConsumer,
aggregatorFactory,
fieldTypes
);

countingAggregator.preCollection();
searcher.search(query, countingAggregator);
countingAggregator.postCollection();
aggs.add(countingAggregator.buildTopLevel());
if (compositeIndexFieldInfo != null) {
if (compositeIndexFieldInfo != null && assertCollectorEarlyTermination) {
assertEquals(0, countingAggregator.collectCounter.get());
}

Expand Down

0 comments on commit e7e19f7

Please sign in to comment.