-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: use high watermark to finish backfill faster #18342
Merged
xxchan
merged 4 commits into
main
from
08-24-refactor_use_high_watermark_to_finish_backfill_faster
Sep 4, 2024
+233
−51
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,29 @@ SET rw_enable_shared_source TO true; | |
system ok | ||
rpk topic create shared_source -p 4 | ||
|
||
# Test create source before produing data. | ||
statement ok | ||
create source s_before_produce (v1 int, v2 varchar) with ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'shared_source', | ||
scan.startup.mode = 'earliest' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
create materialized view mv_before_produce as select * from s_before_produce; | ||
|
||
sleep 2s | ||
|
||
# All partitions starts with backfill_info: NoDataToBackfill, so it finishes immediately. | ||
system ok | ||
internal_table.mjs --name mv_before_produce --type sourcebackfill | ||
---- | ||
0,"""Finished""" | ||
1,"""Finished""" | ||
2,"""Finished""" | ||
3,"""Finished""" | ||
|
||
|
||
system ok | ||
cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0 | ||
0 {"v1": 1, "v2": "a"} | ||
|
@@ -21,7 +44,7 @@ create source s0 (v1 int, v2 varchar) with ( | |
scan.startup.mode = 'earliest' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
query I | ||
query ? | ||
select count(*) from rw_internal_tables where name like '%s0%'; | ||
---- | ||
1 | ||
|
@@ -41,21 +64,24 @@ create materialized view mv_1 as select * from s0; | |
# Wait enough time to ensure SourceExecutor consumes all Kafka data. | ||
sleep 2s | ||
|
||
# SourceExecutor's ingestion started, but it only starts from latest. | ||
# SourceExecutor's ingestion started, but it only starts from latest (offset 1). | ||
system ok | ||
internal_table.mjs --name s0 --type source | ||
---- | ||
(empty) | ||
|
||
|
||
# offset 0 must be backfilled, not from upstream. | ||
# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, what will the high watermark value be? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1 |
||
# (meaning upstream already consumed offset 0, so we only need to backfill to offset 0) | ||
# After backfilling offset 0, it enters SourceCachingUp state. Now the backfill is finished. | ||
# We wait for SourceExecutor to produce offset > 0. | ||
system ok | ||
internal_table.mjs --name mv_1 --type sourcebackfill | ||
---- | ||
0,"{""Backfilling"": ""0""}" | ||
1,"{""Backfilling"": ""0""}" | ||
2,"{""Backfilling"": ""0""}" | ||
3,"{""Backfilling"": ""0""}" | ||
0,"{""SourceCachingUp"": ""0""}" | ||
1,"{""SourceCachingUp"": ""0""}" | ||
2,"{""SourceCachingUp"": ""0""}" | ||
3,"{""SourceCachingUp"": ""0""}" | ||
|
||
|
||
# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. | ||
|
@@ -67,23 +93,23 @@ create materialized view mv_2 as select * from s0; | |
|
||
sleep 2s | ||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, v2 from s0; | ||
---- | ||
1 a | ||
2 b | ||
3 c | ||
4 d | ||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, v2 from mv_1; | ||
---- | ||
1 a | ||
2 b | ||
3 c | ||
4 d | ||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, v2 from mv_2; | ||
---- | ||
1 a | ||
|
@@ -111,7 +137,7 @@ internal_table.mjs --name s0 --type source | |
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" | ||
|
||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, v2 from s0; | ||
---- | ||
1 a | ||
|
@@ -123,7 +149,7 @@ select v1, v2 from s0; | |
4 d | ||
4 dd | ||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, v2 from mv_1; | ||
---- | ||
1 a | ||
|
@@ -146,18 +172,14 @@ internal_table.mjs --name s0 --type source | |
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" | ||
|
||
|
||
# The result is non-deterministic: | ||
# If the upstream row comes before the backfill row, it will be ignored, and the result state is "{""Backfilling"": ""1""}". | ||
# If the upstream row comes after the backfill row, the result state is Finished. | ||
# Uncomment below and run manually to see the result. | ||
|
||
# system ok | ||
# internal_table.mjs --name mv_1 --type sourcebackfill | ||
# ---- | ||
# 0,"{""Finished""}" | ||
# 1,"{""Finished""}" | ||
# 2,"{""Finished""}" | ||
# 3,"{""Finished""}" | ||
# Transition from SourceCachingUp to Finished after consuming one upstream message. | ||
system ok | ||
internal_table.mjs --name mv_1 --type sourcebackfill | ||
---- | ||
0,"""Finished""" | ||
1,"""Finished""" | ||
2,"""Finished""" | ||
3,"""Finished""" | ||
|
||
|
||
system ok | ||
|
@@ -173,22 +195,30 @@ done | |
|
||
sleep 3s | ||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, count(*) from s0 group by v1; | ||
---- | ||
1 12 | ||
2 12 | ||
3 12 | ||
4 12 | ||
|
||
query IT rowsort | ||
query ?? rowsort | ||
select v1, count(*) from mv_1 group by v1; | ||
---- | ||
1 12 | ||
2 12 | ||
3 12 | ||
4 12 | ||
|
||
query ?? rowsort | ||
select v1, count(*) from mv_before_produce group by v1; | ||
---- | ||
1 12 | ||
2 12 | ||
3 12 | ||
4 12 | ||
|
||
|
||
# start_offset changed to 11 | ||
system ok | ||
|
@@ -200,15 +230,8 @@ internal_table.mjs --name s0 --type source | |
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" | ||
|
||
|
||
# Now it is highly probable that all partitions have finished. | ||
system ok | ||
internal_table.mjs --name mv_1 --type sourcebackfill | ||
---- | ||
0,"""Finished""" | ||
1,"""Finished""" | ||
2,"""Finished""" | ||
3,"""Finished""" | ||
|
||
|
||
statement ok | ||
drop source s0 cascade; | ||
|
||
statement ok | ||
drop source s_before_produce cascade; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When do we need to use
?
instead ofI
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently
sqllogictest --override
will produce?
. Actually the character doesn't have any meaning now, any character will pass test..