-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CNDB-12304: Parallelize ORDER BY row materialization/validation #1489
base: main
Are you sure you want to change the base?
Conversation
Quality Gate passedIssues Measures |
❌ Build ds-cassandra-pr-gate/PR-1489 rejected by Butler1 new test failure(s) in 2 builds Found 1 new test failures
Found 5834 known test failures |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import org.apache.cassandra.index.sai.utils.PrimaryKey; | ||
import org.apache.cassandra.index.sai.utils.RangeUtil; | ||
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; | ||
import org.apache.cassandra.io.util.FileUtils; | ||
import org.apache.cassandra.schema.TableMetadata; | ||
import org.apache.cassandra.tracing.Tracing; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unneeded import
|
||
/** | ||
* Executor to use for parallel index reads. | ||
* Defined by -Dcassandra.index_read.parallele=true/false, true by default. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Defined by -Dcassandra.index_read.parallele=true/false, true by default. | |
* Defined by -Dcassandra.index_read.parallel=true/false, true by default. |
return SharedExecutorPool.SHARED.newExecutor(numThreads, maximumPoolSize -> {}, "request", "IndexParallelRead"); | ||
} | ||
else | ||
return ImmediateExecutor.INSTANCE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: if we are to follow ASF style rules (we might not), use brackets if the other branch has brackets:
return ImmediateExecutor.INSTANCE; | |
{ | |
return ImmediateExecutor.INSTANCE; | |
} |
* {@link #nextSelectedKeyInRange()}. We map PrimaryKey to List<PrimaryKeyWithSortKey> because the same | ||
* primary key can be in the result set multiple times, but with different source tables. | ||
* @param keys the map to fill | ||
* Consumes the next `count` unique primary keys produced {@link #nextSelectedKeyInRange()}, then materializes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Consumes the next `count` unique primary keys produced {@link #nextSelectedKeyInRange()}, then materializes | |
* Consumes the next {@code count} unique primary keys produced by {@link #nextSelectedKeyInRange()}, then materializes |
return; | ||
try (var partition = controller.getPartition(primaryKey, view, executionController)) | ||
{ | ||
// Validates that the parition's row is valid for the query predicates. It returns null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Validates that the parition's row is valid for the query predicates. It returns null | |
// Validates that the partition's row is valid for the query predicates. It returns null |
// if the row is not valid. Reasons for invalidity include: | ||
// 1. The row is a range or row tombstone or is expired. | ||
// 2. The row does not satisfy the query predicates | ||
// 3. The row does not satisfy the ORDER BY clause |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can a row not satisfy the ORDER BY
clause?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I clarify it here and in the comment. Essentially, because of updates, we have to check that the source sstable for the materialized cell is the same sstable as the source SAI index's sstable. Otherwise, we can have out of order rows due to updates to a worse position in the iterator of ordered rows.
var now = FBUtilities.nowInSeconds(); | ||
boolean isRowValid = false; | ||
var row = clusters.next(); | ||
assert !clusters.hasNext() : "Expected only one row per partition"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this method could be named validateSingleRowPartition
for more clarity?
private static class PrimaryKeyResult | ||
{ | ||
final PrimaryKey primaryKey; | ||
final List<PrimaryKeyWithSortKey> primaryKeyWithSortKeys; | ||
final UnfilteredRowIterator partition; | ||
|
||
PrimaryKeyResult(PrimaryKey primaryKey, List<PrimaryKeyWithSortKey> primaryKeyWithSortKeys, UnfilteredRowIterator partition) | ||
{ | ||
this.primaryKey = primaryKey; | ||
this.primaryKeyWithSortKeys = primaryKeyWithSortKeys; | ||
this.partition = partition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class seems unused.
// reorder rows in partition/clustering order | ||
for (var triple : topK.getUnsortedShared()) | ||
addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in #1488, here, I think we don't need to reorder rows for the coordinator sorting. Not even sure if we even need any of that ordering on the replicas given that we only support CL=ONE, so there is no reconciliation. But we can leave it for CNDB-12308, which will need some rebase work if it comes after this.
What is the issue
Fixes https://github.com/riptano/cndb/issues/12304
What does this PR fix and why was it fixed
See https://github.com/riptano/cndb/issues/12304 for details. The summary is that this parallelizes row materialization to ensure that we utilize all as much resources as possible when materializing large LIMIT requests.
In order to make this work, I needed to make a version of
RefViewFragment
that is pre-sorted so that it can be shared by multiple threads at the same time.There are several ways we could parallelize row materialization. This PR iterates over the following steps until the LIMIT is reached or there are no more rows satisfying the query:
LIMIT - returnedRowCount
or 1PrimaryKeys
in from the SAI source iterator.PARALLEL_EXECUTOR
thread to simplify the logic associated with ensuring we close all partitions.Optimizations considered not implemented:
Checklist before you submit for review
NoSpamLogger
for log lines that may appear frequently in the logs