-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(stream): optimize arrangement backfill snapshot read #14617
Changes from all commits
c94d984
b1793c4
93f97a1
8d8eeaf
0f6da74
b3c1f8a
8e99e87
cf9d839
d87cfe5
b9db7b5
1ef2446
8f14f76
24dbd10
57067ff
9c016f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
statement ok | ||
CREATE TABLE t (v1 int, v2 varchar, v3 bigint); | ||
CREATE TABLE t (v1 int primary key, v2 varchar, v3 bigint); |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
statement ok | ||
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(4000001, 8000000); | ||
|
||
statement ok | ||
flush; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
statement ok | ||
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(1, 4000000); | ||
|
||
statement ok | ||
flush; |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
query I | ||
select (select count(*) from arrangement_backfill) = (select count(*) from t); | ||
---- | ||
t |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
query I | ||
select (select count(*) from no_shuffle_backfill) = (select count(*) from t); | ||
---- | ||
t |
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ use std::pin::pin; | |
use std::sync::Arc; | ||
|
||
use either::Either; | ||
use futures::stream::select_with_strategy; | ||
use futures::stream::{select_all, select_with_strategy}; | ||
use futures::{stream, StreamExt, TryStreamExt}; | ||
use futures_async_stream::try_stream; | ||
use itertools::Itertools; | ||
|
@@ -27,6 +27,7 @@ use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; | |
use risingwave_common::util::chunk_coalesce::DataChunkBuilder; | ||
use risingwave_common::util::iter_util::ZipEqDebug; | ||
use risingwave_storage::row_serde::value_serde::ValueRowSerde; | ||
use risingwave_storage::store::PrefetchOptions; | ||
use risingwave_storage::StateStore; | ||
|
||
use crate::common::table::state_table::{ReplicatedStateTable, StateTable}; | ||
|
@@ -533,6 +534,7 @@ where | |
backfill_state: BackfillState, | ||
builders: &'a mut [DataChunkBuilder], | ||
) { | ||
let mut iterators = vec![]; | ||
for (vnode, builder) in upstream_table | ||
.vnodes() | ||
.iter_vnodes() | ||
|
@@ -558,19 +560,29 @@ where | |
"iter_with_vnode_and_output_indices" | ||
); | ||
let vnode_row_iter = upstream_table | ||
.iter_with_vnode_and_output_indices(vnode, &range_bounds, Default::default()) | ||
.iter_with_vnode_and_output_indices( | ||
vnode, | ||
&range_bounds, | ||
PrefetchOptions::prefetch_for_small_range_scan(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optimization 1: Use same prefetch config as |
||
) | ||
.await?; | ||
|
||
let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter)); | ||
|
||
let vnode_chunk_iter = | ||
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk)); | ||
|
||
// This means we iterate serially rather than in parallel across vnodes. | ||
#[for_await] | ||
for chunk in vnode_chunk_iter { | ||
yield Some(chunk?); | ||
} | ||
let vnode_chunk_iter = Box::pin(vnode_chunk_iter); | ||
|
||
iterators.push(vnode_chunk_iter); | ||
} | ||
|
||
let vnode_chunk_iter = select_all(iterators); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optimization 2: Make sure iter is done in parallel. |
||
|
||
// This means we iterate serially rather than in parallel across vnodes. | ||
#[for_await] | ||
for chunk in vnode_chunk_iter { | ||
yield Some(chunk?); | ||
} | ||
yield None; | ||
return Ok(()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only run if release mode.