diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 25af5235f0a74..0622182c530e4 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -386,7 +386,7 @@ impl AggSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(buf.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.agg_state_writers[partition].write(len_bytes).await?; self.agg_state_writers[partition].write(buf).await?; } @@ -409,7 +409,7 @@ impl AggSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(buf.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.input_writers[partition].write(len_bytes).await?; self.input_writers[partition].write(buf).await?; } @@ -425,7 +425,7 @@ impl AggSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(buf.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.agg_state_writers[partition].write(len_bytes).await?; self.agg_state_writers[partition].write(buf).await?; } @@ -436,7 +436,7 @@ impl AggSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(buf.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.input_writers[partition].write(len_bytes).await?; self.input_writers[partition].write(buf).await?; } diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 7090e840d7c35..db00120fcea5b 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -332,7 +332,7 @@ impl JoinSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(len_bytes.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.probe_side_writers[partition].write(len_bytes).await?; self.probe_side_writers[partition].write(buf).await?; } @@ -360,7 +360,7 @@ impl JoinSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(len_bytes.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.build_side_writers[partition].write(len_bytes).await?; self.build_side_writers[partition].write(buf).await?; } @@ -376,7 +376,7 @@ impl JoinSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(len_bytes.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.probe_side_writers[partition].write(len_bytes).await?; self.probe_side_writers[partition].write(buf).await?; } @@ -387,7 +387,7 @@ impl JoinSpillManager { let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes()); self.spill_metrics .batch_spill_write_bytes - .inc_by(len_bytes.len() as u64 + 4); + .inc_by((buf.len() + len_bytes.len()) as u64); self.build_side_writers[partition].write(len_bytes).await?; self.build_side_writers[partition].write(buf).await?; }