
CNDB-12304: Parallelize ORDER BY row materialization/validation #1489

Open · wants to merge 2 commits into base: main

Conversation

michaeljmarshall
Member

What is the issue

Fixes https://github.com/riptano/cndb/issues/12304

What does this PR fix and why was it fixed

See https://github.com/riptano/cndb/issues/12304 for details. In summary, this parallelizes row materialization so that we use as many resources as possible when materializing rows for large LIMIT requests.

In order to make this work, I needed to make a version of RefViewFragment that is pre-sorted so that it can be shared by multiple threads at the same time.
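Roughly, the idea of a pre-sorted view that can be shared by concurrent readers can be sketched as follows. This is a simplified illustration, not the actual RefViewFragment API; SortedViewSnapshot and its members are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sort once at construction time, then expose an
// immutable list. Because the snapshot never changes after construction,
// multiple threads can read it concurrently without synchronization.
final class SortedViewSnapshot<T>
{
    private final List<T> sorted; // immutable after construction

    SortedViewSnapshot(Collection<T> items, Comparator<T> comparator)
    {
        List<T> copy = new ArrayList<>(items);
        copy.sort(comparator);
        this.sorted = Collections.unmodifiableList(copy);
    }

    List<T> view()
    {
        return sorted;
    }
}
```

The key design point is that sorting happens exactly once, before the snapshot is published to other threads, so no reader ever observes a partially sorted state.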

There are several ways we could parallelize row materialization. This PR iterates over the following steps until the LIMIT is reached or there are no more rows satisfying the query:

  1. Pull max(LIMIT - returnedRowCount, 1) PrimaryKeys from the SAI source iterator.
  2. Dispatch asynchronous requests to materialize the rows for each primary key from step 1.
  3. Validate the result on the PARALLEL_EXECUTOR thread to simplify the logic associated with ensuring we close all partitions.
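The loop described in the steps above can be sketched in plain Java. This is a hedged, self-contained illustration of the batching pattern only; ParallelOrderByLoop and its parameters stand in for the real SAI iterator, row materialization, and predicate validation, which are not shown here:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the batched loop: keysSource stands in for the
// SAI source iterator, materialize for row materialization, and isValid
// for query-predicate validation.
final class ParallelOrderByLoop
{
    static <K, R> List<R> collect(Iterator<K> keysSource,
                                  Function<K, R> materialize,
                                  Predicate<R> isValid,
                                  int limit,
                                  ExecutorService executor) throws Exception
    {
        List<R> results = new ArrayList<>();
        while (results.size() < limit && keysSource.hasNext())
        {
            // Step 1: pull max(limit - returnedRowCount, 1) keys from the source.
            int batchSize = Math.max(limit - results.size(), 1);
            List<K> batch = new ArrayList<>(batchSize);
            while (batch.size() < batchSize && keysSource.hasNext())
                batch.add(keysSource.next());

            // Step 2: dispatch asynchronous materialization for each key.
            List<Future<R>> futures = new ArrayList<>(batch.size());
            for (K key : batch)
                futures.add(executor.submit(() -> materialize.apply(key)));

            // Step 3: validate results on the calling thread, in key order,
            // so that rows are appended in the order the source produced them.
            for (Future<R> future : futures)
            {
                R row = future.get();
                if (isValid.test(row) && results.size() < limit)
                    results.add(row);
            }
        }
        return results;
    }
}
```

Because invalid rows (false positives) shrink the batch yield, the outer loop re-batches with the remaining deficit, which is why step 1 uses max(LIMIT - returnedRowCount, 1) rather than a fixed batch size.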

Optimizations considered not implemented:

  • merging primary keys that share a partition key into a single materialization step
  • iteratively dispatching new futures as rows materialize and fail predicate validation (instead, we batch requests)
  • caching materialized rows that were found invalid (this could help in cases with many false positives, at the cost of additional memory consumption)
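For reference, the first of these optimizations amounts to a grouping step before dispatch. A minimal sketch, assuming hypothetical names (PartitionGrouping, partitionOf) rather than actual SAI types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the unimplemented optimization: group primary keys
// that share a partition key so each partition would be materialized once
// instead of once per key.
final class PartitionGrouping
{
    static <K, P> Map<P, List<K>> groupByPartition(List<K> keys, Function<K, P> partitionOf)
    {
        // LinkedHashMap preserves the order in which partitions first appear.
        Map<P, List<K>> grouped = new LinkedHashMap<>();
        for (K key : keys)
            grouped.computeIfAbsent(partitionOf.apply(key), p -> new ArrayList<>()).add(key);
        return grouped;
    }
}
```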

Checklist before you submit for review

  • Make sure there is a PR in the CNDB project updating the Converged Cassandra version
  • Use NoSpamLogger for log lines that may appear frequently in the logs
  • Verify test results on Butler
  • Test coverage for new/modified code is > 80%
  • Proper code formatting
  • Proper title for each commit starting with the project-issue number, like CNDB-1234
  • Each commit has a meaningful description
  • Each commit is not very long and contains related changes
  • Renames, moves and reformatting are in distinct commits

@cassci-bot

❌ Build ds-cassandra-pr-gate/PR-1489 rejected by Butler


1 new test failure(s) in 2 builds
See build details here


Found 1 new test failures

Test: o.a.c.u.b.BinLogTest.testTruncationReleasesLogS... | Explanation: regression | Branch history: 🔴🔵 | Upstream history: 🔵🔵🔵🔵🔵🔵🔵

Found 5834 known test failures

@eolivelli eolivelli left a comment


LGTM

But I don't know this code well enough to approve.

@pkolaczk / @adelapena / @jbellis

import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.RangeUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.tracing.Tracing;


Nit: unneeded import


/**
* Executor to use for parallel index reads.
* Defined by -Dcassandra.index_read.parallele=true/false, true by default.


Suggested change
* Defined by -Dcassandra.index_read.parallele=true/false, true by default.
* Defined by -Dcassandra.index_read.parallel=true/false, true by default.

return SharedExecutorPool.SHARED.newExecutor(numThreads, maximumPoolSize -> {}, "request", "IndexParallelRead");
}
else
return ImmediateExecutor.INSTANCE;


Nit: if we are to follow ASF style rules (we might not), use brackets if the other branch has brackets:

Suggested change
return ImmediateExecutor.INSTANCE;
{
return ImmediateExecutor.INSTANCE;
}

* {@link #nextSelectedKeyInRange()}. We map PrimaryKey to List<PrimaryKeyWithSortKey> because the same
* primary key can be in the result set multiple times, but with different source tables.
* @param keys the map to fill
* Consumes the next `count` unique primary keys produced {@link #nextSelectedKeyInRange()}, then materializes


Suggested change
* Consumes the next `count` unique primary keys produced {@link #nextSelectedKeyInRange()}, then materializes
* Consumes the next {@code count} unique primary keys produced by {@link #nextSelectedKeyInRange()}, then materializes

return;
try (var partition = controller.getPartition(primaryKey, view, executionController))
{
// Validates that the parition's row is valid for the query predicates. It returns null


Suggested change
// Validates that the parition's row is valid for the query predicates. It returns null
// Validates that the partition's row is valid for the query predicates. It returns null

// if the row is not valid. Reasons for invalidity include:
// 1. The row is a range or row tombstone or is expired.
// 2. The row does not satisfy the query predicates
// 3. The row does not satisfy the ORDER BY clause


How can a row not satisfy the ORDER BY clause?

Member Author


I clarified it here and in the comment. Essentially, because of updates, we have to check that the source sstable of the materialized cell is the same sstable as the SAI index that produced the sort key. Otherwise, an update can move a row to a worse position in the iterator of ordered rows, producing out-of-order results.

var now = FBUtilities.nowInSeconds();
boolean isRowValid = false;
var row = clusters.next();
assert !clusters.hasNext() : "Expected only one row per partition";


Perhaps this method could be named validateSingleRowPartition for more clarity?

Comment on lines +703 to +713
private static class PrimaryKeyResult
{
final PrimaryKey primaryKey;
final List<PrimaryKeyWithSortKey> primaryKeyWithSortKeys;
final UnfilteredRowIterator partition;

PrimaryKeyResult(PrimaryKey primaryKey, List<PrimaryKeyWithSortKey> primaryKeyWithSortKeys, UnfilteredRowIterator partition)
{
this.primaryKey = primaryKey;
this.primaryKeyWithSortKeys = primaryKeyWithSortKeys;
this.partition = partition;


This class seems unused.

Comment on lines +185 to +187
// reorder rows in partition/clustering order
for (var triple : topK.getUnsortedShared())
addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle());


As mentioned in #1488, here, I think we don't need to reorder rows for the coordinator sorting. Not even sure if we even need any of that ordering on the replicas given that we only support CL=ONE, so there is no reconciliation. But we can leave it for CNDB-12308, which will need some rebase work if it comes after this.

4 participants