Projections prototype (apache#17214)
clintropolis authored Oct 5, 2024
1 parent 04fe568 commit 0bd13bc
Showing 120 changed files with 5,378 additions and 627 deletions.
@@ -278,6 +278,26 @@ public boolean equals(Object o)
&& stringEncoding == that.stringEncoding;
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated;
if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals(
fieldName,
that.fieldName
)) {
return getCombiningFactory();
}
return null;
}

@Override
public int hashCode()
{
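A brief usage sketch of the new method, reusing the constants from the test below (a hedged illustration, not part of the commit; per the implementation above, a combining factory is returned only when the field and tuning parameters match):

// hedged sketch: assumes the test constants NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, STRING_ENCODING
HllSketchBuildAggregatorFactory queryAgg =
    new HllSketchBuildAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, STRING_ENCODING, true, true);
HllSketchBuildAggregatorFactory projectionAgg =
    new HllSketchBuildAggregatorFactory("projection_agg", FIELD_NAME, LG_K, TGT_HLL_TYPE, STRING_ENCODING, false, false);

// same fieldName, lgK, tgtHllType, and stringEncoding: the combining factory is returned,
// so the query can read a pre-aggregated projection column instead of the raw input
AggregatorFactory combining = queryAgg.substituteCombiningFactory(projectionAgg);

// a factory over a different field (or with different tuning parameters) returns null,
// signalling that the pre-aggregated column cannot be substituted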
@@ -231,6 +231,42 @@ public void testEqualsOtherMatches()
Assert.assertArrayEquals(target.getCacheKey(), other.getCacheKey());
}

@Test
public void testCanSubstitute()
{
HllSketchBuildAggregatorFactory factory = new HllSketchBuildAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
true,
true
);
HllSketchBuildAggregatorFactory other = new HllSketchBuildAggregatorFactory(
"other name",
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
false,
false
);

HllSketchBuildAggregatorFactory incompatible = new HllSketchBuildAggregatorFactory(
NAME,
"different field",
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
false,
false
);
Assert.assertNotNull(other.substituteCombiningFactory(factory));
Assert.assertNotNull(factory.substituteCombiningFactory(other));
Assert.assertNull(factory.substituteCombiningFactory(incompatible));
}

@Test
public void testToString()
{
@@ -296,7 +296,7 @@ public void setUp()
frameReader = FrameReader.create(adapter.getRowSignature());
frameList = FrameSequenceBuilder.fromCursorFactory(adapter)
.frameType(FrameType.ROW_BASED)
-                                .maxRowsPerFrame(IntMath.divide(index.size(), MAX_FRAMES, RoundingMode.CEILING))
+                                .maxRowsPerFrame(IntMath.divide(index.numRows(), MAX_FRAMES, RoundingMode.CEILING))
.frames()
.toList();
}
@@ -165,7 +165,7 @@ public void testReadFromIndexAndWriteAnotherIndex() throws Exception
Assert.assertEquals(18, count);

// Check the index
-    Assert.assertEquals(9, index.size());
+    Assert.assertEquals(9, index.numRows());
final IncrementalIndexSegment queryable = new IncrementalIndexSegment(index, SegmentId.dummy("test"));
final List<String> dimensions = index.getDimensionNames(false);
Assert.assertEquals(2, dimensions.size());
@@ -1913,6 +1913,7 @@ public Metadata getMetadata()
null,
null,
null,
null,
null
);
}
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* API type to specify an aggregating projection on {@link org.apache.druid.segment.incremental.IncrementalIndexSchema}
*
* Decorated with {@link JsonTypeInfo} annotations as a future-proofing mechanism in the event we add other types of
* projections and need to extract out a base interface from this class.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonTypeName(AggregateProjectionSpec.TYPE_NAME)
public class AggregateProjectionSpec
{
public static final String TYPE_NAME = "aggregate";

private final String name;
private final List<DimensionSchema> groupingColumns;
private final VirtualColumns virtualColumns;
private final AggregatorFactory[] aggregators;
private final List<OrderBy> ordering;
@Nullable
private final String timeColumnName;

@JsonCreator
public AggregateProjectionSpec(
@JsonProperty("name") String name,
@JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
@JsonProperty("groupingColumns") @Nullable List<DimensionSchema> groupingColumns,
@JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators
)
{
this.name = name;
if (CollectionUtils.isNullOrEmpty(groupingColumns)) {
throw InvalidInput.exception("groupingColumns must not be null or empty");
}
this.groupingColumns = groupingColumns;
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
// in the future this should be expanded to support user-specified ordering, but for now we compute it based on
// the grouping columns, which is consistent with how rollup ordering works for the incremental index base table
final ProjectionOrdering ordering = computeOrdering(this.virtualColumns, this.groupingColumns);
this.ordering = ordering.ordering;
this.timeColumnName = ordering.timeColumnName;
this.aggregators = aggregators == null ? new AggregatorFactory[0] : aggregators;
}

@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public List<DimensionSchema> getGroupingColumns()
{
return groupingColumns;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public AggregatorFactory[] getAggregators()
{
return aggregators;
}

@JsonProperty
public List<OrderBy> getOrdering()
{
return ordering;
}

@JsonIgnore
public AggregateProjectionMetadata.Schema toMetadataSchema()
{
return new AggregateProjectionMetadata.Schema(
name,
timeColumnName,
virtualColumns,
groupingColumns.stream().map(DimensionSchema::getName).collect(Collectors.toList()),
aggregators,
ordering
);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AggregateProjectionSpec that = (AggregateProjectionSpec) o;
return Objects.equals(name, that.name)
&& Objects.equals(groupingColumns, that.groupingColumns)
&& Objects.equals(virtualColumns, that.virtualColumns)
&& Objects.deepEquals(aggregators, that.aggregators)
&& Objects.equals(ordering, that.ordering);
}

@Override
public int hashCode()
{
return Objects.hash(name, groupingColumns, virtualColumns, Arrays.hashCode(aggregators), ordering);
}

@Override
public String toString()
{
return "AggregateProjectionSpec{" +
"name='" + name + '\'' +
", groupingColumns=" + groupingColumns +
", virtualColumns=" + virtualColumns +
", aggregators=" + Arrays.toString(aggregators) +
", ordering=" + ordering +
'}';
}


private static ProjectionOrdering computeOrdering(VirtualColumns virtualColumns, List<DimensionSchema> groupingColumns)
{
final List<OrderBy> ordering = Lists.newArrayListWithCapacity(groupingColumns.size());

String timeColumnName = null;
Granularity granularity = null;
// try to find the __time column equivalent, which might be a time_floor expression that models granularity
// bucketing. The time column is chosen as the column with the finest detected granularity on __time. If the
// projection does not have a time-like column, the projection is handled as ALL granularity, and all projection
// rows use a synthetic timestamp equal to the minimum timestamp of the incremental index
for (final DimensionSchema dimension : groupingColumns) {
ordering.add(OrderBy.ascending(dimension.getName()));
if (ColumnHolder.TIME_COLUMN_NAME.equals(dimension.getName())) {
timeColumnName = dimension.getName();
granularity = Granularities.NONE;
} else {
final VirtualColumn vc = virtualColumns.getVirtualColumn(dimension.getName());
final Granularity maybeGranularity = Granularities.fromVirtualColumn(vc);
if (granularity == null && maybeGranularity != null) {
granularity = maybeGranularity;
timeColumnName = dimension.getName();
} else if (granularity != null && maybeGranularity != null && maybeGranularity.isFinerThan(granularity)) {
granularity = maybeGranularity;
timeColumnName = dimension.getName();
}
}
}
return new ProjectionOrdering(ordering, timeColumnName);
}

private static final class ProjectionOrdering
{
private final List<OrderBy> ordering;
@Nullable
private final String timeColumnName;

private ProjectionOrdering(List<OrderBy> ordering, @Nullable String timeColumnName)
{
this.ordering = ordering;
this.timeColumnName = timeColumnName;
}
}
}
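A minimal construction sketch (not part of the commit; projection and column names are hypothetical, imports omitted, and Granularities.toVirtualColumn is the helper extended in the Granularities diff below):

// hypothetical: an hourly projection grouped on a time_floor virtual column plus a string dimension.
// computeOrdering() detects the HOUR time_floor expression and picks "__gran" as the time column.
AggregateProjectionSpec spec = new AggregateProjectionSpec(
    "channel_hourly",
    VirtualColumns.create(
        Collections.singletonList(Granularities.toVirtualColumn(Granularities.HOUR, "__gran"))
    ),
    Arrays.asList(new LongDimensionSchema("__gran"), new StringDimensionSchema("channel")),
    new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
// ordering becomes [__gran ASC, channel ASC], mirroring the grouping column order,
// and toMetadataSchema() carries "__gran" through as the time column name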
@@ -23,9 +23,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Query;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
@@ -160,4 +162,32 @@ public static ExpressionVirtualColumn toVirtualColumn(Granularity granularity, S
ExprMacroTable.granularity()
);
}

/**
* Converts a virtual column with a single input time column into a {@link Granularity} if it is a
* {@link TimestampFloorExprMacro.TimestampFloorExpr}.
* <p>
* IMPORTANT - this method DOES NOT VERIFY that the virtual column has a single input that is a time column
* ({@link ColumnHolder#TIME_COLUMN_NAME} or equivalent projection time column as defined by
* {@link AggregateProjectionMetadata.Schema#getTimeColumnName()}). Callers must verify this externally before
* calling this method by examining {@link VirtualColumn#requiredColumns()}.
* <p>
* This method also does not handle other time expressions, nor the case where the virtual column is simply an
* identifier for a time column.
*/
@Nullable
public static Granularity fromVirtualColumn(VirtualColumn virtualColumn)
{
if (virtualColumn instanceof ExpressionVirtualColumn) {
final ExpressionVirtualColumn expressionVirtualColumn = (ExpressionVirtualColumn) virtualColumn;
final Expr expr = expressionVirtualColumn.getParsedExpression().get();
if (expr instanceof TimestampFloorExprMacro.TimestampFloorExpr) {
final TimestampFloorExprMacro.TimestampFloorExpr gran = (TimestampFloorExprMacro.TimestampFloorExpr) expr;
if (gran.getArg().getBindingIfIdentifier() != null) {
return gran.getGranularity();
}
}
}
return null;
}
}
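A small round-trip sketch of the two helpers in this file (not part of the commit; the virtual column name is arbitrary):

// toVirtualColumn builds a time_floor expression over __time; fromVirtualColumn
// recovers the granularity because the floor argument is a plain identifier
ExpressionVirtualColumn vc = Granularities.toVirtualColumn(Granularities.HOUR, "v0");
Granularity recovered = Granularities.fromVirtualColumn(vc);   // Granularities.HOUR
// any other expression shape (or a non-identifier time_floor argument) yields null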
@@ -190,6 +190,12 @@ public void segment(String segmentIdentifier)
setDimension("segment", segmentIdentifier);
}

@Override
public void projection(String projection)
{
setDimension("projection", projection);
}

@Override
public void identity(String identity)
{
@@ -88,6 +88,12 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";

// projection context keys
public static final String NO_PROJECTIONS = "noProjections";
public static final String FORCE_PROJECTION = "forceProjections";
public static final String USE_PROJECTION = "useProjection";

// Unique identifier for the query, which is used to map the global shared resources (specifically merge buffers) to the
// query's runtime
public static final String QUERY_RESOURCE_ID = "queryResourceId";
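A hedged sketch of setting these keys on a query context (semantics inferred from the key names, not confirmed by this diff; the projection name is hypothetical):

// inferred semantics:
//   useProjection    - ask for a specific projection by name
//   forceProjections - fail the query if a projection cannot be used
//   noProjections    - opt out of projections entirely
final Map<String, Object> context = ImmutableMap.of(
    QueryContexts.USE_PROJECTION, "channel_hourly"
);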
@@ -243,6 +243,11 @@ public interface QueryMetrics&lt;QueryType extends Query&lt;?&gt;&gt;

void segment(String segmentIdentifier);

/**
* If a projection was used during segment processing, set its name as the projection dimension
*/
void projection(String projection);

/**
* @deprecated use {@link #filterBundle(FilterBundle.BundleInfo)} instead to collect details about filters which were
* used to construct {@link org.apache.druid.segment.BitmapOffset} or