Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Nov 29, 2023
1 parent a4f476d commit fe32796
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.Segment;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand All @@ -40,6 +43,12 @@
/**
* This class contains resources required for a groupBy query execution.
* Currently, it contains only merge buffers, but any additional resources can be added in the future.
*
* It contains merge buffers for the execution of
* a) {@link GroupByQueryQueryToolChest#mergeResults(QueryRunner)} - Required for merging the results of the subqueries
* and the subtotals.
* b) {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2} - Required for merging the results
* of the individual runners created by {@link GroupByQueryRunnerFactory#createRunner(Segment)}
*/
public class GroupByQueryResources implements Closeable
{
Expand Down Expand Up @@ -113,7 +122,6 @@ public static int countRequiredMergeBufferNumForMergingQueryRunner(GroupByQueryC

private final Deque<ByteBuffer> mergingQueryRunnerMergeBuffers = new ArrayDeque<>();

// TODO(laksh): Donot modify the structure of the list
public GroupByQueryResources(
@Nullable List<ReferenceCountingResourceHolder<ByteBuffer>> toolchestMergeBuffersHolders,
@Nullable List<ReferenceCountingResourceHolder<ByteBuffer>> mergingQueryRunnerMergeBuffersHolders
Expand All @@ -129,32 +137,40 @@ public GroupByQueryResources(
}
}

/**
* Returns a merge buffer associated with the toolchest merge
*/
public ResourceHolder<ByteBuffer> getToolchestMergeBuffer()
{
return getMergeBuffer(toolchestMergeBuffers);
}

/**
* Returns a merge buffer associated with the merging query runner's merge buffer
*/
public ResourceHolder<ByteBuffer> getMergingQueryRunnerMergeBuffer()
{
return getMergeBuffer(mergingQueryRunnerMergeBuffers);
}

/**
* Returns the number of the currently present merging query runner merge buffers present
*/
public int getNumMergingQueryRunnerMergeBuffers()
{
// Should be same as the holders size
return mergingQueryRunnerMergeBuffers.size();
}

/**
* Get a merge buffer from the pre-acquired resources.
*
* @return a resource holder containing a merge buffer
*
* @throws IllegalStateException if this resource is initialized with empty merge buffers, or
* there isn't any available merge buffers
*/
private static ResourceHolder<ByteBuffer> getMergeBuffer(Deque<ByteBuffer> acquiredBufferPool)
{
if (acquiredBufferPool.size() == 0) {
throw DruidException.defensive("Insufficient free merge buffers present.");
}
final ByteBuffer buffer = acquiredBufferPool.pop();
return new ResourceHolder<ByteBuffer>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public QueryRunner<ResultRow> createRunner(final Segment segment)
}

/**
* TODO(laksh): Assumes that the caller is passing in the merge buffers in the response context
* @see GroupingEngine#mergeRunners(QueryProcessingPool, Iterable)
*/
@Override
public QueryRunner<ResultRow> mergeRunners(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,13 @@ public Sequence<ResultRow> mergeResults(
* {@link GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In
* that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)} (the runners created
* by that method will be fed into this method).
* <p>
* This method is called directly on the data. For
*
* This is primarily called on the data servers, to merge the results from processing on the segments. This method can
* also be called on the brokers if the query is operating on the local data sources, like the inline
* datasources.
*
* It uses {@link GroupByMergingQueryRunnerV2} which requires the merge buffers to be passed in the responseContext
* of the query that is run.
*
* @param queryProcessingPool {@link QueryProcessingPool} service used for parallel execution of the query runners
* @param queryRunners collection of query runners to merge
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,10 @@ private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
)
{
GroupByQueryResources resource = (GroupByQueryResources) responseContext.get(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS);
// TODO(laksh): Add NPE check
if (resource == null) {
throw DruidException.defensive("Expected merge buffers to be passed in the response context while executing the "
+ "GroupByMergingQueryRunnerV2, however none were provided.");
}
if (numBuffers > resource.getNumMergingQueryRunnerMergeBuffers()) {
// Defensive exception, because we should have acquired the correct number of merge buffers beforehand, or
// thrown an RLE in the caller of the runner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,9 @@ private void setupGroupByFactory()

final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
final TestBufferPool bufferPool2 = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
final TestBufferPool bufferPool3 = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);

// TODO(laksh)
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2);
// TODO(laksh)
final TestBufferPool tooSmallMergePool = TestBufferPool.onHeap(255, 2);
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 1);
final TestBufferPool tooSmallMergePool = TestBufferPool.onHeap(255, 1);

resourceCloser.register(() -> {
// Verify that all objects have been returned to the pools.
Expand Down

0 comments on commit fe32796

Please sign in to comment.