Skip to content

Commit

Permalink
fix(streaming): use correct label for `stream_fragment_exchange_bytes…
Browse files Browse the repository at this point in the history
…` metrics (#13644)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Nov 24, 2023
1 parent 3ccb249 commit 2348a2b
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
@@ -83,7 +83,12 @@ impl ExchangeService for ExchangeServiceImpl {
let mut request_stream: Streaming<GetStreamRequest> = request.into_inner();

// Extract the first `Get` request from the stream.
let get_req = {
let Get {
up_actor_id,
down_actor_id,
up_fragment_id,
down_fragment_id,
} = {
let req = request_stream
.next()
.await
@@ -94,8 +99,10 @@ impl ExchangeService for ExchangeServiceImpl {
}
};

let up_down_actor_ids = (get_req.up_actor_id, get_req.down_actor_id);
let receiver = self.stream_mgr.take_receiver(up_down_actor_ids).await?;
let receiver = self
.stream_mgr
.take_receiver((up_actor_id, down_actor_id))
.await?;

// Map the remaining stream to add-permits.
let add_permits_stream = request_stream.map_ok(|req| match req.value.unwrap() {
@@ -108,7 +115,7 @@ impl ExchangeService for ExchangeServiceImpl {
peer_addr,
receiver,
add_permits_stream,
up_down_actor_ids,
(up_fragment_id, down_fragment_id),
)))
}
}

0 comments on commit 2348a2b

Please sign in to comment.