Skip to content

Commit

Permalink
test cases fix 2
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Nov 27, 2023
1 parent 35bf551 commit 09689e9
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public <RowType, QueryType> Pair<DataServerQueryStatus, Yielder<RowType>> fetchR
final int numRetriesOnMissingSegments = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);

log.debug("Querying severs[%s] for segment[%s], retries:[%d]", servers, segmentDescriptor, numRetriesOnMissingSegments);
final ResponseContext responseContext = new DefaultResponseContext();
final ResponseContext responseContext = DefaultResponseContext.createEmpty();

Pair<DataServerQueryStatus, Yielder<RowType>> statusSequencePair;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.guice.annotations.PublicApi;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -30,16 +31,34 @@
@PublicApi
public class ConcurrentResponseContext extends ResponseContext
{

private final ConcurrentHashMap<Key, Object> delegate;

private ConcurrentResponseContext()
{
this(Collections.emptyMap());
}

private ConcurrentResponseContext(final Map<Key, Object> delegate)
{
this.delegate = new ConcurrentHashMap<>(delegate);
}

public static ConcurrentResponseContext createEmpty()
{
return new ConcurrentResponseContext();
}

private final ConcurrentHashMap<Key, Object> delegate = new ConcurrentHashMap<>();

@Override
protected Map<Key, Object> getDelegate()
{
return delegate;
}

@Override
public ResponseContext clone()
{
return new ConcurrentResponseContext(delegate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.guice.annotations.PublicApi;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -30,16 +31,33 @@
@PublicApi
public class DefaultResponseContext extends ResponseContext
{

private final HashMap<Key, Object> delegate;

private DefaultResponseContext()
{
this(Collections.emptyMap());
}

private DefaultResponseContext(final Map<Key, Object> delegate)
{
this.delegate = new HashMap<>(delegate);
}

public static DefaultResponseContext createEmpty()
{
return new DefaultResponseContext();
}

private final HashMap<Key, Object> delegate = new HashMap<>();

@Override
protected Map<Key, Object> getDelegate()
{
return delegate;
}

@Override
public ResponseContext clone()
{
return new DefaultResponseContext(delegate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,9 @@ public Key find(String name)

protected abstract Map<Key, Object> getDelegate();

@Override
public abstract ResponseContext clone();

public Map<String, Object> toMap()
{
return CollectionUtils.mapKeys(getDelegate(), k -> k.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private Sequence<ResultRow> initAndMergeGroupByResults(
{
// .. 1. Historicals, Broker -> Which is using localWalker
// MerginV2 ->
ResponseContext clonedContext = context.clone();
Boolean usesGroupByMergingQueryRunner = (Boolean) query
.getContext()
.getOrDefault(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true);
Expand All @@ -171,18 +172,20 @@ private Sequence<ResultRow> initAndMergeGroupByResults(
queryConfig
);
if (usesGroupByMergingQueryRunner) {
context.add(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, resource);
clonedContext.add(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, resource);
}
try {
final Sequence<ResultRow> mergedSequence = mergeGroupByResults(
query,
resource,
runner,
context
clonedContext
);
Closer closer = Closer.create();
closer.register(resource);
closer.register(() -> context.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS));
closer.register(() ->
clonedContext.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS)
);
return Sequences.withBaggage(mergedSequence, closer);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
throw DruidException.defensive(
"Query needs %d merge buffers for GroupByMergingQueryRunnerV2, however only %d were provided.",
numBuffers,
resource.getMergingQueryRunnerMergeBuffer()
resource.getNumMergingQueryRunnerMergeBuffers()
);
}
final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolders = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ protected Map<Key, Object> getDelegate()
{
return ImmutableMap.of(UNREGISTERED_KEY, "non-registered-key");
}

@Override
public ResponseContext clone()
{
return this;
}
};
ResponseContext.createEmpty().merge(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
Expand Down Expand Up @@ -259,9 +260,9 @@ private void setupGroupByFactory()

final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
// TODO(laksh)
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
// TODO(laksh)
final TestBufferPool tooSmallMergePool = TestBufferPool.onHeap(255, 2);

resourceCloser.register(() -> {
Expand Down Expand Up @@ -403,8 +404,22 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
return Sequences
.simple(
ImmutableList.of(
theRunner.run(queryPlus, responseContext),
theRunner2.run(queryPlus, responseContext)
theRunner.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
),
theRunner2.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
)
)
)
.flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering());
Expand Down Expand Up @@ -433,7 +448,12 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
.setGranularity(Granularities.ALL)
.build();

Sequence<ResultRow> queryResult = theRunner3.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
Sequence<ResultRow> queryResult = theRunner3.run(
QueryPlus.wrap(query.withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false)
)),
ResponseContext.createEmpty()
);
List<ResultRow> results = queryResult.toList();

ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
Expand Down Expand Up @@ -492,8 +512,22 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
return Sequences
.simple(
ImmutableList.of(
theRunner.run(queryPlus, responseContext),
theRunner2.run(queryPlus, responseContext)
theRunner.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
),
theRunner2.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
)
)
)
.flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering());
Expand Down Expand Up @@ -530,7 +564,12 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
)
.build();

Sequence<ResultRow> queryResult = theRunner3.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
Sequence<ResultRow> queryResult = theRunner3.run(
QueryPlus.wrap(query.withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false)
)),
ResponseContext.createEmpty()
);
List<ResultRow> results = queryResult.toList();

ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow(
Expand Down
Loading

0 comments on commit 09689e9

Please sign in to comment.