-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(stream): optimize arrangement backfill snapshot read #14617
Merged
Merged
Changes from 7 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
c94d984
update backfill tests to include monitoring, fix upstream updates to …
kwannoel b1793c4
fix
kwannoel 93f97a1
parameterize on buildkite env var
kwannoel 8d8eeaf
fix dashboard
kwannoel 0f6da74
do snapshot read in parallel
kwannoel b3c1f8a
add prefetch options
kwannoel 8e99e87
fmt
kwannoel cf9d839
refactor tests
kwannoel d87cfe5
fix
kwannoel b9db7b5
fix
kwannoel 1ef2446
increase workload -> 4M records
kwannoel 8f14f76
increase workload -> 8M
kwannoel 24dbd10
change check
kwannoel 57067ff
fix
kwannoel 9c016f2
update dashboard artifacts
kwannoel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
statement ok | ||
CREATE TABLE t (v1 int, v2 varchar, v3 bigint); | ||
CREATE TABLE t (v1 int primary key, v2 varchar, v3 bigint); |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
statement ok | ||
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(500001, 2000000); | ||
|
||
statement ok | ||
flush; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
statement ok | ||
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(1, 500000); | ||
|
||
statement ok | ||
flush; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ use std::pin::pin; | |
use std::sync::Arc; | ||
|
||
use either::Either; | ||
use futures::stream::select_with_strategy; | ||
use futures::stream::{select_all, select_with_strategy}; | ||
use futures::{stream, StreamExt, TryStreamExt}; | ||
use futures_async_stream::try_stream; | ||
use itertools::Itertools; | ||
|
@@ -27,6 +27,7 @@ use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; | |
use risingwave_common::util::chunk_coalesce::DataChunkBuilder; | ||
use risingwave_common::util::iter_util::ZipEqDebug; | ||
use risingwave_storage::row_serde::value_serde::ValueRowSerde; | ||
use risingwave_storage::store::PrefetchOptions; | ||
use risingwave_storage::StateStore; | ||
|
||
use crate::common::table::state_table::{ReplicatedStateTable, StateTable}; | ||
|
@@ -533,6 +534,7 @@ where | |
backfill_state: BackfillState, | ||
builders: &'a mut [DataChunkBuilder], | ||
) { | ||
let mut iterators = vec![]; | ||
for (vnode, builder) in upstream_table | ||
.vnodes() | ||
.iter_vnodes() | ||
|
@@ -558,19 +560,29 @@ where | |
"iter_with_vnode_and_output_indices" | ||
); | ||
let vnode_row_iter = upstream_table | ||
.iter_with_vnode_and_output_indices(vnode, &range_bounds, Default::default()) | ||
.iter_with_vnode_and_output_indices( | ||
vnode, | ||
&range_bounds, | ||
PrefetchOptions::prefetch_for_small_range_scan(), | ||
) | ||
.await?; | ||
|
||
let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter)); | ||
|
||
let vnode_chunk_iter = | ||
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk)); | ||
|
||
// This means we iterate serially rather than in parallel across vnodes. | ||
#[for_await] | ||
for chunk in vnode_chunk_iter { | ||
yield Some(chunk?); | ||
} | ||
let vnode_chunk_iter = Box::pin(vnode_chunk_iter); | ||
|
||
iterators.push(vnode_chunk_iter); | ||
} | ||
|
||
let vnode_chunk_iter = select_all(iterators); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optimization 2: Make sure iter is done in parallel. |
||
|
||
// This means we iterate serially rather than in parallel across vnodes. | ||
#[for_await] | ||
for chunk in vnode_chunk_iter { | ||
yield Some(chunk?); | ||
} | ||
yield None; | ||
return Ok(()); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization 1: Use same prefetch config as
no_shuffle_backfill
.