Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support partial checkpoint based backfill (a.k.a snapshot backfill) #17735

Merged
merged 5 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,30 @@ test_backfill_snapshot_with_wider_rows() {
kill_cluster
}

test_snapshot_backfill() {
echo "--- e2e, snapshot backfill test, $RUNTIME_CLUSTER_PROFILE"

risedev ci-start $RUNTIME_CLUSTER_PROFILE

sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/create_nexmark_table.slt'

TEST_NAME=nexmark_q3 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q3.slt' &
TEST_NAME=nexmark_q7 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q7.slt' &

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/drop_nexmark_table.slt'

kill_cluster
}

main() {
set -euo pipefail
test_snapshot_and_upstream_read
test_backfill_tombstone
test_replication_with_column_pruning
test_sink_backfill_recovery
test_snapshot_backfill

# Only if profile is "ci-release", run it.
if [[ ${profile:-} == "ci-release" ]]; then
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 24
timeout_in_minutes: 30
retry: *auto-retry

- label: "e2e standalone binary test"
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions e2e_test/backfill/snapshot_backfill/check_data_equal.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
control substitution on

# the result from batch query and materialized view should be the same
query T
select * from ${TEST_NAME}_v
except
select * from ${TEST_NAME}_mv;
----

query T
select * from ${TEST_NAME}_mv
except
select * from ${TEST_NAME}_v;
----

query T
select * from ${TEST_NAME}_extended_mv
except
select * from ${TEST_NAME}_mv;
----

query T
select * from ${TEST_NAME}_mv
except
select * from ${TEST_NAME}_extended_mv;
----
54 changes: 54 additions & 0 deletions e2e_test/backfill/snapshot_backfill/create_nexmark_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
statement ok
CREATE table person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Person',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '50000'
);

statement ok
CREATE table auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '50000'
);

statement ok
CREATE table bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '100000'
);

# sleep for a while to let table accumulate enough data
sleep 10s
8 changes: 8 additions & 0 deletions e2e_test/backfill/snapshot_backfill/drop_nexmark_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
drop table bid;

statement ok
drop table auction;

statement ok
drop table person;
14 changes: 14 additions & 0 deletions e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q3.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
control substitution on

statement ok
create view ${TEST_NAME}_v as
SELECT P.name,
P.city,
P.state,
A.id
FROM auction AS A
INNER JOIN person AS P on A.seller = P.id
WHERE A.category = 10
and (P.state = 'or' OR P.state = 'id' OR P.state = 'ca');

include ../run_test.slt.part
25 changes: 25 additions & 0 deletions e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q7.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
control substitution on

statement ok
create view ${TEST_NAME}_v as
SELECT
B.auction,
B.price,
B.bidder,
B.date_time
from
bid B
JOIN (
SELECT
MAX(price) AS maxprice,
window_end as date_time
FROM
TUMBLE(bid, date_time, INTERVAL '10' SECOND)
GROUP BY
window_end
) B1 ON B.price = B1.maxprice
WHERE
B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND
AND B1.date_time;

include ../run_test.slt.part
28 changes: 28 additions & 0 deletions e2e_test/backfill/snapshot_backfill/run_test.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
control substitution on

statement ok
set backfill_rate_limit = 500;

statement ok
set streaming_use_snapshot_backfill = true;

statement ok
create materialized view ${TEST_NAME}_mv as select * from ${TEST_NAME}_v;

statement ok
create materialized view ${TEST_NAME}_extended_mv as select * from ${TEST_NAME}_mv;

include ./check_data_equal.slt.part

sleep 3s

include ./check_data_equal.slt.part

statement ok
drop materialized view ${TEST_NAME}_extended_mv;

statement ok
drop materialized view ${TEST_NAME}_mv;

statement ok
drop view ${TEST_NAME}_v;
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ user standard_conforming_strings
user statement_timeout
user streaming_parallelism
user streaming_use_arrangement_backfill
user streaming_use_snapshot_backfill
user synchronize_seqscans
user timezone
user transaction_isolation
Expand Down
41 changes: 39 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ def section_streaming(outer_panels):
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
),
panels.target(
f"{metric('meta_snapshot_backfill_inflight_barrier_num')}", "snapshot_backfill_in_flight_barrier {{table_id}}"
),
],
),
panels.timeseries_latency(
Expand All @@ -745,7 +748,14 @@ def section_streaming(outer_panels):
f"rate({metric('meta_barrier_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_duration_seconds_count')}[$__rate_interval]) > 0",
"barrier_latency_avg",
),
],
]
+ quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_snapshot_backfill_barrier_duration_seconds_bucket')}[$__rate_interval])) by (le, table_id, barrier_type))",
f"snapshot_backfill_barrier_latency_p{legend} table_id[{{{{table_id}}}}] {{{{barrier_type}}}}",
),
[50, 90, 99, 999, "max"],
),
),
panels.timeseries(
"Barrier pending time (secs)",
Expand Down Expand Up @@ -892,6 +902,11 @@ def section_streaming(outer_panels):
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
panels.target(
f"rate({table_metric('stream_snapshot_backfill_consume_snapshot_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} {{stage}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
Expand Down Expand Up @@ -974,7 +989,19 @@ def section_streaming(outer_panels):
f"rate({metric('meta_barrier_wait_commit_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_wait_commit_duration_seconds_count')}[$__rate_interval]) > 0",
"barrier_wait_commit_avg",
),
],
] + quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_snapshot_backfill_barrier_wait_commit_duration_seconds_bucket')}[$__rate_interval])) by (le, table_id))",
f"snapshot_backfill_barrier_wait_commit_latency_p{legend} table_id[{{{{table_id}}}}]",
),
[50, 90, 99, 999, "max"],
) + quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_snapshot_backfill_upstream_wait_progress_latency_bucket')}[$__rate_interval])) by (le, table_id))",
f"snapshot_backfill_upstream_wait_progress_latency_p{legend} table_id[{{{{table_id}}}}]",
),
[50, 90, 99, 999, "max"],
)
),
panels.timeseries_ops(
"Earliest In-Flight Barrier Progress",
Expand All @@ -987,6 +1014,16 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_latency(
"Snapshot Backfill Lag",
"",
[
panels.target(
f"{metric('meta_snapshot_backfill_upstream_lag')} / (2^16) / 1000",
"lag @ {{table_id}}",
),
],
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ message CombinedMutation {
}

message SubscriptionUpstreamInfo {
// can either be subscription_id or table_id of creating TableFragments
uint32 subscriber_id = 1;
uint32 upstream_mv_table_id = 2;
}
Expand Down Expand Up @@ -543,6 +544,9 @@ enum StreamScanType {

// ArrangementBackfillExecutor
STREAM_SCAN_TYPE_ARRANGEMENT_BACKFILL = 5;

// SnapshotBackfillExecutor
STREAM_SCAN_TYPE_SNAPSHOT_BACKFILL = 6;
}

// StreamScanNode reads data from upstream table first, and then pass all events to downstream.
Expand Down Expand Up @@ -949,6 +953,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN = 2048;
}

// The streaming context associated with a stream plan
Expand Down
10 changes: 10 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ message InjectBarrierRequest {
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
// Actors in the partial graphs of the creating jobs that need to be pre-synced the barrier mutation to.
//
// This is required because in snapshot backfill, the snapshot backfill executor receive barriers from
// both local barrier manager and upstream. If we don't pre-sync the barrier mutations, when an input executor
// of an snapshot backfill actor receive a barrier, it will be blocked when trying the fetch the mutation
// of this upstream barrier. The reason for blocking is that, the snapshot backfill have slower progress,
// and therefore won't be synced with the mutation of barrier in upstream. To solve this issue of blocking,
// we specify the set of snapshot backfill actors that needs to be pre-synced with the upstream barrier mutation,
// so that the input executor won't be blocked at waiting for the mutation of upstream barriers.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
}

message BarrierCompleteResponse {
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ pub struct SessionConfig {
#[parameter(default = true)]
streaming_use_arrangement_backfill: bool,

#[parameter(default = false)]
streaming_use_snapshot_backfill: bool,

/// Allow `jsonb` in stream key
#[parameter(default = false, rename = "rw_streaming_allow_jsonb_in_stream_key")]
streaming_allow_jsonb_in_stream_key: bool,
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/util/chunk_coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ impl DataChunkBuilder {
}
}

pub fn batch_size(&self) -> usize {
self.batch_size
}

/// Lazily create the array builders if absent
fn ensure_builders(&mut self) {
if self.array_builders.len() != self.data_types.len() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,27 @@ async fn read_hummock_snapshot_groups(
})
.collect())
}

#[derive(Fields)]
struct RwHummockTableChangeLog {
#[primary_key]
table_id: i32,
change_log: JsonbVal,
}

#[system_catalog(table, "rw_catalog.rw_hummock_table_change_log")]
async fn read_hummock_table_change_log(
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
reader: &SysCatalogReaderImpl,
) -> Result<
Vec<crate::catalog::system_catalog::rw_catalog::rw_hummock_version::RwHummockTableChangeLog>,
> {
let version = reader.meta_client.get_hummock_current_version().await?;
Ok(version
.table_change_log
.iter()
.map(|(table_id, change_log)| RwHummockTableChangeLog {
table_id: table_id.table_id as i32,
change_log: json!(change_log.to_protobuf()).into(),
})
.collect())
}
Loading
Loading