Skip to content

Commit

Permalink
feat: persist processed rows for SourceBackfill for SHOW JOBS (#18983)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Oct 21, 2024
1 parent d17ce39 commit 3e7fffd
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ e2e_test/iceberg/metastore_db
**/*.sqlite
**/*.sqlite-journal

*.slt.temp
*.slt*.temp

.direnv/

Expand Down
134 changes: 76 additions & 58 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ sleep 2s
system ok
internal_table.mjs --name mv_before_produce --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""
0,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}"
1,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}"
2,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}"
3,"{""num_consumed_rows"": 0, ""state"": ""Finished"", ""target_offset"": null}"


system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
2 {"v1": 3, "v2": "c"}
3 {"v1": 4, "v2": "d"}
3 {"v1": 4, "v2": "d"}
3 {"v1": 4, "v2": "d"}
EOF

Expand Down Expand Up @@ -83,10 +86,10 @@ internal_table.mjs --name s0 --type source
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""SourceCachingUp"": ""0""}"
1,"{""SourceCachingUp"": ""0""}"
2,"{""SourceCachingUp"": ""0""}"
3,"{""SourceCachingUp"": ""0""}"
0,"{""num_consumed_rows"": 1, ""state"": {""SourceCachingUp"": ""0""}, ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 1, ""state"": {""SourceCachingUp"": ""0""}, ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 2, ""state"": {""SourceCachingUp"": ""1""}, ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 3, ""state"": {""SourceCachingUp"": ""2""}, ""target_offset"": ""2""}"


# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
Expand All @@ -101,26 +104,35 @@ sleep 2s
query ?? rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d
1 a
2 b
3 c
3 c
4 d
4 d
4 d

query ?? rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d
1 a
2 b
3 c
3 c
4 d
4 d
4 d

query ?? rowsort
select v1, v2 from mv_2;
----
1 a
2 b
3 c
4 d
1 a
2 b
3 c
3 c
4 d
4 d
4 d

system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
Expand All @@ -138,33 +150,39 @@ internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


query ?? rowsort
select v1, v2 from s0;
----
1 a
1 aa
2 b
2 bb
3 c
3 cc
4 d
4 dd
1 a
1 aa
2 b
2 bb
3 c
3 c
3 cc
4 d
4 d
4 d
4 dd

query ?? rowsort
select v1, v2 from mv_1;
----
1 a
1 aa
2 b
2 bb
3 c
3 cc
4 d
4 dd
1 a
1 aa
2 b
2 bb
3 c
3 c
3 cc
4 d
4 d
4 d
4 dd


# start_offset changed to 1
Expand All @@ -173,18 +191,18 @@ internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# Transition from SourceCachingUp to Finished after consuming one upstream message.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""
0,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 4, ""state"": ""Finished"", ""target_offset"": ""2""}"


system ok
Expand All @@ -203,26 +221,26 @@ sleep 3s
query ?? rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12
1 12
2 12
3 13
4 14

query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 12
4 12
1 12
2 12
3 13
4 14

query ?? rowsort
select v1, count(*) from mv_before_produce group by v1;
----
1 12
2 12
3 12
4 12
1 12
2 12
3 13
4 14


# start_offset changed to 11
Expand All @@ -231,8 +249,8 @@ internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 12, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 13, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# # Note: the parallelism depends on the risedev profile.
Expand Down
Loading

0 comments on commit 3e7fffd

Please sign in to comment.