From 6a6f36aa5095d009a7c29fe73ad93d505cc250da Mon Sep 17 00:00:00 2001
From: takaaki7
Date: Wed, 11 Oct 2023 15:42:48 +0900
Subject: [PATCH 1/4] Add table sampling

---
 .../org/apache/druid/query/DataSource.java    |   1 +
 .../druid/query/SampledTableDataSource.java   | 192 ++++++++++++++++++
 .../druid/query/topn/TopNQueryEngine.java     |   2 +-
 .../druid/client/CachingClusteredClient.java  |  32 +++
 4 files changed, 226 insertions(+), 1 deletion(-)
 create mode 100644 processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java

diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java
index 360c339627f9..8f10abbfe672 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -36,6 +36,7 @@
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TableDataSource.class)
 @JsonSubTypes({
     @JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
+    @JsonSubTypes.Type(value = TableDataSource.class, name = "sampled_table"),
     @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
     @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
     @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
diff --git a/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java b/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
new file mode 100644
index 000000000000..bd6203113421
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import org.apache.druid.java.util.common.Cacheable;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.segment.SegmentReference;
+
+@JsonTypeName("sampled_table")
+public class SampledTableDataSource implements DataSource
+{
+  private final String name;
+  private final SamplingType samplingType;
+  private final float samplingPercentage;
+  public enum SamplingType implements Cacheable
+  {
+    FIXED_SHARD;
+
+    @JsonValue
+    @Override
+    public String toString()
+    {
+      return StringUtils.toLowerCase(this.name());
+    }
+
+    @JsonCreator
+    public static SamplingType fromString(String name)
+    {
+      return valueOf(StringUtils.toUpperCase(name));
+    }
+
+    @Override
+    public byte[] getCacheKey()
+    {
+      return new byte[] {(byte) this.ordinal()};
+    }
+  }
+
+  @JsonCreator
+  public SampledTableDataSource(
+      @JsonProperty("name") String name,
+      @JsonProperty("samplingType") SamplingType samplingType,
+      @JsonProperty("samplingPercentage") float samplingPercentage
+  )
+  {
+    this.name = Preconditions.checkNotNull(name, "'name' must be nonnull");
+    this.samplingType = samplingType;
+    this.samplingPercentage = samplingPercentage;
+  }
+
+  @JsonCreator
+public static SampledTableDataSource create(final String name, final String samplingType, final float samplingRatio)
+  {
+    return new SampledTableDataSource(name, SamplingType.fromString(samplingType), samplingRatio);
+  }
+
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
+  public Set getTableNames()
+  {
+    return Collections.singleton(name);
+  }
+
+  @Override
+  public List getChildren()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public DataSource withChildren(List children)
+  {
+    if (!children.isEmpty()) {
+      throw new IAE("Cannot accept children");
+    }
+
+    return this;
+  }
+
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean isConcrete()
+  {
+    return true;
+  }
+
+  @Override
+  public Function createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTime
+  )
+  {
+    return Function.identity();
+  }
+
+  @Override
+  public DataSource withUpdatedDataSource(DataSource newSource)
+  {
+    return newSource;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new byte[0];
+  }
+
+  @Override
+  public DataSourceAnalysis getAnalysis()
+  {
+    return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+  }
+
+  public SamplingType getSamplingType() {
+    return samplingType;
+  }
+
+  public float getSamplingPercentage() {
+    return samplingPercentage;
+  }
+
+  @Override
+  public String toString()
+  {
+    return name;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SampledTableDataSource that = (SampledTableDataSource) o;
+    return name.equals(that.name);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 50b8a30d1028..084db4b61934 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -132,7 +132,7 @@ private TopNMapFn getMapFn(
     final TopNAlgorithm topNAlgorithm;
-    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) {
+    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) { // pool based algorithm selection, if we can
       if (selector.isAggregateAllMetrics()) {
         // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 19df276344ef..30db29162fa5 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -69,6 +69,8 @@
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.Result;
+import org.apache.druid.query.SampledTableDataSource;
+import org.apache.druid.query.SampledTableDataSource.SamplingType;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.context.ResponseContext;
@@ -344,6 +346,7 @@ ClusterQueryResult run(
       }
 
       final Set segmentServers = computeSegmentsToQuery(timeline, specificSegments);
+      pruneSegmentsForShardSampling(segmentServers);
       @Nullable
       final byte[] queryCacheKey = cacheKeyManager.computeSegmentLevelQueryCacheKey();
       if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@@ -460,6 +463,7 @@ private Set computeSegmentsToQuery(
           segments.add(new SegmentServerSelector(server, segment));
         }
       }
+
       return segments;
     }
@@ -503,6 +507,33 @@ private void computeUncoveredIntervals(TimelineLookup ti
       }
     }
 
+    private void pruneSegmentsForShardSampling(
+        final Set segments
+    ) {
+      if (
+          query.getDataSource() instanceof SampledTableDataSource
+      ) {
+        if (((SampledTableDataSource) query.getDataSource()).getSamplingType()
+            == SamplingType.FIXED_SHARD) {
+          int allShards = segments.stream().mapToInt(s->s.getSegmentDescriptor().getPartitionNumber()).max().getAsInt();
+          int targetShards = Math.round(
+              allShards * ((SampledTableDataSource) query.getDataSource()).getSamplingPercentage());
+          Iterator iterator = segments.iterator();
+          while (iterator.hasNext()) {
+            SegmentServerSelector segmentServerSelector = iterator.next();
+            SegmentDescriptor segmentDescriptor = segmentServerSelector.getSegmentDescriptor();
+            int shard = segmentDescriptor.getPartitionNumber();
+            if (targetShards < shard) {
+              iterator.remove();
+            }
+          }
+        } else {
+          throw new UnsupportedOperationException("");
+        }
+      }
+    }
+
+
     private List> pruneSegmentsWithCachedResults(
         final byte[] queryCacheKey,
         final Set segments
@@ -541,6 +572,7 @@ private Map computePerSegmentCacheKeys(
         byte[] queryCacheKey
     )
     {
+      // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
       Map cacheKeys = Maps.newLinkedHashMap();
       for (SegmentServerSelector segmentServer : segments) {

From c339b859529c3a3bf341c1141f19918c95f98fb0 Mon Sep 17 00:00:00 2001
From: takaaki7
Date: Thu, 12 Oct 2023 17:36:04 +0900
Subject: [PATCH 2/4] add table sampling

---
 .../org/apache/druid/query/DataSource.java    |   2 +-
 .../druid/query/SampledTableDataSource.java   | 119 ++++--------------
 .../druid/query/context/ResponseContext.java  |   7 ++
 .../query/planning/DataSourceAnalysis.java    |   7 +-
 .../druid/client/CachingClusteredClient.java  |  23 ++--
 5 files changed, 55 insertions(+), 103 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java
index 8f10abbfe672..4c4113220ed9 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -36,7 +36,7 @@
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TableDataSource.class)
 @JsonSubTypes({
     @JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
-    @JsonSubTypes.Type(value = TableDataSource.class, name = "sampled_table"),
+    @JsonSubTypes.Type(value = SampledTableDataSource.class, name = "sampled_table"),
     @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
     @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
     @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
diff --git a/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java b/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
index bd6203113421..17ee748e173f 100644
--- a/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
@@ -37,11 +37,10 @@ import org.apache.druid.segment.SegmentReference;
 
 @JsonTypeName("sampled_table")
-public class SampledTableDataSource implements DataSource
+public class SampledTableDataSource extends TableDataSource
 {
-  private final String name;
   private final SamplingType samplingType;
-  private final float samplingPercentage;
+  private final int samplingPercentage;
   public enum SamplingType implements Cacheable
   {
     FIXED_SHARD;
@@ -70,123 +69,59 @@ public byte[] getCacheKey()
   public SampledTableDataSource(
       @JsonProperty("name") String name,
       @JsonProperty("samplingType") SamplingType samplingType,
-      @JsonProperty("samplingPercentage") float samplingPercentage
+      @JsonProperty("samplingPercentage") int samplingPercentage
   )
   {
-    this.name = Preconditions.checkNotNull(name, "'name' must be nonnull");
+    super(name);
     this.samplingType = samplingType;
     this.samplingPercentage = samplingPercentage;
   }
 
   @JsonCreator
-public static SampledTableDataSource create(final String name, final String samplingType, final float samplingRatio)
+  public static SampledTableDataSource create(
+      @JsonProperty("name")final String name,
+      @JsonProperty("samplingType")final String samplingType,
+      @JsonProperty("samplingPercentage")final int samplingPercentage)
   {
-    return new SampledTableDataSource(name, SamplingType.fromString(samplingType), samplingRatio);
+    return new SampledTableDataSource(name, SamplingType.fromString(samplingType), samplingPercentage);
   }
 
-  @JsonProperty
-  public String getName()
-  {
-    return name;
-  }
-
-  @Override
-  public Set getTableNames()
-  {
-    return Collections.singleton(name);
-  }
-
-  @Override
-  public List getChildren()
-  {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public DataSource withChildren(List children)
-  {
-    if (!children.isEmpty()) {
-      throw new IAE("Cannot accept children");
-    }
-
-    return this;
-  }
-
-  @Override
-  public boolean isCacheable(boolean isBroker)
-  {
-    return true;
-  }
-
-  @Override
-  public boolean isGlobal()
-  {
-    return false;
-  }
-
-  @Override
-  public boolean isConcrete()
-  {
-    return true;
-  }
-
-  @Override
-  public Function createSegmentMapFunction(
-      Query query,
-      AtomicLong cpuTime
-  )
-  {
-    return Function.identity();
-  }
-
-  @Override
-  public DataSource withUpdatedDataSource(DataSource newSource)
-  {
-    return newSource;
-  }
-
-  @Override
-  public byte[] getCacheKey()
-  {
-    return new byte[0];
-  }
-
-  @Override
-  public DataSourceAnalysis getAnalysis()
-  {
-    return new DataSourceAnalysis(this, null, null, Collections.emptyList());
-  }
+  @JsonProperty
   public SamplingType getSamplingType() {
     return samplingType;
   }
 
+  @JsonProperty
   public float getSamplingPercentage() {
     return samplingPercentage;
   }
 
   @Override
-  public String toString()
-  {
-    return name;
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof SampledTableDataSource)) {
       return false;
     }
+    if (!super.equals(o)) {
+      return false;
+    }
+
     SampledTableDataSource that = (SampledTableDataSource) o;
-    return name.equals(that.name);
+
+    if (samplingPercentage != that.samplingPercentage) {
+      return false;
+    }
+    return samplingType == that.samplingType;
   }
 
   @Override
-  public int hashCode()
-  {
-    return Objects.hash(name);
+  public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + (samplingType != null ? samplingType.hashCode() : 0);
+    result = 31 * result + samplingPercentage;
+    return result;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
index 6727782cc406..dfd7299f56ce 100644
--- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
+++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
@@ -453,6 +453,12 @@ public Object mergeValues(Object oldValue, Object newValue)
         false
     );
 
+    public static final Key SAMPLING_COMPOSITION = new StringKey(
+        "samplingComposition",
+        true,
+        false
+    );
+
     /**
      * Indicates if a {@link ResponseContext} was truncated during serialization.
      */
@@ -488,6 +494,7 @@ public Object mergeValues(Object oldValue, Object newValue)
             TIMEOUT_AT,
             NUM_SCANNED_ROWS,
             CPU_CONSUMED_NANOS,
+            SAMPLING_COMPOSITION,
             TRUNCATED,
         }
     );
diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index f17ab6aec235..7f23eb217cf8 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -24,6 +24,7 @@
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.SampledTableDataSource;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
 import org.apache.druid.query.UnnestDataSource;
@@ -113,10 +114,11 @@ public DataSource getBaseDataSource()
    * Note that this can return empty even if {@link #isConcreteAndTableBased()} is true. This happens if the base
    * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
    */
-  public Optional getBaseTableDataSource()
-  {
+  public Optional getBaseTableDataSource() {
     if (baseDataSource instanceof TableDataSource) {
       return Optional.of((TableDataSource) baseDataSource);
+    } else if (baseDataSource instanceof SampledTableDataSource) {
+      return Optional.of((SampledTableDataSource) baseDataSource);
     } else {
       return Optional.empty();
     }
@@ -216,6 +218,7 @@ public boolean isConcreteBased()
   public boolean isTableBased()
   {
     return (baseDataSource instanceof TableDataSource
+            || baseDataSource instanceof SampledTableDataSource
            || (baseDataSource instanceof UnionDataSource
                && baseDataSource.getChildren()
                                 .stream()
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 30db29162fa5..e4c11020e386 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.client;
 
+import static org.apache.druid.query.context.ResponseContext.Keys.SAMPLING_COMPOSITION;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -346,7 +348,10 @@ ClusterQueryResult run(
       }
 
       final Set segmentServers = computeSegmentsToQuery(timeline, specificSegments);
-      pruneSegmentsForShardSampling(segmentServers);
+      Pair ratio = pruneSegmentsForShardSampling(segmentServers);
+      if (ratio != null) {
+        responseContext.add(SAMPLING_COMPOSITION, ratio.lhs + "/" + ratio.rhs);
+      }
       @Nullable
       final byte[] queryCacheKey = cacheKeyManager.computeSegmentLevelQueryCacheKey();
       if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@@ -507,30 +512,32 @@ private void computeUncoveredIntervals(TimelineLookup ti
       }
     }
 
-    private void pruneSegmentsForShardSampling(
-        final Set segments
-    ) {
-      if (
-          query.getDataSource() instanceof SampledTableDataSource
-      ) {
+    private Pair pruneSegmentsForShardSampling(final Set segments) {
+      if (query.getDataSource() instanceof SampledTableDataSource) {
         if (((SampledTableDataSource) query.getDataSource()).getSamplingType()
             == SamplingType.FIXED_SHARD) {
-          int allShards = segments.stream().mapToInt(s->s.getSegmentDescriptor().getPartitionNumber()).max().getAsInt();
+          int allSegmentsSize = segments.size();
+          int allShards = segments.stream()
+              .mapToInt(s -> s.getSegmentDescriptor().getPartitionNumber()).max().getAsInt();
           int targetShards = Math.round(
              allShards * ((SampledTableDataSource) query.getDataSource()).getSamplingPercentage());
          Iterator iterator = segments.iterator();
+          int removedSegments = 0;
          while (iterator.hasNext()) {
            SegmentServerSelector segmentServerSelector = iterator.next();
            SegmentDescriptor segmentDescriptor = segmentServerSelector.getSegmentDescriptor();
            int shard = segmentDescriptor.getPartitionNumber();
            if (targetShards < shard) {
+             removedSegments++;
              iterator.remove();
            }
          }
+          return Pair.of(allSegmentsSize - removedSegments, allSegmentsSize);
        } else {
          throw new UnsupportedOperationException("");
        }
      }
+      return null;
    }

From 1fb88a34611df771ecba8471947894b1e813dd56 Mon Sep 17 00:00:00 2001
From: takaaki7
Date: Thu, 12 Oct 2023 18:48:03 +0900
Subject: [PATCH 3/4] Fix build error

---
 .../druid/query/SampledTableDataSource.java   | 23 ++++++++-----------
 .../druid/query/context/ResponseContext.java  |  6 +++++
 .../query/planning/DataSourceAnalysis.java    |  3 ++-
 .../druid/query/topn/TopNQueryEngine.java     |  2 +-
 .../druid/client/CachingClusteredClient.java  |  7 +++---
 5 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java b/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
index 17ee748e173f..5d1dd777f171 100644
--- a/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/SampledTableDataSource.java
@@ -23,24 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.annotation.JsonValue;
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 import org.apache.druid.java.util.common.Cacheable;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.segment.SegmentReference;
 
 @JsonTypeName("sampled_table")
 public class SampledTableDataSource extends TableDataSource
 {
   private final SamplingType samplingType;
   private final int samplingPercentage;
+
   public enum SamplingType implements Cacheable
   {
     FIXED_SHARD;
@@ -88,17 +79,20 @@ public static SampledTableDataSource create(
   @JsonProperty
-  public SamplingType getSamplingType() {
+  public SamplingType getSamplingType()
+  {
     return samplingType;
   }
 
   @JsonProperty
-  public float getSamplingPercentage() {
+  public float getSamplingPercentage()
+  {
     return samplingPercentage;
   }
 
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(Object o)
+  {
     if (this == o) {
       return true;
     }
@@ -118,7 +112,8 @@ public boolean equals(Object o) {
   }
 
   @Override
-  public int hashCode() {
+  public int hashCode()
+  {
     int result = super.hashCode();
     result = 31 * result + (samplingType != null ? samplingType.hashCode() : 0);
     result = 31 * result + samplingPercentage;
diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
index dfd7299f56ce..34cdb580eb70 100644
--- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
+++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
@@ -745,6 +745,12 @@ public void addCpuNanos(long ns)
     addValue(Keys.CPU_CONSUMED_NANOS, ns);
   }
 
+
+  public void addSamplingComposition(String samplingComposition)
+  {
+    addValue(Keys.SAMPLING_COMPOSITION, samplingComposition);
+  }
+
   private Object addValue(Key key, Object value)
   {
     return getDelegate().merge(key, value, key::mergeValues);
diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index 7f23eb217cf8..dfea4cc36d19 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -114,7 +114,8 @@ public DataSource getBaseDataSource()
    * Note that this can return empty even if {@link #isConcreteAndTableBased()} is true. This happens if the base
    * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
    */
-  public Optional getBaseTableDataSource() {
+  public Optional getBaseTableDataSource()
+  {
     if (baseDataSource instanceof TableDataSource) {
       return Optional.of((TableDataSource) baseDataSource);
     } else if (baseDataSource instanceof SampledTableDataSource) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 084db4b61934..50b8a30d1028 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -132,7 +132,7 @@ private TopNMapFn getMapFn(
     final TopNAlgorithm topNAlgorithm;
-    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) { // pool based algorithm selection, if we can
+    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) {
       if (selector.isAggregateAllMetrics()) {
         // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index e4c11020e386..590a29e4a8d4 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -19,8 +19,6 @@
 
 package org.apache.druid.client;
 
-import static org.apache.druid.query.context.ResponseContext.Keys.SAMPLING_COMPOSITION;
-
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -350,7 +348,7 @@ ClusterQueryResult run(
       final Set segmentServers = computeSegmentsToQuery(timeline, specificSegments);
       Pair ratio = pruneSegmentsForShardSampling(segmentServers);
       if (ratio != null) {
-        responseContext.add(SAMPLING_COMPOSITION, ratio.lhs + "/" + ratio.rhs);
+        responseContext.addSamplingComposition(ratio.lhs + "/" + ratio.rhs);
       }
       @Nullable
       final byte[] queryCacheKey = cacheKeyManager.computeSegmentLevelQueryCacheKey();
@@ -512,7 +510,8 @@ private void computeUncoveredIntervals(TimelineLookup ti
       }
     }
 
-    private Pair pruneSegmentsForShardSampling(final Set segments) {
+    private Pair pruneSegmentsForShardSampling(final Set segments)
+    {
       if (query.getDataSource() instanceof SampledTableDataSource) {
         if (((SampledTableDataSource) query.getDataSource()).getSamplingType()
             == SamplingType.FIXED_SHARD) {

From c2bbaa65009d031653c35ae8e137051c2669f104 Mon Sep 17 00:00:00 2001
From: takaaki7
Date: Thu, 12 Oct 2023 20:10:32 +0900
Subject: [PATCH 4/4] fix table sampling

---
 .../java/org/apache/druid/client/CachingClusteredClient.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 590a29e4a8d4..519296e388aa 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -519,7 +519,7 @@ private Pair pruneSegmentsForShardSampling(final Set segments)
          int allShards = segments.stream()
              .mapToInt(s -> s.getSegmentDescriptor().getPartitionNumber()).max().getAsInt();
          int targetShards = Math.round(
-              allShards * ((SampledTableDataSource) query.getDataSource()).getSamplingPercentage());
+              allShards * ((SampledTableDataSource) query.getDataSource()).getSamplingPercentage()) / 100;
          Iterator iterator = segments.iterator();
          int removedSegments = 0;
          while (iterator.hasNext()) {
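
Usage sketch (not part of the patch series): the patches never show the new data source on the wire, so the following minimal Java example illustrates the JSON shape implied by the Jackson annotations above, assuming the four patches apply cleanly. The datasource name "wikipedia", the value 10, and the example class name are illustrative assumptions.

// Illustrative sketch only: serializes the new data source with a plain Jackson ObjectMapper
// to show the JSON shape implied by @JsonTypeName("sampled_table") and the @JsonProperty getters.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.SampledTableDataSource;
import org.apache.druid.query.SampledTableDataSource.SamplingType;

public class SampledTableDataSourceExample
{
  public static void main(String[] args) throws Exception
  {
    // After PATCH 2/4 samplingPercentage is an int, and PATCH 4/4 divides by 100 on the broker,
    // so 10 is intended to mean "keep roughly 10% of the shards".
    DataSource sampled = new SampledTableDataSource("wikipedia", SamplingType.FIXED_SHARD, 10);

    // Serialized through the "sampled_table" entry registered in DataSource's @JsonSubTypes;
    // the output should look along the lines of:
    // {"type":"sampled_table","name":"wikipedia","samplingType":"fixed_shard","samplingPercentage":10.0}
    System.out.println(new ObjectMapper().writeValueAsString(sampled));
  }
}

For a query whose data source deserializes to this shape, CachingClusteredClient prunes segments whose partition number exceeds the computed shard budget before fanning out to historicals, and reports the kept/total segment counts as a "kept/total" string under the samplingComposition response-context key.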