Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jun 4, 2024
1 parent c3ab0e2 commit efb0947
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
8 changes: 4 additions & 4 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl AggSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.agg_state_writers[partition].write(len_bytes).await?;
self.agg_state_writers[partition].write(buf).await?;
}
Expand All @@ -409,7 +409,7 @@ impl AggSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.input_writers[partition].write(len_bytes).await?;
self.input_writers[partition].write(buf).await?;
}
Expand All @@ -425,7 +425,7 @@ impl AggSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.agg_state_writers[partition].write(len_bytes).await?;
self.agg_state_writers[partition].write(buf).await?;
}
Expand All @@ -436,7 +436,7 @@ impl AggSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.input_writers[partition].write(len_bytes).await?;
self.input_writers[partition].write(buf).await?;
}
Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl JoinSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(len_bytes.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.probe_side_writers[partition].write(len_bytes).await?;
self.probe_side_writers[partition].write(buf).await?;
}
Expand Down Expand Up @@ -360,7 +360,7 @@ impl JoinSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(len_bytes.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.build_side_writers[partition].write(len_bytes).await?;
self.build_side_writers[partition].write(buf).await?;
}
Expand All @@ -376,7 +376,7 @@ impl JoinSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(len_bytes.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.probe_side_writers[partition].write(len_bytes).await?;
self.probe_side_writers[partition].write(buf).await?;
}
Expand All @@ -387,7 +387,7 @@ impl JoinSpillManager {
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(len_bytes.len() as u64 + 4);
.inc_by((buf.len() + len_bytes.len()) as u64);
self.build_side_writers[partition].write(len_bytes).await?;
self.build_side_writers[partition].write(buf).await?;
}
Expand Down

0 comments on commit efb0947

Please sign in to comment.