Skip to content

Commit

Permalink
Fix resultcache multiple postaggregation restore (apache#15402)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgyrtkirk authored Nov 21, 2023
1 parent 1df53d6 commit dff5bcb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.data.input.Row;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
Expand Down Expand Up @@ -62,7 +63,6 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
Expand Down Expand Up @@ -651,9 +651,10 @@ public ResultRow apply(Object input)
);

if (isResultLevelCache) {
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
int postPos = 0;
while (postItr.hasNext() && results.hasNext()) {
for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) {
if (!results.hasNext()) {
throw DruidException.defensive("Ran out of objects while reading postaggs from cache!");
}
resultRow.set(postAggregatorStart + postPos, results.next());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,8 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal
)
)
.setPostAggregatorSpecs(
ImmutableList.of(new ConstantPostAggregator("post", 10))
new ConstantPostAggregator("post", 10),
new ConstantPostAggregator("post2", 20)
)
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
Expand Down Expand Up @@ -1017,14 +1018,14 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal
Assert.assertEquals(result1, fromCacheResult);

// test timestamps that result in integer size millis
final ResultRow result2 = ResultRow.of(123L, dimValue, 1, dimValue, 10);
final ResultRow result2 = ResultRow.of(123L, dimValue, 1, dimValue, 10, 20);

// Please see the comments on aggregator serde and type handling in CacheStrategy.fetchAggregatorsFromCache()
final ResultRow typeAdjustedResult2;
if (valueType.is(ValueType.FLOAT)) {
typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2.1d, 10);
typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2.1d, 10, 20);
} else if (valueType.is(ValueType.LONG)) {
typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2, 10);
typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2, 10, 20);
} else {
typeAdjustedResult2 = result2;
}
Expand Down

0 comments on commit dff5bcb

Please sign in to comment.