diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs index 6225cef2a7e30..955eaa729cdb5 100644 --- a/src/compute/src/rpc/service/exchange_service.rs +++ b/src/compute/src/rpc/service/exchange_service.rs @@ -83,7 +83,12 @@ impl ExchangeService for ExchangeServiceImpl { let mut request_stream: Streaming = request.into_inner(); // Extract the first `Get` request from the stream. - let get_req = { + let Get { + up_actor_id, + down_actor_id, + up_fragment_id, + down_fragment_id, + } = { let req = request_stream .next() .await @@ -94,8 +99,10 @@ impl ExchangeService for ExchangeServiceImpl { } }; - let up_down_actor_ids = (get_req.up_actor_id, get_req.down_actor_id); - let receiver = self.stream_mgr.take_receiver(up_down_actor_ids).await?; + let receiver = self + .stream_mgr + .take_receiver((up_actor_id, down_actor_id)) + .await?; // Map the remaining stream to add-permits. let add_permits_stream = request_stream.map_ok(|req| match req.value.unwrap() { @@ -108,7 +115,7 @@ impl ExchangeService for ExchangeServiceImpl { peer_addr, receiver, add_permits_stream, - up_down_actor_ids, + (up_fragment_id, down_fragment_id), ))) } }