Merge branch 'dynamic-config-api-doc-refactor' of https://github.com/ektravel/druid into dynamic-config-api-doc-refactor
ektravel committed Oct 5, 2023
2 parents 7ec1036 + a2d9284 commit 9e34b07
Showing 121 changed files with 5,381 additions and 1,400 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/reusable-standard-its.yml
@@ -111,7 +111,7 @@ jobs:
- name: Collect service logs on failure
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
run: |
tar cvzf ./service-logs.tgz ./shared/logs
tar cvzf ./service-logs.tgz ~/shared/logs
- name: Upload Druid service logs to GitHub
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
@@ -20,6 +20,7 @@
package org.apache.druid.benchmark;

import org.apache.druid.java.util.common.parsers.Parser;
import org.apache.druid.utils.JvmUtils;
import org.junit.Assert;
import org.junit.Test;

@@ -64,15 +65,31 @@ public void checkEvent1(Map<String, Object> event)
Assert.assertEquals("129047958", event.get("e2.ad1[0]").toString());
Assert.assertEquals("1658972185", event.get("e2.ad1[1]").toString());
Assert.assertEquals("-997010830", event.get("e2.ad1[2]").toString());
Assert.assertEquals("-5.8772014847368817E18", event.get("e3.m1").toString());

// Java 19 changes some floating point string representation
// https://bugs.openjdk.org/browse/JDK-8291475
if (JvmUtils.majorVersion() < 19) {
Assert.assertEquals("-5.8772014847368817E18", event.get("e3.m1").toString());
} else {
Assert.assertEquals("-5.877201484736882E18", event.get("e3.m1").toString());
}

Assert.assertEquals("0.4375433369079904", event.get("e3.m2").toString());
Assert.assertEquals("0.8510482953607659", event.get("e3.m3").toString());
Assert.assertEquals("-2.3832626488759337E18", event.get("e3.m4").toString());
Assert.assertEquals("7.9789762132607068E18", event.get("e3.am1[0]").toString());
Assert.assertEquals("-7.8634787235005573E18", event.get("e3.am1[1]").toString());
Assert.assertEquals("8.7372945568982446E18", event.get("e3.am1[2]").toString());
Assert.assertEquals("3.1928124802414899E18", event.get("e3.am1[3]").toString());
Assert.assertEquals("-3.9806631713718011E18", event.get("e4.e4.m4").toString());
if (JvmUtils.majorVersion() < 19) {
Assert.assertEquals("7.9789762132607068E18", event.get("e3.am1[0]").toString());
Assert.assertEquals("-7.8634787235005573E18", event.get("e3.am1[1]").toString());
Assert.assertEquals("8.7372945568982446E18", event.get("e3.am1[2]").toString());
Assert.assertEquals("3.1928124802414899E18", event.get("e3.am1[3]").toString());
Assert.assertEquals("-3.9806631713718011E18", event.get("e4.e4.m4").toString());
} else {
Assert.assertEquals("7.978976213260707E18", event.get("e3.am1[0]").toString());
Assert.assertEquals("-7.863478723500557E18", event.get("e3.am1[1]").toString());
Assert.assertEquals("8.737294556898245E18", event.get("e3.am1[2]").toString());
Assert.assertEquals("3.19281248024149E18", event.get("e3.am1[3]").toString());
Assert.assertEquals("-3.980663171371801E18", event.get("e4.e4.m4").toString());
}
Assert.assertEquals("-1915243040", event.get("ae1[0].d1").toString());
Assert.assertEquals("-2020543641", event.get("ae1[1].d1").toString());
Assert.assertEquals("1414285347", event.get("ae1[2].e1.d2").toString());
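A note on the version gate introduced in this hunk: JDK-8291475, which shipped in Java 19, changed Double.toString (and Float.toString) to emit the shortest decimal string that still round-trips, so the same double value can stringify differently across JVM versions. A minimal sketch of the effect, using one of the values from this test; the two expected outputs are the ones asserted in the diff:

```java
public class DoubleToStringDemo
{
  public static void main(String[] args)
  {
    // One of the values asserted in the benchmark test above.
    final double value = -5.8772014847368817E18d;

    // Java 8-18 prints: -5.8772014847368817E18
    // Java 19+  prints: -5.877201484736882E18 (shortest round-trip form, per JDK-8291475)
    System.out.println(Double.toString(value));
  }
}
```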
1 change: 0 additions & 1 deletion extensions-contrib/kubernetes-overlord-extensions/pom.xml
@@ -214,7 +214,6 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -45,6 +45,7 @@
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;

@@ -54,7 +55,7 @@
import java.util.Map;
import java.util.stream.Collectors;

public class MaterializedViewQueryQueryToolChestTest
public class MaterializedViewQueryQueryToolChestTest extends InitializedNullHandlingTest
{
static {
NullHandling.initializeForTests();
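For context, the change above adopts a pattern used widely in Druid's test suites: rather than relying only on a static NullHandling.initializeForTests() block, the test class extends InitializedNullHandlingTest so the null-handling setup runs via the base class before any test state is built. A hedged sketch of the pattern; the class and test names below are placeholders, not part of this commit:

```java
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;

// Hypothetical test class illustrating the pattern: extending
// InitializedNullHandlingTest performs the null-handling setup once,
// before the subclass's static initializers or @Test methods run.
public class ExampleToolChestTest extends InitializedNullHandlingTest
{
  @Test
  public void testRunsWithNullHandlingInitialized()
  {
    // No explicit NullHandling.initializeForTests() call is needed here.
    Assert.assertTrue(true);
  }
}
```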
5 changes: 3 additions & 2 deletions extensions-core/azure-extensions/pom.xml
@@ -154,13 +154,14 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-${mockito.inline.artifact}</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<!-- explicitly declare mockito-core dependency to make analyze-dependencies happy when running with Java 8 -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
7 changes: 7 additions & 0 deletions extensions-core/druid-pac4j/pom.xml
@@ -64,6 +64,13 @@
<groupId>org.pac4j</groupId>
<artifactId>pac4j-oidc</artifactId>
<version>${pac4j.version}</version>
<exclusions>
<!-- pac4j-oidc erroneously declares mockito as a compile time instead of test dependency -->
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
6 changes: 4 additions & 2 deletions extensions-core/lookups-cached-global/pom.xml
@@ -140,14 +140,16 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<!-- explicitly declare mockito-core dependency to make analyze-dependencies happy when running with Java 8 -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-${mockito.inline.artifact}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -874,7 +874,7 @@ public void resultsComplete(
try {
convertedResultObject = context.jsonMapper().convertValue(
resultObject,
queryKernel.getStageDefinition(stageId).getProcessorFactory().getAccumulatedResultTypeReference()
queryKernel.getStageDefinition(stageId).getProcessorFactory().getResultTypeReference()
);
}
catch (IllegalArgumentException e) {
@@ -62,6 +62,8 @@
import org.apache.druid.frame.processor.PartitionedOutputChannel;
import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.frame.processor.SuperSorterProgressTracker;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.indexer.TaskStatus;
@@ -71,8 +73,6 @@
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterNames;
@@ -1100,11 +1100,12 @@ private void makeInputSliceReader()
.put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir()))
.put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler()))
.put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler()))
.put(SegmentsInputSlice.class,
new SegmentsInputSliceReader(
frameContext.dataSegmentProvider(),
MultiStageQueryContext.isReindex(QueryContext.of(task().getContext()))
)
.put(
SegmentsInputSlice.class,
new SegmentsInputSliceReader(
frameContext.dataSegmentProvider(),
MultiStageQueryContext.isReindex(QueryContext.of(task().getContext()))
)
)
.build()
);
@@ -1152,7 +1153,16 @@ private void makeShuffleOutputChannelFactory(boolean isFinalStage)
);
}

private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, WorkerClass extends FrameProcessor<T>, T, R> void makeAndRunWorkProcessors()
/**
* Use {@link FrameProcessorFactory#makeProcessors} to create {@link ProcessorsAndChannels}. Executes the
* processors using {@link #exec} and sets the output channels in {@link #workResultAndOutputChannels}.
*
* @param <FactoryType> type of {@link StageDefinition#getProcessorFactory()}
* @param <ProcessorReturnType> return type of {@link FrameProcessor} created by the manager
* @param <ManagerReturnType> result type of {@link ProcessorManager#result()}
* @param <ExtraInfoType> type of {@link WorkOrder#getExtraInfo()}
*/
private <FactoryType extends FrameProcessorFactory<ProcessorReturnType, ManagerReturnType, ExtraInfoType>, ProcessorReturnType, ManagerReturnType, ExtraInfoType> void makeAndRunWorkProcessors()
throws IOException
{
if (workResultAndOutputChannels != null) {
@@ -1163,21 +1173,21 @@ private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, Wor
final FactoryType processorFactory = (FactoryType) kernel.getStageDefinition().getProcessorFactory();

@SuppressWarnings("unchecked")
final ProcessorsAndChannels<WorkerClass, T> processors =
final ProcessorsAndChannels<ProcessorReturnType, ManagerReturnType> processors =
processorFactory.makeProcessors(
kernel.getStageDefinition(),
kernel.getWorkOrder().getWorkerNumber(),
kernel.getWorkOrder().getInputs(),
inputSliceReader,
(I) kernel.getWorkOrder().getExtraInfo(),
(ExtraInfoType) kernel.getWorkOrder().getExtraInfo(),
workOutputChannelFactory,
frameContext,
parallelism,
counterTracker,
e -> warningPublisher.publishException(kernel.getStageDefinition().getStageNumber(), e)
);

final Sequence<WorkerClass> processorSequence = processors.processors();
final ProcessorManager<ProcessorReturnType, ManagerReturnType> processorManager = processors.getProcessorManager();

final int maxOutstandingProcessors;

@@ -1190,10 +1200,8 @@ private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, Wor
Math.max(1, Math.min(parallelism, processors.getOutputChannels().getAllChannels().size()));
}

final ListenableFuture<R> workResultFuture = exec.runAllFully(
processorSequence,
processorFactory.newAccumulatedResult(),
processorFactory::accumulateResult,
final ListenableFuture<ManagerReturnType> workResultFuture = exec.runAllFully(
processorManager,
maxOutstandingProcessors,
processorBouncer,
cancellationId
@@ -1716,11 +1724,13 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan

final ListenableFuture<ClusterByStatisticsCollector> clusterByStatisticsCollectorFuture =
exec.runAllFully(
Sequences.simple(processors),
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
),
ClusterByStatisticsCollector::addAll,
ProcessorManagers.of(processors)
.withAccumulation(
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
),
ClusterByStatisticsCollector::addAll
),
// Run all processors simultaneously. They are lightweight and this keeps things moving.
processors.size(),
Bouncer.unlimited(),
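Taken together, the edits in this file replace the old runAllFully(sequence, initialResult, accumulateFn, ...) shape with a single ProcessorManager argument that carries both the processors and their accumulation. A schematic fragment of the new call shape, pieced together from the calls visible in this diff; names such as processors, initialCollector, maxOutstandingProcessors, and cancellationId stand in for the worker's real fields, and the surrounding class is omitted:

```java
// Bundle the Sequence of frame processors with an accumulator; runAllFully
// then needs only the manager plus the concurrency controls.
final ListenableFuture<ClusterByStatisticsCollector> resultFuture = exec.runAllFully(
    ProcessorManagers.of(processors)
                     .withAccumulation(initialCollector, ClusterByStatisticsCollector::addAll),
    // Previously the sequence, the initial accumulated value, and the
    // accumulator function were passed here as three separate arguments.
    maxOutstandingProcessors,
    Bouncer.unlimited(),
    cancellationId
);
```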
@@ -32,6 +32,7 @@
import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
@@ -130,11 +131,11 @@ public class WorkerMemoryParameters
private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES = 256_000_000;

/**
* Fraction of free memory per bundle that can be used by {@link org.apache.druid.msq.querykit.BroadcastJoinHelper}
* to store broadcast data on-heap. This is used to limit the total size of input frames, which we expect to
* expand on-heap. Expansion can potentially be somewhat over 2x: for example, strings are UTF-8 in frames, but are
* UTF-16 on-heap, which is a 2x expansion, and object and index overhead must be considered on top of that. So
* we use a value somewhat lower than 0.5.
* Fraction of free memory per bundle that can be used by {@link BroadcastJoinSegmentMapFnProcessor} to store broadcast
* data on-heap. This is used to limit the total size of input frames, which we expect to expand on-heap. Expansion
* can potentially be somewhat over 2x: for example, strings are UTF-8 in frames, but are UTF-16 on-heap, which is
* a 2x expansion, and object and index overhead must be considered on top of that. So we use a value somewhat
* lower than 0.5.
*/
static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;

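The fraction documented in that javadoc is easier to see with concrete numbers. A small, runnable back-of-the-envelope sketch; the 1 GB bundle size and the ~2.2x expansion factor are illustrative assumptions, and only the 0.3 fraction comes from the code:

```java
public class BroadcastJoinMemoryExample
{
  public static void main(String[] args)
  {
    // Illustrative free memory per bundle; not a value taken from Druid.
    final long freeMemoryPerBundle = 1_000_000_000L; // 1 GB

    // Fraction from BROADCAST_JOIN_MEMORY_FRACTION above.
    final double broadcastJoinMemoryFraction = 0.3;
    final long maxFrameBytes = (long) (freeMemoryPerBundle * broadcastJoinMemoryFraction); // 300 MB

    // Frames hold UTF-8; on-heap Strings are UTF-16, plus object and index
    // overhead, so assume a bit more than 2x expansion (2.2x, illustrative).
    final double assumedExpansionFactor = 2.2;
    final long roughOnHeapBytes = (long) (maxFrameBytes * assumedExpansionFactor); // ~660 MB

    System.out.printf(
        "%,d bytes of broadcast frames -> roughly %,d bytes on heap%n",
        maxFrameBytes,
        roughOnHeapBytes
    );

    // With a 0.5 fraction, the expanded data alone (0.5 * 2.2 = 1.1) could exceed
    // the bundle's free memory, which is why the constant sits below 0.5.
  }
}
```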
@@ -28,6 +28,7 @@
import com.google.common.collect.Iterables;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.Pair;
@@ -75,7 +76,7 @@

@JsonTypeName("segmentGenerator")
public class SegmentGeneratorFrameProcessorFactory
implements FrameProcessorFactory<List<SegmentIdWithShardSpec>, SegmentGeneratorFrameProcessor, DataSegment, Set<DataSegment>>
implements FrameProcessorFactory<DataSegment, Set<DataSegment>, List<SegmentIdWithShardSpec>>
{
private final DataSchema dataSchema;
private final ColumnMappings columnMappings;
@@ -112,7 +113,7 @@ public MSQTuningConfig getTuningConfig()
}

@Override
public ProcessorsAndChannels<SegmentGeneratorFrameProcessor, DataSegment> makeProcessors(
public ProcessorsAndChannels<DataSegment, Set<DataSegment>> makeProcessors(
StageDefinition stageDefinition,
int workerNumber,
List<InputSlice> inputSlices,
@@ -151,7 +152,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
}
));
final SegmentGenerationProgressCounter segmentGenerationProgressCounter = counters.segmentGenerationProgress();
final SegmentGeneratorMetricsWrapper segmentGeneratorMetricsWrapper = new SegmentGeneratorMetricsWrapper(segmentGenerationProgressCounter);
final SegmentGeneratorMetricsWrapper segmentGeneratorMetricsWrapper =
new SegmentGeneratorMetricsWrapper(segmentGenerationProgressCounter);

final Sequence<SegmentGeneratorFrameProcessor> workers = inputSequence.map(
readableInputPair -> {
@@ -196,32 +198,28 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
}
);

return new ProcessorsAndChannels<>(workers, OutputChannels.none());
return new ProcessorsAndChannels<>(
ProcessorManagers.of(workers)
.withAccumulation(
new HashSet<>(),
(acc, segment) -> {
if (segment != null) {
acc.add(segment);
}

return acc;
}
),
OutputChannels.none()
);
}

@Override
public TypeReference<Set<DataSegment>> getAccumulatedResultTypeReference()
public TypeReference<Set<DataSegment>> getResultTypeReference()
{
return new TypeReference<Set<DataSegment>>() {};
}

@Override
public Set<DataSegment> newAccumulatedResult()
{
return new HashSet<>();
}

@Nullable
@Override
public Set<DataSegment> accumulateResult(Set<DataSegment> accumulated, DataSegment current)
{
if (current != null) {
accumulated.add(current);
}

return accumulated;
}

@Nullable
@Override
public Set<DataSegment> mergeAccumulatedResult(Set<DataSegment> accumulated, Set<DataSegment> otherAccumulated)