Skip to content

Commit

Permalink
Adjust how accumulation works.
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm committed Sep 19, 2023
1 parent 4679366 commit c1600e8
Show file tree
Hide file tree
Showing 25 changed files with 269 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ public void resultsComplete(
try {
convertedResultObject = context.jsonMapper().convertValue(
resultObject,
queryKernel.getStageDefinition(stageId).getProcessorFactory().getAccumulatedResultTypeReference()
queryKernel.getStageDefinition(stageId).getProcessorFactory().getResultTypeReference()
);
}
catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ private void makeShuffleOutputChannelFactory(boolean isFinalStage)
);
}

private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, WorkerClass extends FrameProcessor<T>, T, R> void makeAndRunWorkProcessors()
private <FactoryType extends FrameProcessorFactory<T, R, I>, T, R, I> void makeAndRunWorkProcessors()
throws IOException
{
if (workResultAndOutputChannels != null) {
Expand All @@ -1163,7 +1163,7 @@ private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, Wor
final FactoryType processorFactory = (FactoryType) kernel.getStageDefinition().getProcessorFactory();

@SuppressWarnings("unchecked")
final ProcessorsAndChannels<WorkerClass, T> processors =
final ProcessorsAndChannels<T, R> processors =
processorFactory.makeProcessors(
kernel.getStageDefinition(),
kernel.getWorkOrder().getWorkerNumber(),
Expand All @@ -1177,7 +1177,7 @@ private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, Wor
e -> warningPublisher.publishException(kernel.getStageDefinition().getStageNumber(), e)
);

final ProcessorManager<T> processorManager = processors.processors();
final ProcessorManager<T, R> processorManager = processors.getProcessorManager();

final int maxOutstandingProcessors;

Expand All @@ -1192,8 +1192,6 @@ private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, Wor

final ListenableFuture<R> workResultFuture = exec.runAllFully(
processorManager,
processorFactory.newAccumulatedResult(),
processorFactory::accumulateResult,
maxOutstandingProcessors,
processorBouncer,
cancellationId
Expand Down Expand Up @@ -1716,11 +1714,13 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan

final ListenableFuture<ClusterByStatisticsCollector> clusterByStatisticsCollectorFuture =
exec.runAllFully(
ProcessorManagers.of(processors),
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
),
ClusterByStatisticsCollector::addAll,
ProcessorManagers.of(processors)
.withAccumulation(
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
),
ClusterByStatisticsCollector::addAll
),
// Run all processors simultaneously. They are lightweight and this keeps things moving.
processors.size(),
Bouncer.unlimited(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

@JsonTypeName("segmentGenerator")
public class SegmentGeneratorFrameProcessorFactory
implements FrameProcessorFactory<List<SegmentIdWithShardSpec>, SegmentGeneratorFrameProcessor, DataSegment, Set<DataSegment>>
implements FrameProcessorFactory<DataSegment, Set<DataSegment>, List<SegmentIdWithShardSpec>>
{
private final DataSchema dataSchema;
private final ColumnMappings columnMappings;
Expand Down Expand Up @@ -113,7 +113,7 @@ public MSQTuningConfig getTuningConfig()
}

@Override
public ProcessorsAndChannels<SegmentGeneratorFrameProcessor, DataSegment> makeProcessors(
public ProcessorsAndChannels<DataSegment, Set<DataSegment>> makeProcessors(
StageDefinition stageDefinition,
int workerNumber,
List<InputSlice> inputSlices,
Expand Down Expand Up @@ -152,7 +152,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
}
));
final SegmentGenerationProgressCounter segmentGenerationProgressCounter = counters.segmentGenerationProgress();
final SegmentGeneratorMetricsWrapper segmentGeneratorMetricsWrapper = new SegmentGeneratorMetricsWrapper(segmentGenerationProgressCounter);
final SegmentGeneratorMetricsWrapper segmentGeneratorMetricsWrapper =
new SegmentGeneratorMetricsWrapper(segmentGenerationProgressCounter);

final Sequence<SegmentGeneratorFrameProcessor> workers = inputSequence.map(
readableInputPair -> {
Expand Down Expand Up @@ -197,32 +198,28 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
}
);

return new ProcessorsAndChannels<>(ProcessorManagers.of(workers), OutputChannels.none());
return new ProcessorsAndChannels<>(
ProcessorManagers.of(workers)
.withAccumulation(
new HashSet<>(),
(acc, segment) -> {
if (segment != null) {
acc.add(segment);
}

return acc;
}
),
OutputChannels.none()
);
}

@Override
public TypeReference<Set<DataSegment>> getAccumulatedResultTypeReference()
public TypeReference<Set<DataSegment>> getResultTypeReference()
{
return new TypeReference<Set<DataSegment>>() {};
}

@Override
public Set<DataSegment> newAccumulatedResult()
{
return new HashSet<>();
}

@Nullable
@Override
public Set<DataSegment> accumulateResult(Set<DataSegment> accumulated, DataSegment current)
{
if (current != null) {
accumulated.add(current);
}

return accumulated;
}

@Nullable
@Override
public Set<DataSegment> mergeAccumulatedResult(Set<DataSegment> accumulated, Set<DataSegment> otherAccumulated)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
Expand All @@ -36,18 +36,17 @@
* Property of {@link StageDefinition} that describes its computation logic.
*
* Workers call {@link #makeProcessors} to generate the processors that perform computations within that worker's
* {@link org.apache.druid.frame.processor.FrameProcessorExecutor}. Additionally, provides methods for accumulating
* the results of the processors: {@link #newAccumulatedResult()}, {@link #accumulateResult}, and
* {@link #mergeAccumulatedResult}.
* {@link org.apache.druid.frame.processor.FrameProcessorExecutor}. Additionally, provides
* {@link #mergeAccumulatedResult(Object, Object)} for merging results from {@link ProcessorManager#result()}.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface FrameProcessorFactory<ExtraInfoType, ProcessorType extends FrameProcessor<T>, T, R>
public interface FrameProcessorFactory<T, R, ExtraInfoType>
{
/**
* Create processors for a particular worker in a particular stage. The processors will be run on a thread pool,
* with at most "maxOutstandingProcessors" number of processors outstanding at once.
*
* The Sequence returned by {@link ProcessorsAndChannels#processors()} is passed directly to
* The Sequence returned by {@link ProcessorsAndChannels#getProcessorManager()} is passed directly to
* {@link org.apache.druid.frame.processor.FrameProcessorExecutor#runAllFully}.
*
* @param stageDefinition stage definition
Expand All @@ -65,7 +64,7 @@ public interface FrameProcessorFactory<ExtraInfoType, ProcessorType extends Fram
*
* @return a processor sequence, which may be computed lazily; and a list of output channels.
*/
ProcessorsAndChannels<ProcessorType, T> makeProcessors(
ProcessorsAndChannels<T, R> makeProcessors(
StageDefinition stageDefinition,
int workerNumber,
List<InputSlice> inputSlices,
Expand All @@ -78,18 +77,8 @@ ProcessorsAndChannels<ProcessorType, T> makeProcessors(
Consumer<Throwable> warningPublisher
) throws IOException;

TypeReference<R> getAccumulatedResultTypeReference();

/**
* Produces a "blank slate" result.
*/
R newAccumulatedResult();

/**
* Accumulates an additional result. May modify the left-hand side {@code accumulated}. Does not modify the
* right-hand side {@code current}.
*/
R accumulateResult(R accumulated, T current);
@Nullable
TypeReference<R> getResultTypeReference();

/**
* Merges two accumulated results. May modify the left-hand side {@code accumulated}. Does not modify the right-hand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.msq.kernel;

import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.processor.manager.ProcessorManager;

Expand All @@ -28,23 +27,23 @@
*
* Includes a processor sequence and a list of output channels.
*/
public class ProcessorsAndChannels<ProcessorClass extends FrameProcessor<T>, T>
public class ProcessorsAndChannels<T, R>
{
private final ProcessorManager<T> processors;
private final ProcessorManager<T, R> processorManager;
private final OutputChannels outputChannels;

public ProcessorsAndChannels(
final ProcessorManager<T> processors,
final ProcessorManager<T, R> processorManager,
final OutputChannels outputChannels
)
{
this.processors = processors;
this.processorManager = processorManager;
this.outputChannels = outputChannels;
}

public ProcessorManager<T> processors()
public ProcessorManager<T, R> getProcessorManager()
{
return processors;
return processorManager;
}

public OutputChannels getOutputChannels()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.msq.kernel.ExtraInfoHolder;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.kernel.NilExtraInfoHolder;
Expand All @@ -33,39 +31,23 @@
* Basic abstract {@link FrameProcessorFactory} that yields workers that do not require extra info and that
* ignore the return values of their processors. This base class isn't used for every worker factory, but it is used
* for many of them.
*
* The return value is always {@link Unit#instance()}.
*/
public abstract class BaseFrameProcessorFactory
implements FrameProcessorFactory<Unit, FrameProcessor<Object>, Object, Object>
public abstract class BaseFrameProcessorFactory implements FrameProcessorFactory<Object, Long, Object>
{
@Override
public TypeReference<Object> getAccumulatedResultTypeReference()
{
return (TypeReference) new TypeReference<Unit>() {};
}

@Override
public Object newAccumulatedResult()
{
return Unit.instance();
}

@Nullable
@Override
public Object accumulateResult(Object accumulated, Object current)
public TypeReference<Long> getResultTypeReference()
{
return accumulated;
return new TypeReference<Long>() {};
}

@Override
public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
public Long mergeAccumulatedResult(Long accumulated, Long otherAccumulated)
{
return accumulated;
return accumulated + otherAccumulated;
}

@Override
public ExtraInfoHolder<?> makeExtraInfoHolder(@Nullable Unit extra)
public ExtraInfoHolder makeExtraInfoHolder(@Nullable Object extra)
{
if (extra != null) {
throw new ISE("Expected null 'extra'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.List;
import java.util.function.Function;

public abstract class BaseLeafFrameProcessor implements FrameProcessor<Unit>
public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
{
private final ReadableInput baseInput;
private final ResourceHolder<WritableFrameChannel> outputChannelHolder;
Expand Down Expand Up @@ -77,13 +77,18 @@ public List<WritableFrameChannel> outputChannels()
}

@Override
public ReturnOrAwait<Unit> runIncrementally(final IntSet readableInputs) throws IOException
public ReturnOrAwait<Object> runIncrementally(final IntSet readableInputs) throws IOException
{
final ReturnOrAwait<Unit> retVal;

if (baseInput.hasSegment()) {
return runWithSegment(baseInput.getSegment());
retVal = runWithSegment(baseInput.getSegment());
} else {
return runWithInputChannel(baseInput.getChannel(), baseInput.getChannelFrameReader());
retVal = runWithInputChannel(baseInput.getChannel(), baseInput.getChannelFrameReader());
}

//noinspection rawtypes,unchecked
return (ReturnOrAwait) retVal;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
Expand Down Expand Up @@ -74,12 +73,12 @@ protected BaseLeafFrameProcessorFactory(Query<?> query)
}

@Override
public ProcessorsAndChannels<FrameProcessor<Object>, Object> makeProcessors(
public ProcessorsAndChannels<Object, Long> makeProcessors(
StageDefinition stageDefinition,
int workerNumber,
List<InputSlice> inputSlices,
InputSliceReader inputSliceReader,
@Nullable Unit extra,
@Nullable Object extra,
OutputChannelFactory outputChannelFactory,
FrameContext frameContext,
int maxOutstandingProcessors,
Expand Down Expand Up @@ -147,7 +146,7 @@ public ProcessorsAndChannels<FrameProcessor<Object>, Object> makeProcessors(
);

// Function to generate a processor manger for the regular processors, which run after the segmentMapFnProcessor.
final Function<Function<SegmentReference, SegmentReference>, ProcessorManager<Unit>> processorManagerFn =
final Function<Function<SegmentReference, SegmentReference>, ProcessorManager<Object, Long>> processorManagerFn =
segmentMapFn ->
new BaseLeafFrameProcessorManager(
processorBaseInputs,
Expand All @@ -173,7 +172,7 @@ public ProcessorsAndChannels<FrameProcessor<Object>, Object> makeProcessors(
return new ProcessorsAndChannels<>(processorManager, OutputChannels.wrapReadOnly(outputChannels));
}

protected abstract FrameProcessor<Unit> makeProcessor(
protected abstract FrameProcessor<Object> makeProcessor(
ReadableInput baseInput,
Function<SegmentReference, SegmentReference> segmentMapFn,
ResourceHolder<WritableFrameChannel> outputChannelHolder,
Expand Down
Loading

0 comments on commit c1600e8

Please sign in to comment.