feat: support partial checkpoint based backfill (a.k.a. snapshot backfill) (#17735)
wenym1 authored Aug 15, 2024
1 parent cbeda4d commit cfea9f3
Showing 47 changed files with 2,890 additions and 242 deletions.
18 changes: 18 additions & 0 deletions ci/scripts/run-backfill-tests.sh
@@ -281,12 +281,30 @@ test_backfill_snapshot_with_wider_rows() {
kill_cluster
}

test_snapshot_backfill() {
echo "--- e2e, snapshot backfill test, $RUNTIME_CLUSTER_PROFILE"

risedev ci-start $RUNTIME_CLUSTER_PROFILE

sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/create_nexmark_table.slt'

TEST_NAME=nexmark_q3 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q3.slt' &
TEST_NAME=nexmark_q7 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q7.slt' &

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/drop_nexmark_table.slt'

kill_cluster
}

main() {
set -euo pipefail
test_snapshot_and_upstream_read
test_backfill_tombstone
test_replication_with_column_pruning
test_sink_backfill_recovery
test_snapshot_backfill

# Run it only when the profile is "ci-release".
if [[ ${profile:-} == "ci-release" ]]; then
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
@@ -743,7 +743,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 24
timeout_in_minutes: 30
retry: *auto-retry

- label: "e2e standalone binary test"
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions e2e_test/backfill/snapshot_backfill/check_data_equal.slt.part
@@ -0,0 +1,26 @@
control substitution on

# the batch query on the view and the materialized views should return the same rows, so every EXCEPT below must be empty
query T
select * from ${TEST_NAME}_v
except
select * from ${TEST_NAME}_mv;
----

query T
select * from ${TEST_NAME}_mv
except
select * from ${TEST_NAME}_v;
----

query T
select * from ${TEST_NAME}_extended_mv
except
select * from ${TEST_NAME}_mv;
----

query T
select * from ${TEST_NAME}_mv
except
select * from ${TEST_NAME}_extended_mv;
----
54 changes: 54 additions & 0 deletions e2e_test/backfill/snapshot_backfill/create_nexmark_table.slt
@@ -0,0 +1,54 @@
statement ok
CREATE table person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Person',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '50000'
);

statement ok
CREATE table auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '50000'
);

statement ok
CREATE table bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '100000'
);

# sleep for a while to let the tables accumulate enough data
sleep 10s
8 changes: 8 additions & 0 deletions e2e_test/backfill/snapshot_backfill/drop_nexmark_table.slt
@@ -0,0 +1,8 @@
statement ok
drop table bid;

statement ok
drop table auction;

statement ok
drop table person;
14 changes: 14 additions & 0 deletions e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q3.slt
@@ -0,0 +1,14 @@
control substitution on

statement ok
create view ${TEST_NAME}_v as
SELECT P.name,
P.city,
P.state,
A.id
FROM auction AS A
INNER JOIN person AS P on A.seller = P.id
WHERE A.category = 10
and (P.state = 'or' OR P.state = 'id' OR P.state = 'ca');

include ../run_test.slt.part
25 changes: 25 additions & 0 deletions e2e_test/backfill/snapshot_backfill/nexmark/nexmark_q7.slt
@@ -0,0 +1,25 @@
control substitution on

statement ok
create view ${TEST_NAME}_v as
SELECT
B.auction,
B.price,
B.bidder,
B.date_time
from
bid B
JOIN (
SELECT
MAX(price) AS maxprice,
window_end as date_time
FROM
TUMBLE(bid, date_time, INTERVAL '10' SECOND)
GROUP BY
window_end
) B1 ON B.price = B1.maxprice
WHERE
B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND
AND B1.date_time;

include ../run_test.slt.part
28 changes: 28 additions & 0 deletions e2e_test/backfill/snapshot_backfill/run_test.slt.part
@@ -0,0 +1,28 @@
control substitution on

statement ok
set backfill_rate_limit = 500;

statement ok
set streaming_use_snapshot_backfill = true;

statement ok
create materialized view ${TEST_NAME}_mv as select * from ${TEST_NAME}_v;

statement ok
create materialized view ${TEST_NAME}_extended_mv as select * from ${TEST_NAME}_mv;

include ./check_data_equal.slt.part

sleep 3s

include ./check_data_equal.slt.part

statement ok
drop materialized view ${TEST_NAME}_extended_mv;

statement ok
drop materialized view ${TEST_NAME}_mv;

statement ok
drop view ${TEST_NAME}_v;
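
The `set backfill_rate_limit = 500` above deliberately slows the backfill so that upstream data keeps arriving while the snapshot is being consumed. While the blocking `create materialized view` statement runs, the backfill can be observed from a second session — a minimal sketch, assuming the standard `rw_ddl_progress` system catalog:

-- run from a second session while the materialized view above is still being created
select ddl_id, ddl_statement, progress from rw_catalog.rw_ddl_progress;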
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -59,6 +59,7 @@ user standard_conforming_strings
user statement_timeout
user streaming_parallelism
user streaming_use_arrangement_backfill
user streaming_use_snapshot_backfill
user synchronize_seqscans
user timezone
user transaction_isolation
41 changes: 39 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -726,6 +726,9 @@ def section_streaming(outer_panels):
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
),
panels.target(
f"{metric('meta_snapshot_backfill_inflight_barrier_num')}", "snapshot_backfill_in_flight_barrier {{table_id}}"
),
],
),
panels.timeseries_latency(
@@ -745,7 +748,14 @@
f"rate({metric('meta_barrier_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_duration_seconds_count')}[$__rate_interval]) > 0",
"barrier_latency_avg",
),
],
]
+ quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_snapshot_backfill_barrier_duration_seconds_bucket')}[$__rate_interval])) by (le, table_id, barrier_type))",
f"snapshot_backfill_barrier_latency_p{legend} table_id[{{{{table_id}}}}] {{{{barrier_type}}}}",
),
[50, 90, 99, 999, "max"],
),
),
panels.timeseries(
"Barrier pending time (secs)",
@@ -892,6 +902,11 @@ def section_streaming(outer_panels):
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
panels.target(
f"rate({table_metric('stream_snapshot_backfill_consume_snapshot_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} {{stage}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
@@ -974,7 +989,19 @@ def section_streaming(outer_panels):
f"rate({metric('meta_barrier_wait_commit_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_wait_commit_duration_seconds_count')}[$__rate_interval]) > 0",
"barrier_wait_commit_avg",
),
],
] + quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_snapshot_backfill_barrier_wait_commit_duration_seconds_bucket')}[$__rate_interval])) by (le, table_id))",
f"snapshot_backfill_barrier_wait_commit_latency_p{legend} table_id[{{{{table_id}}}}]",
),
[50, 90, 99, 999, "max"],
) + quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_snapshot_backfill_upstream_wait_progress_latency_bucket')}[$__rate_interval])) by (le, table_id))",
f"snapshot_backfill_upstream_wait_progress_latency_p{legend} table_id[{{{{table_id}}}}]",
),
[50, 90, 99, 999, "max"],
)
),
panels.timeseries_ops(
"Earliest In-Flight Barrier Progress",
@@ -987,6 +1014,16 @@
),
],
),
panels.timeseries_latency(
"Snapshot Backfill Lag",
"",
[
panels.target(
f"{metric('meta_snapshot_backfill_upstream_lag')} / (2^16) / 1000",
"lag @ {{table_id}}",
),
],
),
],
)
]
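
For reference, the `/ (2^16) / 1000` in the new Snapshot Backfill Lag panel follows from RisingWave's epoch layout — assuming the usual encoding in which an epoch stores the physical timestamp in milliseconds shifted left by 16 bits — so the raw epoch difference converts to seconds as:

lag_seconds = meta_snapshot_backfill_upstream_lag / 2^16 / 1000

For example, a raw lag of 131072000 works out to 131072000 / 65536 / 1000 = 2 seconds.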
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions proto/stream_plan.proto
@@ -97,6 +97,7 @@ message CombinedMutation {
}

message SubscriptionUpstreamInfo {
// Can be either a subscription_id, or the table_id of the TableFragments of a job under creation.
uint32 subscriber_id = 1;
uint32 upstream_mv_table_id = 2;
}
@@ -543,6 +544,9 @@ enum StreamScanType {

// ArrangementBackfillExecutor
STREAM_SCAN_TYPE_ARRANGEMENT_BACKFILL = 5;

// SnapshotBackfillExecutor
STREAM_SCAN_TYPE_SNAPSHOT_BACKFILL = 6;
}

// StreamScanNode reads data from upstream table first, and then pass all events to downstream.
@@ -949,6 +953,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN = 2048;
}

// The streaming context associated with a stream plan
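
Note that `FragmentTypeFlag` values are single bits (256 = 1 << 8, 1024 = 1 << 10, and the new 2048 = 1 << 11) OR-ed together into a fragment's `fragment_type_mask`, which is how snapshot-backfill stream-scan fragments can later be singled out. As a sketch — assuming the new flag is surfaced through the `flags` column of `rw_fragments` the same way the existing flags are — such fragments could be listed with:

select fragment_id, table_id, flags
from rw_fragments
where 'SNAPSHOT_BACKFILL_STREAM_SCAN' = any(flags);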
10 changes: 10 additions & 0 deletions proto/stream_service.proto
@@ -59,6 +59,16 @@ message InjectBarrierRequest {
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
// Actors in the partial graphs of the creating jobs that the barrier mutation should be pre-synced to.
//
// This is required because in snapshot backfill, the snapshot backfill executor receives barriers from
// both the local barrier manager and the upstream. If we don't pre-sync the barrier mutations, then when an
// input executor of a snapshot backfill actor receives a barrier, it will block while trying to fetch the
// mutation of this upstream barrier. It blocks because snapshot backfill progresses more slowly than the
// upstream, so the mutation of the upstream barrier has not been synced to it yet. To avoid this, we
// specify the set of snapshot backfill actors that need the upstream barrier mutations pre-synced, so
// that their input executors won't block waiting for those mutations.
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;
}

message BarrierCompleteResponse {
3 changes: 3 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -161,6 +161,9 @@ pub struct SessionConfig {
#[parameter(default = true)]
streaming_use_arrangement_backfill: bool,

#[parameter(default = false)]
streaming_use_snapshot_backfill: bool,

/// Allow `jsonb` in stream key
#[parameter(default = false, rename = "rw_streaming_allow_jsonb_in_stream_key")]
streaming_allow_jsonb_in_stream_key: bool,
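
The new `streaming_use_snapshot_backfill` session variable defaults to `false` and sits alongside `streaming_use_arrangement_backfill`. Following the pattern in `run_test.slt.part`, enabling it for a session makes subsequently created materialized views use the new backfill path (`t` and `mv1` below are hypothetical names):

set streaming_use_snapshot_backfill = true;
-- planned with STREAM_SCAN_TYPE_SNAPSHOT_BACKFILL instead of a regular stream scan
create materialized view mv1 as select * from t;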
4 changes: 4 additions & 0 deletions src/common/src/util/chunk_coalesce.rs
@@ -49,6 +49,10 @@ impl DataChunkBuilder {
}
}

/// Returns the target number of rows in each chunk produced by this builder.
pub fn batch_size(&self) -> usize {
self.batch_size
}

/// Lazily create the array builders if absent
fn ensure_builders(&mut self) {
if self.array_builders.len() != self.data_types.len() {
24 changes: 24 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs
@@ -225,3 +225,27 @@ async fn read_hummock_snapshot_groups(
})
.collect())
}

#[derive(Fields)]
struct RwHummockTableChangeLog {
#[primary_key]
table_id: i32,
change_log: JsonbVal,
}

#[system_catalog(table, "rw_catalog.rw_hummock_table_change_log")]
async fn read_hummock_table_change_log(
reader: &SysCatalogReaderImpl,
) -> Result<
Vec<crate::catalog::system_catalog::rw_catalog::rw_hummock_version::RwHummockTableChangeLog>,
> {
let version = reader.meta_client.get_hummock_current_version().await?;
Ok(version
.table_change_log
.iter()
.map(|(table_id, change_log)| RwHummockTableChangeLog {
table_id: table_id.table_id as i32,
change_log: json!(change_log.to_protobuf()).into(),
})
.collect())
}
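
This makes the per-table change log in the current hummock version — the log that snapshot backfill consumes to catch up after the snapshot phase — inspectable from SQL, with the protobuf change log rendered as `jsonb`:

select table_id, change_log from rw_catalog.rw_hummock_table_change_log;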