Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): decouple barrier collect and sync in global barrier manager #19475

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ steps:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 13
timeout_in_minutes: 15
retry: *auto-retry

- label: "end-to-end test (parallel, in-memory) (release)"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 20
timeout_in_minutes: 25
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
Expand Down
21 changes: 15 additions & 6 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}

message BarrierCompleteResponse {
message BarrierCollectResponse {
message CreateMviewProgress {
// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
// See <https://github.com/risingwavelabs/risingwave/issues/6236>.
Expand All @@ -34,9 +34,19 @@
uint64 consumed_rows = 4;
uint32 pending_barrier_num = 5;
}
string request_id = 1;
common.Status status = 2;
uint64 partial_graph_id = 1;
// prev_epoch of barrier
uint64 epoch = 2;
repeated CreateMviewProgress create_mview_progress = 3;
}

message BarrierCompleteRequest {
uint64 task_id = 1;
map<uint64, uint64> partial_graph_sync_epochs = 2;
}

message BarrierCompleteResponse {

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "status" on message "BarrierCompleteResponse" was deleted without reserving the name "status".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "create_mview_progress" on message "BarrierCompleteResponse" was deleted without reserving the name "create_mview_progress".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "8" with name "partial_graph_id" on message "BarrierCompleteResponse" was deleted without reserving the name "partial_graph_id".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "9" with name "epoch" on message "BarrierCompleteResponse" was deleted without reserving the name "epoch".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "status" on message "BarrierCompleteResponse" was deleted without reserving the number "2".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "create_mview_progress" on message "BarrierCompleteResponse" was deleted without reserving the number "3".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "8" with name "partial_graph_id" on message "BarrierCompleteResponse" was deleted without reserving the number "8".

Check failure on line 48 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "9" with name "epoch" on message "BarrierCompleteResponse" was deleted without reserving the number "9".
uint64 task_id = 1;

Check failure on line 49 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "task_id" on message "BarrierCompleteResponse" changed option "json_name" from "requestId" to "taskId".

Check failure on line 49 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "task_id" on message "BarrierCompleteResponse" changed type from "string" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
message LocalSstableInfo {
reserved 1;
reserved "compaction_group_id";
Expand All @@ -48,9 +58,6 @@
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
uint64 partial_graph_id = 8;
// prev_epoch of barrier
uint64 epoch = 9;
}

message WaitEpochCommitRequest {
Expand Down Expand Up @@ -85,6 +92,7 @@
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
CreatePartialGraphRequest create_partial_graph = 4;
BarrierCompleteRequest complete_barrier = 5;
}
}

Expand All @@ -96,6 +104,7 @@
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
ShutdownResponse shutdown = 3;
BarrierCollectResponse collect_barrier = 4;
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,16 @@ impl MonitorService for MonitorServiceImpl {
Default::default()
};

let mut barrier_traces_next_key = 0;
let barrier_traces_next_key = &mut barrier_traces_next_key;
let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
reg.collect::<BarrierAwait>()
.into_iter()
.map(|(k, v)| (k.prev_epoch, v.to_string()))
.map(|(k, v)| {
let key = *barrier_traces_next_key;
*barrier_traces_next_key += 1;
(key, format!("{:?}", (k.sync_graph_epochs, v.to_string())))
})
.collect()
} else {
Default::default()
Expand Down
Loading
Loading