Skip to content

Commit

Permalink
test(iox-11168): log MemoryReservation resizing
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Jun 24, 2024
1 parent e495f1f commit 90b9d2d
Show file tree
Hide file tree
Showing 18 changed files with 150 additions and 85 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ mod tests {
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
reservation.grow(100);
reservation.grow("test", 100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();

Expand Down
75 changes: 46 additions & 29 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ impl MemoryConsumer {
/// Registers this [`MemoryConsumer`] with `pool` and hands back a
/// [`MemoryReservation`] through which the reserved memory can be grown or shrunk.
///
/// The new reservation starts with a size of zero, and its `root` label is
/// taken from the consumer's name.
pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
    // Read the name first: `self` is moved into the registration below.
    let root = self.name().into();
    pool.register(&self);
    let registration = Arc::new(SharedRegistration {
        pool: Arc::clone(pool),
        consumer: self,
    });
    MemoryReservation {
        registration,
        size: 0,
        root,
    }
}
}
Expand Down Expand Up @@ -186,6 +188,7 @@ impl Drop for SharedRegistration {
pub struct MemoryReservation {
/// Registration (pool + consumer) shared with every reservation created
/// from this one via `split` / `new_empty`, which clone this `Arc`
registration: Arc<SharedRegistration>,
/// Number of bytes currently held by this reservation
size: usize,
/// Label printed in the grow/shrink log lines: the consumer name when the
/// reservation comes from `MemoryConsumer::register`, or the `caller` string
/// when it comes from `split` / `new_empty`
root: String,
}

impl MemoryReservation {
Expand All @@ -204,7 +207,7 @@ impl MemoryReservation {
pub fn free(&mut self) -> usize {
let size = self.size;
if size != 0 {
self.shrink(size)
self.shrink("MemoryReservation::free", size)
}
size
}
Expand All @@ -214,43 +217,55 @@ impl MemoryReservation {
/// # Panics
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn shrink(&mut self, capacity: usize) {
pub fn shrink(&mut self, caller: &str, capacity: usize) {
let new_size = self.size.checked_sub(capacity).unwrap();
self.registration.pool.shrink(self, capacity);
self.size = new_size
self.size = new_size;
println!(
"MemoryReservation::shrink, {}, {}, {:?}, {:?}",
self.root, caller, capacity, new_size
);
}

/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
pub fn resize(&mut self, caller: &str, capacity: usize) {
match capacity.cmp(&self.size) {
Ordering::Greater => self.grow(capacity - self.size),
Ordering::Less => self.shrink(self.size - capacity),
Ordering::Greater => self.grow(caller, capacity - self.size),
Ordering::Less => self.shrink(caller, self.size - capacity),
_ => {}
}
}

/// Try to set the size of this reservation to `capacity`
pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
pub fn try_resize(&mut self, caller: &str, capacity: usize) -> Result<()> {
match capacity.cmp(&self.size) {
Ordering::Greater => self.try_grow(capacity - self.size)?,
Ordering::Less => self.shrink(self.size - capacity),
Ordering::Greater => self.try_grow(caller, capacity - self.size)?,
Ordering::Less => self.shrink(caller, self.size - capacity),
_ => {}
};
Ok(())
}

/// Increase the size of this reservation by `capacity` bytes
pub fn grow(&mut self, capacity: usize) {
pub fn grow(&mut self, caller: &str, capacity: usize) {
self.registration.pool.grow(self, capacity);
self.size += capacity;
println!(
"MemoryReservation::grow, {}, {}, {:?}, {:?}",
self.root, caller, capacity, self.size
);
}

/// Try to increase the size of this reservation by `capacity`
/// bytes, returning error if there is insufficient capacity left
/// in the pool.
pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
pub fn try_grow(&mut self, caller: &str, capacity: usize) -> Result<()> {
self.registration.pool.try_grow(self, capacity)?;
self.size += capacity;
println!(
"MemoryReservation::try_grow, {}, {}, {:?}, {:?}",
self.root, caller, capacity, self.size
);
Ok(())
}

Expand All @@ -264,26 +279,28 @@ impl MemoryReservation {
/// # Panics
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn split(&mut self, capacity: usize) -> MemoryReservation {
pub fn split(&mut self, caller: &str, capacity: usize) -> MemoryReservation {
self.size = self.size.checked_sub(capacity).unwrap();
Self {
size: capacity,
registration: self.registration.clone(),
root: caller.into(),
}
}

/// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn new_empty(&self) -> Self {
pub fn new_empty(&self, caller: &str) -> Self {
Self {
size: 0,
registration: self.registration.clone(),
root: caller.into(),
}
}

/// Splits off all the bytes from this [`MemoryReservation`] into
/// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn take(&mut self) -> MemoryReservation {
self.split(self.size)
pub fn take(&mut self, caller: &str) -> MemoryReservation {
self.split(caller, self.size)
}
}

Expand Down Expand Up @@ -327,26 +344,26 @@ mod tests {
let mut a1 = MemoryConsumer::new("a1").register(&pool);
assert_eq!(pool.reserved(), 0);

a1.grow(100);
a1.grow("test", 100);
assert_eq!(pool.reserved(), 100);

assert_eq!(a1.free(), 100);
assert_eq!(pool.reserved(), 0);

a1.try_grow(100).unwrap_err();
a1.try_grow("test", 100).unwrap_err();
assert_eq!(pool.reserved(), 0);

a1.try_grow(30).unwrap();
a1.try_grow("test", 30).unwrap();
assert_eq!(pool.reserved(), 30);

let mut a2 = MemoryConsumer::new("a2").register(&pool);
a2.try_grow(25).unwrap_err();
a2.try_grow("test", 25).unwrap_err();
assert_eq!(pool.reserved(), 30);

drop(a1);
assert_eq!(pool.reserved(), 0);

a2.try_grow(25).unwrap();
a2.try_grow("test", 25).unwrap();
assert_eq!(pool.reserved(), 25);
}

Expand All @@ -355,12 +372,12 @@ mod tests {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
r1.try_grow("test", 20).unwrap();
assert_eq!(r1.size(), 20);
assert_eq!(pool.reserved(), 20);

// take 5 from r1, should still have same reservation split
let r2 = r1.split(5);
let r2 = r1.split("test", 5);
assert_eq!(r1.size(), 15);
assert_eq!(r2.size(), 5);
assert_eq!(pool.reserved(), 20);
Expand All @@ -376,9 +393,9 @@ mod tests {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
let mut r2 = r1.new_empty();
r2.try_grow(5).unwrap();
r1.try_grow("test", 20).unwrap();
let mut r2 = r1.new_empty("test");
r2.try_grow("test", 5).unwrap();

assert_eq!(r1.size(), 20);
assert_eq!(r2.size(), 5);
Expand All @@ -390,16 +407,16 @@ mod tests {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
let mut r2 = r1.take();
r2.try_grow(5).unwrap();
r1.try_grow("test", 20).unwrap();
let mut r2 = r1.take("test");
r2.try_grow("test", 5).unwrap();

assert_eq!(r1.size(), 0);
assert_eq!(r2.size(), 25);
assert_eq!(pool.reserved(), 25);

// r1 can still grow again
r1.try_grow(3).unwrap();
r1.try_grow("test", 3).unwrap();
assert_eq!(r1.size(), 3);
assert_eq!(r2.size(), 25);
assert_eq!(pool.reserved(), 28);
Expand Down
26 changes: 13 additions & 13 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,36 +250,36 @@ mod tests {

let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
// Can grow beyond capacity of pool
r1.grow(2000);
r1.grow("test", 2000);
assert_eq!(pool.reserved(), 2000);

let mut r2 = MemoryConsumer::new("r2")
.with_can_spill(true)
.register(&pool);
// Can grow beyond capacity of pool
r2.grow(2000);
r2.grow("test", 2000);

assert_eq!(pool.reserved(), 4000);

let err = r2.try_grow(1).unwrap_err().strip_backtrace();
let err = r2.try_grow("test", 1).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");

let err = r2.try_grow(1).unwrap_err().strip_backtrace();
let err = r2.try_grow("test", 1).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");

r1.shrink(1990);
r2.shrink(2000);
r1.shrink("test", 1990);
r2.shrink("test", 2000);

assert_eq!(pool.reserved(), 10);

r1.try_grow(10).unwrap();
r1.try_grow("test", 10).unwrap();
assert_eq!(pool.reserved(), 20);

// Can grow r2 to 80 as only spilling consumer
r2.try_grow(80).unwrap();
r2.try_grow("test", 80).unwrap();
assert_eq!(pool.reserved(), 100);

r2.shrink(70);
r2.shrink("test", 70);

assert_eq!(r1.size(), 20);
assert_eq!(r2.size(), 10);
Expand All @@ -289,25 +289,25 @@ mod tests {
.with_can_spill(true)
.register(&pool);

let err = r3.try_grow(70).unwrap_err().strip_backtrace();
let err = r3.try_grow("test", 70).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");

// Shrinking r2 to zero doesn't allow r3 to allocate more than 40
r2.free();
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
let err = r3.try_grow("test", 70).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");

// But dropping r2 does
drop(r2);
assert_eq!(pool.reserved(), 20);
r3.try_grow(80).unwrap();
r3.try_grow("test", 80).unwrap();

assert_eq!(pool.reserved(), 100);
r1.free();
assert_eq!(pool.reserved(), 80);

let mut r4 = MemoryConsumer::new("s4").register(&pool);
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
let err = r4.try_grow("test", 30).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20");
}
}
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ impl AggregateStream {
// allocate memory
// This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
// overshooting a bit. Also this means we either store the whole record batch or not.
match result
.and_then(|allocated| this.reservation.try_grow(allocated))
{
match result.and_then(|allocated| {
this.reservation.try_grow("AggregateStream::new", allocated)
}) {
Ok(_) => continue,
Err(e) => Err(e),
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ impl GroupedHashAggregateStream {
fn update_memory_reservation(&mut self) -> Result<()> {
let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
self.reservation.try_resize(
"GroupedHashAggregateStream::update_memory_reservation",
acc + self.group_values.size()
+ self.group_ordering.size()
+ self.current_group_indices.allocated_size(),
Expand Down Expand Up @@ -763,7 +764,8 @@ impl GroupedHashAggregateStream {
self.baseline_metrics.clone(),
self.batch_size,
None,
self.reservation.new_empty(),
self.reservation
.new_empty("GroupedHashAggregateStream::update_merged_stream"),
)?;
self.input_done = false;
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ async fn load_left_input(
|mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.try_grow(batch_size)?;
acc.3
.try_grow("CrossJoinExec::load_left_input", batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,8 @@ async fn collect_left_input(
.try_fold(initial, |mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.try_grow(batch_size)?;
acc.3
.try_grow("hash_join::collect_left_input", batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand All @@ -879,7 +880,7 @@ async fn collect_left_input(
let estimated_hashtable_size =
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;

reservation.try_grow(estimated_hashtable_size)?;
reservation.try_grow("hash_join::collect_left_input", estimated_hashtable_size)?;
metrics.build_mem_used.add(estimated_hashtable_size);

let mut hashmap = JoinHashMap::with_capacity(num_rows);
Expand Down Expand Up @@ -909,7 +910,7 @@ async fn collect_left_input(
// Reserve additional memory for visited indices bitmap and create shared builder
let visited_indices_bitmap = if with_visited_indices_bitmap {
let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
reservation.try_grow(bitmap_size)?;
reservation.try_grow("hash_join::collect_left_input", bitmap_size)?;
metrics.build_mem_used.add(bitmap_size);

let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ async fn collect_left_input(
|mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.try_grow(batch_size)?;
acc.3
.try_grow("nested_loop_join::collect_left_input", batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand All @@ -404,7 +405,7 @@ async fn collect_left_input(
// TODO: Replace `ceil` wrapper with stable `div_ceil` after
// https://github.com/rust-lang/rust/issues/88581
let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8);
reservation.try_grow(buffer_size)?;
reservation.try_grow("nested_loop_join::collect_left_input", buffer_size)?;
metrics.build_mem_used.add(buffer_size);

let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows());
Expand Down
Loading

0 comments on commit 90b9d2d

Please sign in to comment.