perf(stream): concurrently fetch row from storage and refill cache #19629

Merged (45 commits) on Jan 7, 2025

@kwannoel kwannoel commented Dec 1, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Currently we read all rows matching a key, encode them, and store them in an in-memory data structure, which we add to the cache. Then we iterate over the rows in that data structure and decode them. This means we incur the IO latency of reading all rows before processing any of them, as well as the memory cost of buffering all matching records.

Instead, we can iterate over the rows and concurrently refill the cache with them. Then we don't have to wait for IO to finish for ALL rows. We also avoid OOM: if we notice while refilling the cache that the number of rows exceeds some threshold, we can stop refilling.
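
A minimal sketch of the idea, not the executor code itself: each row is handled as soon as it arrives, and the cache entry is built on the side until a threshold is hit. The names `stream_and_refill`, `threshold`, and `on_match`, and the use of `HashMap`/`String` for the cache and rows, are assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Streams the rows for one join key, handling each row as it arrives while
/// buffering it for the cache. Returns the number of rows handled.
/// Hypothetical helper: the real executor works on async streams and state tables.
fn stream_and_refill<I>(
    key: u64,
    rows: I,
    threshold: usize,
    cache: &mut HashMap<u64, Vec<String>>,
    mut on_match: impl FnMut(&str),
) -> usize
where
    I: IntoIterator<Item = String>,
{
    // `Some(buf)` while refill is still active, `None` once it was abandoned.
    let mut entry: Option<Vec<String>> = Some(Vec::new());
    let mut handled = 0;
    for row in rows {
        // Handle the match immediately instead of waiting for all rows.
        on_match(&row);
        handled += 1;

        let full = entry.as_ref().map_or(false, |buf| buf.len() >= threshold);
        if full {
            // Too many rows for this key: stop refilling and free the buffer.
            entry = None;
        } else if let Some(buf) = entry.as_mut() {
            buf.push(row);
        }
    }
    // Only a complete entry is inserted; a partial one would be incorrect.
    if let Some(buf) = entry {
        cache.insert(key, buf);
    }
    handled
}
```

In this sketch the buffered memory per key never exceeds `threshold` rows, while every row is still passed to `on_match`.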

Further, if we tolerate inconsistency, we will revert to the old strategy of reading all records into cache first, so we will preserve that logic.

This is done for all join types in the hash join executor.

Handling degree table matches and updates

Before this PR we:

  1. Read from the match side degree table when doing cache refill (hold an immutable ref)
  2. Update the match side degree table when doing cache refill.

However, in this PR we concurrently refill the cache and handle matches. This means that 1 and 2 happen concurrently, so we would hold an immutable and a mutable reference to the underlying match side degree table at the same time. The borrow checker rejects this.

To solve this, we first read from the match side degree table, keeping all the degrees in memory. Only after that is complete do we start concurrently refilling the match side cache and updating the match side degrees. This way, the lifetimes of the mutable and immutable references to the match side degree table don't overlap.
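
The borrow problem and the workaround can be sketched as follows. `DegreeTable`, `scan`, `update`, and `bump_degrees` are hypothetical stand-ins for illustration, not the actual `StateTable` API.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the match side degree table: pk -> degree.
struct DegreeTable {
    degrees: BTreeMap<u64, u64>,
}

impl DegreeTable {
    /// Scans all (pk, degree) pairs; the iterator borrows `self` immutably.
    fn scan(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.degrees.iter().map(|(pk, degree)| (*pk, *degree))
    }

    /// Writes a degree; needs a mutable borrow of `self`.
    fn update(&mut self, pk: u64, degree: u64) {
        self.degrees.insert(pk, degree);
    }
}

/// Increments every degree and returns how many entries were touched.
fn bump_degrees(table: &mut DegreeTable) -> usize {
    // Rejected by the borrow checker (E0502), because the scan iterator
    // keeps `table` immutably borrowed while `update` needs `&mut`:
    //
    //     for (pk, degree) in table.scan() {
    //         table.update(pk, degree + 1);
    //     }

    // Phase 1: read all degrees into memory; the immutable borrow ends here.
    let snapshot: Vec<(u64, u64)> = table.scan().collect();

    // Phase 2: update degrees; only a mutable borrow is held now.
    for (pk, degree) in &snapshot {
        table.update(*pk, degree + 1);
    }
    snapshot.len()
}
```

The cost of this pattern is the `snapshot` buffer, which holds only keys and degrees rather than full rows.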

In the future we can optimize this further by keeping a cache for degrees separate from the join state, since the degree state is much smaller.

Nexmark Benchmark results

Build: https://buildkite.com/risingwave-test/nexmark-benchmark/builds/4994
Dashboards:

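| Query | Unoptimized | Optimized | Remarks |
|---|---|---|---|
| q3 | 7.3k | 7.2k | |
| q5 | 45.7k | 222.8k | |
| q7 | 915 | 3.7k | |
| q8 | 7.7K | 6.5k | |
| q20 | 51.3K | 50.9K | |
| q101 | 250.7K | 283.3K | |
| q102 | 21.8K | 42.6K | |
| q105 | 7.34 | 4.62 | Tps is naturally low, fluctuating between 4-11 |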

Micro Benchmark results

Cache Miss

With a limit of 30,000 cached records per key, we measure runtime and memory consumption when a single record is matched against N records that are not in cache. Here is the comparison (in-memory state store, inner join, sample size = 100):

| Amp | Total Bytes | Total Bytes (opt) | Diff (%) | Runtime (ns) | Runtime (ns) (opt) | Diff (%) |
|---|---|---|---|---|---|---|
| 20,000 | 3279096 | 3260364 | -0.57 | 17073601.28 | 16143637.7 | -5.45 |
| 40,000 | 6444552 | 4863221 | -24.54 | 35936541.66 | 29335407.29 | -18.37 |
| 200,000 | 31769736 | 4863221 | -84.69 | 191953280.4 | 100895451.7 | -47.44 |
| 400,000 | 63427368 | 4863221 | -92.33 | 387335817.9 | 185162442.9 | -52.20 |

You can observe that once the amplification exceeds 30,000, we no longer do cache refill, so memory usage is capped and we can avoid OOM.

Note also that because this is an in-memory state store, we see significant runtime improvements. I expect that with block cache enabled we would see similar improvements, although they may not be as large.
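
For reference, the Diff columns are consistent with a relative change in percent between the unoptimized and optimized values. A small sketch of that arithmetic, where `diff_percent` is a hypothetical helper name:

```rust
/// Relative change from `base` to `opt`, in percent.
/// Negative values mean the optimized run used less memory or time.
fn diff_percent(base: f64, opt: f64) -> f64 {
    (opt - base) / base * 100.0
}
```

For example, `diff_percent(31769736.0, 4863221.0)` is roughly -84.69, which matches the Total Bytes row for an amplification of 200,000.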

Cache Hit

We ingest 64 records on the update (probe) side, which match all amplified records on the build side. Sample size is N, using the in-memory state store. For memory utilization, we measure mem table usage. This is unavoidable: we have to use a barrier to synchronize ingestion so that the join sequence is deterministic rather than interleaved between the lhs and rhs of the join. That is fine, because we don't expect better memory utilization when everything is in cache.

Cache hit results show some regression for runtime (~5-6%):

| Amp | Total Bytes | Total Bytes (opt) | Diff (%) | Runtime (ns) | Runtime (ns) (opt) | Diff (%) |
|---|---|---|---|---|---|---|
| 20,000 | 8586702 | 8586702 | 0.00 | 225010070 | 239153682.1 | 6.29 |
| 40,000 | 17024222 | 17024222 | 0.00 | 457963188.8 | 482480081.3 | 5.35 |
| 200,000 | 84527806 | 84527806 | 0.00 | 2318848400 | 2468701041 | 6.46 |
| 400,000 | 168909854 | 168909854 | 0.00 | 4621595553 | 4900965761 | 6.04 |

However:

  1. nexmark e2e performance tests show that the performance is good in most cases.
  2. The microbenchmark is a highly specific case. It's an inner join with records in-cache, under high join amplification. This is not exactly the common case. Hence I think it's acceptable to further optimize later where needed.
  3. The regression might be due to abstracting various functionality into standalone functions. Some of these are functions that return generators (streams), and are difficult for the compiler to inline. We can inline them by hand to improve performance. But I think we should optimize in other areas first (e.g. append_n (~50%), decode (15%)), which are major cost centers in the flamegraph, rather than optimize areas which lead to poorer readability and maintainability of the code.
  4. There are also other more "obvious" optimizations, such as inlining cond expr evaluation if there's no cond expr, eliminating the is_append_only code path if join is non-append-only.

Further improvements

We can also check the cache size when inserting the update records into the cache. Currently we only do so for the match side, since that has N matches for a single update record. We defer this optimization to avoid further complicating this PR.

We may also need to call try_flush more frequently, to ensure records in join state get flushed.

We also need to remove OOMs triggered by merging chunks.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

kwannoel commented Dec 1, 2024

@kwannoel kwannoel changed the title from "refactor" to "perf(stream): concurrently fetch row from storage and refill cache" Dec 1, 2024
@kwannoel kwannoel marked this pull request as ready for review December 1, 2024 07:58
@kwannoel kwannoel marked this pull request as draft December 1, 2024 07:59
@kwannoel kwannoel marked this pull request as ready for review December 2, 2024 01:29
@kwannoel kwannoel marked this pull request as draft December 2, 2024 01:30

fuyufjh commented Dec 4, 2024

The motivation is similar to #10979 / #12615, but this is much better. Love it!

IIRC, Nexmark has some queries with large join state. May use it as benchmark.

kwannoel commented Dec 6, 2024

Benchmarking this in RWC does not show decrease in mem utilization. There's probably something wrong with the implementation. Writing a memory benchmark first.

kwannoel commented Dec 9, 2024

Added benchmark: #19712. Continuing investigation.

@kwannoel kwannoel changed the base branch from main to kwannoel/join-bench December 9, 2024 05:54

kwannoel commented Dec 9, 2024

Works well. See PR description. Memory utilization plateaus after the threshold set.

@kwannoel kwannoel force-pushed the kwannoel/join-bench branch from efda035 to 5d72e75 Compare December 9, 2024 07:22
@kwannoel kwannoel force-pushed the 11-29-refactor branch 2 times, most recently from 8766e99 to 3f8c1a6 Compare December 9, 2024 11:24
@kwannoel kwannoel changed the base branch from kwannoel/join-bench to graphite-base/19629 December 10, 2024 01:25
@kwannoel kwannoel force-pushed the graphite-base/19629 branch from f5188c8 to bd82fe3 Compare December 10, 2024 06:42
@kwannoel kwannoel changed the base branch from graphite-base/19629 to main December 10, 2024 06:42
@kwannoel

We don't have any issues supporting this for INNER JOIN, however for joins which require a degree table, more consideration is needed. Carving out this optimization just for INNER JOIN is possible, but it will add a new code branch and more complexity.


Currently we:

  1. Read all the degrees and matched rows into memory.
  2. Iterate through them, updating the degree table for the build side.

So the upfront IO cost is: max(read_degree_latency, read_matched_rows_latency). Typically read_matched_rows_latency should be higher, since degree will have less data, only key + degree.

Then the total latency for a cache miss will be max(read_degree_latency, read_matched_rows_latency) + CPU latency of processing matched rows.

After this PR:
We concurrently read matched rows and handle them. However, we can't do so for degrees, because we need to iterate over the degrees and update them at the same time. This means holding an immutable and a mutable reference to StateTable at the same time.

Currently I work around this by reading all the degrees into memory first. The memory cost shouldn't be too bad, since we only need to store the degrees.
This means the upfront IO cost will be read_degree_latency. We then pay the IO cost of read_matched_rows_latency in an amortized way, since we concurrently stream and handle each matched row.

Then the total latency for a cache miss will be read_degree_latency + CPU AND IO latency of processing matched rows.
So it might take longer to do cache refill.

However, given that cache miss is expected to seldom occur, I think this is a reasonable trade-off to make.
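
The latency comparison above can be written out as a rough cost model. This is only a sketch: the `Costs` struct and function names are hypothetical, all values share one arbitrary time unit, and the "after" total assumes the pessimistic case where row IO and CPU work do not overlap at all.

```rust
/// Hypothetical per-cache-miss costs, all in the same time unit.
struct Costs {
    read_degree: u64,
    read_rows: u64,
    cpu_rows: u64,
}

/// Before: degrees and matched rows are both fully read before processing.
fn old_upfront_io(c: &Costs) -> u64 {
    c.read_degree.max(c.read_rows)
}

fn old_total(c: &Costs) -> u64 {
    old_upfront_io(c) + c.cpu_rows
}

/// After: only degrees are read upfront; rows are streamed while processing.
fn new_upfront_io(c: &Costs) -> u64 {
    c.read_degree
}

/// Upper bound for the new total, assuming no overlap of row IO and CPU.
fn new_total_upper_bound(c: &Costs) -> u64 {
    c.read_degree + c.read_rows + c.cpu_rows
}
```

With made-up costs of `read_degree = 2`, `read_rows = 10`, and `cpu_rows = 5`, the upfront IO drops from 10 to 2, while the total goes from 15 to at most 17.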

Will benchmark this approach against a nexmark query with LEFT OUTER JOIN.

Another approach is to concurrently do point get, and write to the degree state table, since the point get reference to the degree table is short-lived. However, point get does not have prefetch interface yet.
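
A sketch of why the point get variant would satisfy the borrow checker, using a plain `HashMap` as a hypothetical stand-in for the degree state table (the function name `bump_degrees_point_get` is also an assumption):

```rust
use std::collections::HashMap;

/// For each matched pk, point-gets the current degree and writes back the
/// incremented value. Missing degrees are treated as 0 in this sketch.
fn bump_degrees_point_get(
    degrees: &mut HashMap<u64, u64>,
    matched_pks: impl IntoIterator<Item = u64>,
) {
    for pk in matched_pks {
        // The immutable borrow from `get` ends once the value is copied out,
        // so it never overlaps with the mutable borrow taken by `insert`.
        let current = degrees.get(&pk).copied().unwrap_or(0);
        degrees.insert(pk, current + 1);
    }
}
```

Unlike a scan, no iterator over the table stays alive across the write, at the cost of one lookup per matched row.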


The second issue is tolerating inconsistency. Previously we buffered all matched rows into memory. If the number of rows in the degree table and the number of matched rows differ, we compare their pk.

I think it's possible to support this with the point get approach. Alternatively, if tolerate inconsistency is set to true, we can always follow the old approach of refilling cache first.
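
One way to picture the pk comparison when both sides are buffered: a merge over two pk-sorted sequences that pairs entries with equal pk. This is a sketch under stated assumptions only. `align_by_pk` is a hypothetical helper, both inputs are assumed sorted by pk, and entries present on only one side are simply skipped here; what the executor actually does with them is not described in this thread.

```rust
use std::cmp::Ordering;

/// Pairs matched rows with their degrees by pk. Both slices must be sorted
/// by pk. Entries that exist on only one side are skipped (tolerated).
fn align_by_pk(rows: &[(u64, &str)], degrees: &[(u64, u64)]) -> Vec<(u64, String, u64)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < rows.len() && j < degrees.len() {
        let (row_pk, row) = rows[i];
        let (degree_pk, degree) = degrees[j];
        match row_pk.cmp(&degree_pk) {
            Ordering::Equal => {
                out.push((row_pk, row.to_string(), degree));
                i += 1;
                j += 1;
            }
            // Row without a degree entry: skip the row.
            Ordering::Less => i += 1,
            // Degree entry without a row: skip the degree.
            Ordering::Greater => j += 1,
        }
    }
    out
}
```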

@kwannoel kwannoel requested review from yuhao-su and st1page December 11, 2024 04:28

kwannoel commented Dec 11, 2024

Will add a micro-bench for left join, cache hit before proceeding.

@fuyufjh fuyufjh left a comment

LGTM for the Cargo.lock changes.

@kwannoel kwannoel added this pull request to the merge queue Jan 7, 2025
Merged via the queue into main with commit 2497760 Jan 7, 2025
33 of 34 checks passed
@kwannoel kwannoel deleted the 11-29-refactor branch January 7, 2025 09:17