diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 2cd8f29364eea..35b38eb50da8d 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -108,9 +108,9 @@ message GetBackPressureRequest { } message BackPressureInfo { - string actor_id = 1; - string fragment_id = 2; - string downstream_fragment_id = 3; + uint32 actor_id = 1; + uint32 fragment_id = 2; + uint32 downstream_fragment_id = 3; double value = 4; } diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 88966bfa635c1..78572f8f09727 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -256,13 +256,14 @@ impl StreamService for StreamServiceImpl { let mut back_pressure_info = BackPressureInfo::default(); for label_pair in label_pairs.get_label() { if label_pair.get_name() == "actor_id" { - back_pressure_info.actor_id = label_pair.get_value().to_string(); + back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap(); } if label_pair.get_name() == "fragment_id" { - back_pressure_info.fragment_id = label_pair.get_value().to_string(); + back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap(); } if label_pair.get_name() == "downstream_fragment_id" { - back_pressure_info.downstream_fragment_id = label_pair.get_value().to_string(); + back_pressure_info.downstream_fragment_id = + label_pair.get_value().parse::().unwrap(); } } back_pressure_info.value = label_pairs.get_counter().get_value();