add support for aggregate only projections (apache#17484)
clintropolis authored Nov 25, 2024
1 parent 20aea29 commit ede9e40
Showing 6 changed files with 117 additions and 34 deletions.
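
In short: an AggregateProjectionSpec may now omit grouping columns entirely as long as it declares at least one aggregator, in which case the projection pre-aggregates the whole segment down to a single time-ordered row. The following is a minimal sketch of such an aggregate-only projection, put together from the constructor call added to CursorFactoryProjectionTest below (the names "c_sum", "_c_sum", and column "c" are simply the values used in that test):

import java.util.Collections;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.VirtualColumns;

// An aggregate-only projection: no virtual columns, no grouping columns, one aggregator.
// Before this commit the constructor rejected an empty groupingColumns list; now the
// validation only fails when groupingColumns and aggregators are both missing.
AggregateProjectionSpec aggregateOnly = new AggregateProjectionSpec(
    "c_sum",                  // projection name
    VirtualColumns.EMPTY,     // no virtual columns needed
    Collections.emptyList(),  // empty grouping: the segment rolls up to a single row
    new AggregatorFactory[]{
        new LongSumAggregatorFactory("_c_sum", "c")
    }
);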
@@ -32,13 +32,15 @@
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.AggregateProjectionMetadata;
+ import org.apache.druid.segment.Cursors;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.Arrays;
+ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -72,10 +74,10 @@ public AggregateProjectionSpec(
)
{
this.name = name;
- if (CollectionUtils.isNullOrEmpty(groupingColumns)) {
- throw InvalidInput.exception("groupingColumns must not be null or empty");
+ if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) {
+ throw InvalidInput.exception("groupingColumns and aggregators must not both be null or empty");
}
- this.groupingColumns = groupingColumns;
+ this.groupingColumns = groupingColumns == null ? Collections.emptyList() : groupingColumns;
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
// in the future this should be expanded to support user specified ordering, but for now we compute it based on
// the grouping columns, which is consistent with how rollup ordering works for incremental index base table
@@ -169,6 +171,10 @@ public String toString()

private static ProjectionOrdering computeOrdering(VirtualColumns virtualColumns, List<DimensionSchema> groupingColumns)
{
+ if (groupingColumns.isEmpty()) {
+ // call it time ordered; there are no grouping columns, so there is only 1 row for this projection
+ return new ProjectionOrdering(Cursors.ascendingTimeOrder(), null);
+ }
final List<OrderBy> ordering = Lists.newArrayListWithCapacity(groupingColumns.size());

String timeColumnName = null;
@@ -29,7 +29,7 @@
import com.google.common.collect.Interners;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
- import org.apache.druid.error.InvalidInput;
+ import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.OrderBy;
@@ -40,6 +40,7 @@

import javax.annotation.Nullable;
import java.util.Arrays;
+ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -167,17 +168,17 @@ public Schema(
@JsonProperty("name") String name,
@JsonProperty("timeColumnName") @Nullable String timeColumnName,
@JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
@JsonProperty("groupingColumns") List<String> groupingColumns,
@JsonProperty("groupingColumns") @Nullable List<String> groupingColumns,
@JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
@JsonProperty("ordering") List<OrderBy> ordering
)
{
this.name = name;
- if (CollectionUtils.isNullOrEmpty(groupingColumns)) {
- throw InvalidInput.exception("groupingColumns must not be null or empty");
+ if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) {
+ throw DruidException.defensive("groupingColumns and aggregators must not both be null or empty");
}
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
- this.groupingColumns = groupingColumns;
+ this.groupingColumns = groupingColumns == null ? Collections.emptyList() : groupingColumns;
this.aggregators = aggregators == null ? new AggregatorFactory[0] : aggregators;
this.ordering = ordering;

@@ -89,7 +89,7 @@ public static <T> QueryableProjection<T> findMatchingProjection(
if (name != null && !name.equals(spec.getSchema().getName())) {
continue;
}
- ProjectionMatch match = spec.getSchema().matches(cursorBuildSpec, physicalChecker);
+ final ProjectionMatch match = spec.getSchema().matches(cursorBuildSpec, physicalChecker);
if (match != null) {
if (cursorBuildSpec.getQueryMetrics() != null) {
cursorBuildSpec.getQueryMetrics().projection(spec.getSchema().getName());
@@ -75,7 +75,7 @@ public void testInvalidGrouping()
null
)
);
Assert.assertEquals("groupingColumns must not be null or empty", t.getMessage());
Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());

t = Assert.assertThrows(
DruidException.class,
@@ -86,7 +86,7 @@ public void testInvalidGrouping()
null
)
);
Assert.assertEquals("groupingColumns must not be null or empty", t.getMessage());
Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
}

@Test
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
import nl.jqno.equalsverifier.EqualsVerifier;
+ import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -33,6 +34,7 @@
import org.junit.Test;

import java.util.Arrays;
+ import java.util.Collections;
import java.util.SortedSet;

public class AggregateProjectionMetadataTest extends InitializedNullHandlingTest
@@ -132,6 +134,42 @@ public void testComparator()
Assert.assertEquals(good, metadataBest.last());
}

+ @Test
+ public void testInvalidGrouping()
+ {
+ Throwable t = Assert.assertThrows(
+ DruidException.class,
+ () -> new AggregateProjectionMetadata(
+ new AggregateProjectionMetadata.Schema(
+ "other_projection",
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ 0
+ )
+ );
+ Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
+
+ t = Assert.assertThrows(
+ DruidException.class,
+ () -> new AggregateProjectionMetadata(
+ new AggregateProjectionMetadata.Schema(
+ "other_projection",
+ null,
+ null,
+ Collections.emptyList(),
+ null,
+ null
+ ),
+ 0
+ )
+ );
+ Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
+ }

@Test
public void testEqualsAndHashcode()
{
@@ -229,15 +229,21 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
),
null
),
- // cannot really make an 'all' granularity projection, but can do something like floor time to the segment
- // granularity interval resulting in a single row
new AggregateProjectionSpec(
- "c_sum",
+ "c_sum_daily",
VirtualColumns.create(Granularities.toVirtualColumn(Granularities.DAY, "__gran")),
Collections.singletonList(new LongDimensionSchema("__gran")),
new AggregatorFactory[]{
new LongSumAggregatorFactory("_c_sum", "c")
}
),
+ new AggregateProjectionSpec(
+ "c_sum",
+ VirtualColumns.EMPTY,
+ Collections.emptyList(),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("_c_sum", "c")
+ }
+ )
);

@@ -1064,27 +1070,9 @@ public void testProjectionSelectionMissingColumnOnBaseTableToo()
Assert.assertArrayEquals(new Object[]{"b", null, 12L, 13.2}, results.get(1).getArray());
}

- private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema)
- {
- File tmp = FileUtils.createTempDir();
- CLOSER.register(tmp::delete);
- return IndexBuilder.create()
- .tmpDir(tmp)
- .schema(
- IncrementalIndexSchema.builder()
- .withDimensionsSpec(dimensionsSpec)
- .withRollup(false)
- .withMinTimestamp(TIMESTAMP.getMillis())
- .withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS)
- .build()
- )
- .rows(ROWS);
- }

@Test
public void testTimeseriesQueryGranularityFitsProjectionGranularity()
{
- Assume.assumeFalse(sortByDim);
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
@@ -1118,9 +1106,43 @@ public void testTimeseriesQueryGranularityFitsProjectionGranularity()
}

@Test
- public void testTimeseriesQueryGranularityAllFitsProjectionGranularity()
+ public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithSegmentGranularity()
+ {
+ final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .intervals(ImmutableList.of(Intervals.ETERNITY))
+ .granularity(Granularities.ALL)
+ .aggregators(new LongSumAggregatorFactory("c_sum", "c"))
+ .context(ImmutableMap.of(QueryContexts.USE_PROJECTION, "c_sum_daily"))
+ .build();
+
+ final CursorBuildSpec buildSpec = TimeseriesQueryEngine.makeCursorBuildSpec(query, null);
+ try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(1, rowCount);
+ }
+
+ final Sequence<Result<TimeseriesResultValue>> resultRows = timeseriesEngine.process(
+ query,
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ null
+ );
+
+ final List<Result<TimeseriesResultValue>> results = resultRows.toList();
+ Assert.assertEquals(1, results.size());
+ final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.YES);
+ Assert.assertArrayEquals(new Object[]{TIMESTAMP, 19L}, getResultArray(results.get(0), querySignature));
+ }
+
+ @Test
+ public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithNoGrouping()
{
- Assume.assumeFalse(sortByDim);
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
@@ -1196,6 +1218,22 @@ public void testTimeseriesQueryGranularityFinerThanProjectionGranularity()
Assert.assertArrayEquals(new Object[]{TIMESTAMP.plusHours(1).plusMinutes(1), 2L}, getResultArray(results.get(7), querySignature));
}

+ private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema)
+ {
+ File tmp = FileUtils.createTempDir();
+ CLOSER.register(tmp::delete);
+ return IndexBuilder.create()
+ .tmpDir(tmp)
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(dimensionsSpec)
+ .withRollup(false)
+ .withMinTimestamp(TIMESTAMP.getMillis())
+ .withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS)
+ .build()
+ )
+ .rows(ROWS);
+ }

private static Set<Object[]> makeArrayResultSet()
{
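
Putting the pieces together: a projection without grouping columns is effectively a single pre-aggregated row at ALL granularity, so only a granularity-ALL query over the projection's full interval can be mapped onto it. Below is a sketch of such a query, modeled on the builder calls in the tests above; note that pinning the aggregate-only projection by name via QueryContexts.USE_PROJECTION and "c_sum" is an assumption here, since the body of the new WithNoGrouping test is not fully shown in this diff.

// Sketch only: mirrors the timeseries builder used by the new tests, but targets the
// aggregate-only "c_sum" projection rather than the day-grained "c_sum_daily" one.
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
    .dataSource("test")
    .intervals(ImmutableList.of(Intervals.ETERNITY))
    .granularity(Granularities.ALL)                           // must be ALL to fit a one-row projection
    .aggregators(new LongSumAggregatorFactory("c_sum", "c"))
    .context(ImmutableMap.of(QueryContexts.USE_PROJECTION, "c_sum"))  // assumed projection hint
    .build();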
