-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(stream): arrangement backfill background ddl #14563
Merged
+159
−40
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
b6550f8
enable arrangement backfill
kwannoel b96cd87
remove stack trace of lines larger than 10
kwannoel b09c2c9
stack trace lines have 2 spaces in front
kwannoel a02f890
typo
kwannoel 1934813
fix
kwannoel 702b407
retry
kwannoel 134478a
patch
kwannoel 249330a
fix
kwannoel 6a9e19d
fix
kwannoel b24f4d4
fix
kwannoel e782395
retry
kwannoel bac9712
update
kwannoel 5aad942
more filtering
kwannoel 1af97ab
add more logs
kwannoel 98ce38b
only upload zipped logs
kwannoel fbb3bf2
add backfill logs
kwannoel af55a27
continue debug
kwannoel 2e49bb5
fix: if all finished, we should break out of loop
kwannoel a4968e9
dont need commit after state persisted
kwannoel 88a4bb4
add arrangement backfill background ddl recovery test
kwannoel 7712abf
fix
kwannoel ba0b1b7
fix
kwannoel 9af0d03
minor
kwannoel d628832
add some tips, reduce logs
kwannoel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
#!/usr/bin/env bash | ||
|
||
set -euo pipefail | ||
|
||
if [ $BUILDKITE_COMMAND_EXIT_STATUS -ne 0 ]; then | ||
mv .risingwave/log risedev-logs | ||
zip -q -r risedev-logs.zip risedev-logs/ | ||
buildkite-agent artifact upload risedev-logs.zip | ||
REGRESS_TEST_DIR="$PWD/src/tests/regress/output/results/" | ||
if [ -d "$REGRESS_TEST_DIR" ]; then | ||
mkdir regress-test-logs && cp src/tests/regress/output/results/* regress-test-logs/ | ||
zip -q -r regress-test.zip regress-test-logs/ | ||
buildkite-agent artifact upload regress-test-logs.zip | ||
fi | ||
if [ -e "$PWD/connector-node.log" ]; then | ||
buildkite-agent artifact upload "$PWD/connector-node.log" | ||
fi | ||
fi |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,7 @@ where | |
|
||
#[try_stream(ok = Message, error = StreamExecutorError)] | ||
async fn execute_inner(mut self) { | ||
tracing::debug!("Arrangement Backfill Executor started"); | ||
// The primary key columns, in the output columns of the upstream_table scan. | ||
// Table scan scans a subset of the columns of the upstream table. | ||
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap(); | ||
|
@@ -450,54 +451,58 @@ where | |
"backfill_finished_wait_for_barrier" | ||
); | ||
// If not finished then we need to update state, otherwise no need. | ||
if let Message::Barrier(barrier) = &msg | ||
&& !is_completely_finished | ||
{ | ||
// If snapshot was empty, we do not need to backfill, | ||
// but we still need to persist the finished state. | ||
// We currently persist it on the second barrier here rather than first. | ||
// This is because we can't update state table in first epoch, | ||
// since it expects to have been initialized in previous epoch | ||
// (there's no epoch before the first epoch). | ||
for vnode in upstream_table.vnodes().iter_vnodes() { | ||
backfill_state.finish_progress(vnode, upstream_table.pk_indices().len()); | ||
} | ||
if let Message::Barrier(barrier) = &msg { | ||
if is_completely_finished { | ||
// If already finished, no need to persist any state. | ||
} else { | ||
// If snapshot was empty, we do not need to backfill, | ||
// but we still need to persist the finished state. | ||
// We currently persist it on the second barrier here rather than first. | ||
// This is because we can't update state table in first epoch, | ||
// since it expects to have been initialized in previous epoch | ||
// (there's no epoch before the first epoch). | ||
for vnode in upstream_table.vnodes().iter_vnodes() { | ||
backfill_state | ||
.finish_progress(vnode, upstream_table.pk_indices().len()); | ||
} | ||
|
||
persist_state_per_vnode( | ||
barrier.epoch, | ||
&mut self.state_table, | ||
&mut backfill_state, | ||
#[cfg(debug_assertions)] | ||
state_len, | ||
vnodes.iter_vnodes(), | ||
) | ||
.await?; | ||
persist_state_per_vnode( | ||
barrier.epoch, | ||
&mut self.state_table, | ||
&mut backfill_state, | ||
#[cfg(debug_assertions)] | ||
state_len, | ||
vnodes.iter_vnodes(), | ||
) | ||
.await?; | ||
} | ||
|
||
self.progress | ||
.finish(barrier.epoch.curr, total_snapshot_processed_rows); | ||
tracing::trace!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Always notifies. |
||
epoch = ?barrier.epoch, | ||
"Updated CreateMaterializedTracker" | ||
); | ||
yield msg; | ||
break; | ||
} else { | ||
// Allow other messages to pass through. | ||
yield msg; | ||
} | ||
// Allow other messages to pass through. | ||
// We won't yield twice here, since if there's a barrier, | ||
// we will always break out of the loop. | ||
yield msg; | ||
} | ||
} | ||
|
||
tracing::trace!( | ||
"Arrangement Backfill has already finished and forward messages directly to the downstream" | ||
); | ||
|
||
// After progress finished + state persisted, | ||
// we can forward messages directly to the downstream, | ||
// as backfill is finished. | ||
#[for_await] | ||
for msg in upstream { | ||
if let Some(msg) = mapping_message(msg?, &self.output_indices) { | ||
tracing::trace!( | ||
actor = self.actor_id, | ||
message = ?msg, | ||
"backfill_finished_after_barrier" | ||
); | ||
if let Message::Barrier(barrier) = &msg { | ||
self.state_table.commit_no_data_expected(barrier.epoch); | ||
} | ||
yield msg; | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we wait for this flag to pass. However
is_completely_finished
could be true on recovery, if backfill is completed already.In that case, we should just yield the barrier and stop waiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, after recovery, the backfill state may be inconsistent with the
CreateMaterializedTracker
state, so we need to notify the progress to finish every time after recovery.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we will always notify see https://github.com/risingwavelabs/risingwave/pull/14563/files#r1464421419. It is outside of the if-else block.