diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index e247263964cd..0e3024860337 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1497,7 +1497,7 @@ mod tests {
         let memory_pool = ctx1.runtime_env().memory_pool.clone();
 
         let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
-        reservation.grow(100);
+        reservation.grow("test", 100);
 
         let disk_manager = ctx1.runtime_env().disk_manager.clone();
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index 321a89127be7..38ef49fcc0c4 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -150,6 +150,7 @@ impl MemoryConsumer {
     /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
     /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
     pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
+        let root = self.name().into();
         pool.register(&self);
         MemoryReservation {
             registration: Arc::new(SharedRegistration {
@@ -157,6 +158,7 @@ impl MemoryConsumer {
                 consumer: self,
             }),
             size: 0,
+            root,
         }
     }
 }
@@ -186,6 +188,7 @@ impl Drop for SharedRegistration {
 pub struct MemoryReservation {
     registration: Arc<SharedRegistration>,
     size: usize,
+    root: String,
 }
 
 impl MemoryReservation {
@@ -204,7 +207,7 @@ impl MemoryReservation {
     pub fn free(&mut self) -> usize {
         let size = self.size;
         if size != 0 {
-            self.shrink(size)
+            self.shrink("MemoryReservation::free", size)
         }
         size
     }
@@ -214,43 +217,55 @@ impl MemoryReservation {
     /// # Panics
     ///
     /// Panics if `capacity` exceeds [`Self::size`]
-    pub fn shrink(&mut self, capacity: usize) {
+    pub fn shrink(&mut self, caller: &str, capacity: usize) {
         let new_size = self.size.checked_sub(capacity).unwrap();
         self.registration.pool.shrink(self, capacity);
-        self.size = new_size
+        self.size = new_size;
+        println!(
+            "MemoryReservation::shrink, {}, {}, {:?}, {:?}",
+            self.root, caller, capacity, new_size
+        );
     }
 
     /// Sets the size of this reservation to `capacity`
-    pub fn resize(&mut self, capacity: usize) {
+    pub fn resize(&mut self, caller: &str, capacity: usize) {
         match capacity.cmp(&self.size) {
-            Ordering::Greater => self.grow(capacity - self.size),
-            Ordering::Less => self.shrink(self.size - capacity),
+            Ordering::Greater => self.grow(caller, capacity - self.size),
+            Ordering::Less => self.shrink(caller, self.size - capacity),
             _ => {}
         }
     }
 
     /// Try to set the size of this reservation to `capacity`
-    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
+    pub fn try_resize(&mut self, caller: &str, capacity: usize) -> Result<()> {
         match capacity.cmp(&self.size) {
-            Ordering::Greater => self.try_grow(capacity - self.size)?,
-            Ordering::Less => self.shrink(self.size - capacity),
+            Ordering::Greater => self.try_grow(caller, capacity - self.size)?,
+            Ordering::Less => self.shrink(caller, self.size - capacity),
             _ => {}
         };
         Ok(())
     }
 
     /// Increase the size of this reservation by `capacity` bytes
-    pub fn grow(&mut self, capacity: usize) {
+    pub fn grow(&mut self, caller: &str, capacity: usize) {
         self.registration.pool.grow(self, capacity);
         self.size += capacity;
+        println!(
+            "MemoryReservation::grow, {}, {}, {:?}, {:?}",
+            self.root, caller, capacity, self.size
+        );
     }
 
     /// Try to increase the size of this reservation by `capacity`
     /// bytes, returning error if there is insufficient capacity left
     /// in the pool.
-    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
+    pub fn try_grow(&mut self, caller: &str, capacity: usize) -> Result<()> {
         self.registration.pool.try_grow(self, capacity)?;
         self.size += capacity;
+        println!(
+            "MemoryReservation::try_grow, {}, {}, {:?}, {:?}",
+            self.root, caller, capacity, self.size
+        );
         Ok(())
     }
 
@@ -264,26 +279,28 @@ impl MemoryReservation {
     /// # Panics
     ///
     /// Panics if `capacity` exceeds [`Self::size`]
-    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
+    pub fn split(&mut self, caller: &str, capacity: usize) -> MemoryReservation {
         self.size = self.size.checked_sub(capacity).unwrap();
         Self {
             size: capacity,
             registration: self.registration.clone(),
+            root: caller.into(),
         }
     }
 
     /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
-    pub fn new_empty(&self) -> Self {
+    pub fn new_empty(&self, caller: &str) -> Self {
         Self {
             size: 0,
             registration: self.registration.clone(),
+            root: caller.into(),
         }
     }
 
     /// Splits off all the bytes from this [`MemoryReservation`] into
     /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
-    pub fn take(&mut self) -> MemoryReservation {
-        self.split(self.size)
+    pub fn take(&mut self, caller: &str) -> MemoryReservation {
+        self.split(caller, self.size)
     }
 }
 
@@ -327,26 +344,26 @@ mod tests {
         let mut a1 = MemoryConsumer::new("a1").register(&pool);
         assert_eq!(pool.reserved(), 0);
 
-        a1.grow(100);
+        a1.grow("test", 100);
         assert_eq!(pool.reserved(), 100);
 
         assert_eq!(a1.free(), 100);
         assert_eq!(pool.reserved(), 0);
 
-        a1.try_grow(100).unwrap_err();
+        a1.try_grow("test", 100).unwrap_err();
         assert_eq!(pool.reserved(), 0);
 
-        a1.try_grow(30).unwrap();
+        a1.try_grow("test", 30).unwrap();
         assert_eq!(pool.reserved(), 30);
 
         let mut a2 = MemoryConsumer::new("a2").register(&pool);
-        a2.try_grow(25).unwrap_err();
+        a2.try_grow("test", 25).unwrap_err();
         assert_eq!(pool.reserved(), 30);
 
         drop(a1);
         assert_eq!(pool.reserved(), 0);
 
-        a2.try_grow(25).unwrap();
+        a2.try_grow("test", 25).unwrap();
         assert_eq!(pool.reserved(), 25);
     }
 
@@ -355,12 +372,12 @@ mod tests {
         let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
 
         let mut r1 = MemoryConsumer::new("r1").register(&pool);
-        r1.try_grow(20).unwrap();
+        r1.try_grow("test", 20).unwrap();
         assert_eq!(r1.size(), 20);
         assert_eq!(pool.reserved(), 20);
 
         // take 5 from r1, should still have same reservation split
-        let r2 = r1.split(5);
+        let r2 = r1.split("test", 5);
         assert_eq!(r1.size(), 15);
         assert_eq!(r2.size(), 5);
         assert_eq!(pool.reserved(), 20);
 
@@ -376,9 +393,9 @@ mod tests {
         let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
 
         let mut r1 = MemoryConsumer::new("r1").register(&pool);
-        r1.try_grow(20).unwrap();
-        let mut r2 = r1.new_empty();
-        r2.try_grow(5).unwrap();
+        r1.try_grow("test", 20).unwrap();
+        let mut r2 = r1.new_empty("test");
+        r2.try_grow("test", 5).unwrap();
 
         assert_eq!(r1.size(), 20);
         assert_eq!(r2.size(), 5);
 
@@ -390,16 +407,16 @@ mod tests {
         let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
 
         let mut r1 = MemoryConsumer::new("r1").register(&pool);
-        r1.try_grow(20).unwrap();
-        let mut r2 = r1.take();
-        r2.try_grow(5).unwrap();
+        r1.try_grow("test", 20).unwrap();
+        let mut r2 = r1.take("test");
+        r2.try_grow("test", 5).unwrap();
 
         assert_eq!(r1.size(), 0);
         assert_eq!(r2.size(), 25);
         assert_eq!(pool.reserved(), 25);
 
         // r1 can still grow again
-        r1.try_grow(3).unwrap();
+        r1.try_grow("test", 3).unwrap();
         assert_eq!(r1.size(), 3);
         assert_eq!(r2.size(), 25);
         assert_eq!(pool.reserved(), 28);
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index 4a491630fe20..93efa0882021 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -250,36 +250,36 @@ mod tests {
         let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
         // Can grow beyond capacity of pool
-        r1.grow(2000);
+        r1.grow("test", 2000);
         assert_eq!(pool.reserved(), 2000);
 
         let mut r2 = MemoryConsumer::new("r2")
             .with_can_spill(true)
             .register(&pool);
         // Can grow beyond capacity of pool
-        r2.grow(2000);
+        r2.grow("test", 2000);
 
         assert_eq!(pool.reserved(), 4000);
 
-        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
+        let err = r2.try_grow("test", 1).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");
 
-        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
+        let err = r2.try_grow("test", 1).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");
 
-        r1.shrink(1990);
-        r2.shrink(2000);
+        r1.shrink("test", 1990);
+        r2.shrink("test", 2000);
 
         assert_eq!(pool.reserved(), 10);
 
-        r1.try_grow(10).unwrap();
+        r1.try_grow("test", 10).unwrap();
         assert_eq!(pool.reserved(), 20);
 
         // Can grow r2 to 80 as only spilling consumer
-        r2.try_grow(80).unwrap();
+        r2.try_grow("test", 80).unwrap();
         assert_eq!(pool.reserved(), 100);
 
-        r2.shrink(70);
+        r2.shrink("test", 70);
 
         assert_eq!(r1.size(), 20);
         assert_eq!(r2.size(), 10);
@@ -289,25 +289,25 @@ mod tests {
             .with_can_spill(true)
             .register(&pool);
 
-        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
+        let err = r3.try_grow("test", 70).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");
 
         //Shrinking r2 to zero doesn't allow a3 to allocate more than 45
         r2.free();
-        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
+        let err = r3.try_grow("test", 70).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");
 
         // But dropping r2 does
         drop(r2);
         assert_eq!(pool.reserved(), 20);
 
-        r3.try_grow(80).unwrap();
+        r3.try_grow("test", 80).unwrap();
         assert_eq!(pool.reserved(), 100);
 
         r1.free();
         assert_eq!(pool.reserved(), 80);
 
         let mut r4 = MemoryConsumer::new("s4").register(&pool);
-        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
+        let err = r4.try_grow("test", 30).unwrap_err().strip_backtrace();
         assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20");
     }
 }
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 5ec95bd79942..d37512d6ea17 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -126,9 +126,9 @@ impl AggregateStream {
                         // allocate memory
                         // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
                         // overshooting a bit. Also this means we either store the whole record batch or not.
-                        match result
-                            .and_then(|allocated| this.reservation.try_grow(allocated))
-                        {
+                        match result.and_then(|allocated| {
+                            this.reservation.try_grow("AggregateStream::new", allocated)
+                        }) {
                             Ok(_) => continue,
                             Err(e) => Err(e),
                         }
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index ad0860b93a3a..7789b81bd753 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -615,6 +615,7 @@ impl GroupedHashAggregateStream {
     fn update_memory_reservation(&mut self) -> Result<()> {
         let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
         self.reservation.try_resize(
+            "GroupedHashAggregateStream::update_memory_reservation",
             acc + self.group_values.size()
                 + self.group_ordering.size()
                 + self.current_group_indices.allocated_size(),
@@ -763,7 +764,8 @@ impl GroupedHashAggregateStream {
             self.baseline_metrics.clone(),
             self.batch_size,
             None,
-            self.reservation.new_empty(),
+            self.reservation
+                .new_empty("GroupedHashAggregateStream::update_merged_stream"),
         )?;
         self.input_done = false;
         self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 92443d06856a..d95f8248bd94 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -161,7 +161,8 @@ async fn load_left_input(
             |mut acc, batch| async {
                 let batch_size = batch.get_array_memory_size();
                 // Reserve memory for incoming batch
-                acc.3.try_grow(batch_size)?;
+                acc.3
+                    .try_grow("CrossJoinExec::load_left_input", batch_size)?;
                 // Update metrics
                 acc.2.build_mem_used.add(batch_size);
                 acc.2.build_input_batches.add(1);
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 784584f03f0f..a0041e96c572 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -860,7 +860,8 @@ async fn collect_left_input(
         .try_fold(initial, |mut acc, batch| async {
             let batch_size = batch.get_array_memory_size();
             // Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            acc.3
+                .try_grow("hash_join::collect_left_input", batch_size)?;
             // Update metrics
             acc.2.build_mem_used.add(batch_size);
             acc.2.build_input_batches.add(1);
@@ -879,7 +880,7 @@ async fn collect_left_input(
         let estimated_hashtable_size =
             estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;
 
-        reservation.try_grow(estimated_hashtable_size)?;
+        reservation.try_grow("hash_join::collect_left_input", estimated_hashtable_size)?;
         metrics.build_mem_used.add(estimated_hashtable_size);
 
         let mut hashmap = JoinHashMap::with_capacity(num_rows);
@@ -909,7 +910,7 @@ async fn collect_left_input(
     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
         let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
-        reservation.try_grow(bitmap_size)?;
+        reservation.try_grow("hash_join::collect_left_input", bitmap_size)?;
         metrics.build_mem_used.add(bitmap_size);
 
         let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 18518600ef2f..d32e2559991f 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -383,7 +383,8 @@ async fn collect_left_input(
             |mut acc, batch| async {
                 let batch_size = batch.get_array_memory_size();
                 // Reserve memory for incoming batch
-                acc.3.try_grow(batch_size)?;
+                acc.3
+                    .try_grow("nested_loop_join::collect_left_input", batch_size)?;
                 // Update metrics
                 acc.2.build_mem_used.add(batch_size);
                 acc.2.build_input_batches.add(1);
@@ -404,7 +405,7 @@ async fn collect_left_input(
     // TODO: Replace `ceil` wrapper with stable `div_cell` after
     // https://github.com/rust-lang/rust/issues/88581
     let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8);
-    reservation.try_grow(buffer_size)?;
+    reservation.try_grow("nested_loop_join::collect_left_input", buffer_size)?;
     metrics.build_mem_used.add(buffer_size);
 
     let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows());
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 8da345cdfca6..44a5500dc4e8 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -857,7 +857,10 @@ impl SMJStream {
                 if let Some(buffered_batch) =
                     self.buffered_data.batches.pop_front()
                 {
-                    self.reservation.shrink(buffered_batch.size_estimation);
+                    self.reservation.shrink(
+                        "sort_merge_join::SMJStream::poll_buffered_batches",
+                        buffered_batch.size_estimation,
+                    );
                 }
             } else {
                 break;
@@ -886,7 +889,10 @@ impl SMJStream {
                     if batch.num_rows() > 0 {
                         let buffered_batch =
                             BufferedBatch::new(batch, 0..1, &self.on_buffered);
-                        self.reservation.try_grow(buffered_batch.size_estimation)?;
+                        self.reservation.try_grow(
+                            "sort_merge_join::SMJStream::poll_buffered_batches",
+                            buffered_batch.size_estimation,
+                        )?;
                         self.join_metrics
                             .peak_mem_used
                             .set_max(self.reservation.size());
@@ -933,7 +939,7 @@ impl SMJStream {
                         &self.on_buffered,
                     );
                     self.reservation
-                        .try_grow(buffered_batch.size_estimation)?;
+                        .try_grow("sort_merge_join::SMJStream::poll_buffered_batches", buffered_batch.size_estimation)?;
                     self.join_metrics
                         .peak_mem_used
                         .set_max(self.reservation.size());
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 449c42d69797..c16f4a381a44 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -504,7 +504,9 @@ impl ExecutionPlan for SymmetricHashJoinExec {
                 .register(context.memory_pool()),
         ));
         if let Some(g) = graph.as_ref() {
-            reservation.lock().try_grow(g.size())?;
+            reservation
+                .lock()
+                .try_grow("symmetric_hash_join::execute", g.size())?;
         }
 
         Ok(Box::pin(SymmetricHashJoinStream {
@@ -1529,7 +1531,11 @@ impl SymmetricHashJoinStream {
             let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
             let capacity = self.size();
             self.metrics.stream_memory_usage.set(capacity);
-            self.reservation.lock().try_resize(capacity)?;
+            self.reservation.lock().try_resize(
+                "SymmetricHashJoinStream::perform_join_for_given_side",
+                capacity,
+            )?;
+
             // Update the metrics if we have a batch; otherwise, continue the loop.
             if let Some(batch) = &result {
                 self.metrics.output_batches.add(1);
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 9a0b66caba31..211f66e35d6c 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -278,7 +278,10 @@ impl RecursiveQueryStream {
         mut self: std::pin::Pin<&mut Self>,
         batch: RecordBatch,
     ) -> Poll<Option<Result<RecordBatch>>> {
-        if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
+        if let Err(e) = self
+            .reservation
+            .try_grow("recursive_query::push_batch", batch.get_array_memory_size())
+        {
             return Poll::Ready(Some(Err(e)));
         }
 
@@ -305,7 +308,8 @@ impl RecursiveQueryStream {
         // Update the work table with the current buffer
         let reserved_batches = ReservedBatches::new(
             std::mem::take(&mut self.buffer),
-            self.reservation.take(),
+            self.reservation
+                .take("recursive_query::poll_next_iteration"),
         );
         self.work_table.update(reserved_batches);
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 65f7d5070a5d..9da4e9224ac4 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -792,11 +792,15 @@ impl RepartitionExec {
             let timer = metrics.send_time[partition].timer();
             // if there is still a receiver, send to it
             if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
-                reservation.lock().try_grow(size)?;
+                reservation
+                    .lock()
+                    .try_grow("RepartitionExec::pull_from_input", size)?;
 
                 if tx.send(Some(Ok(batch))).await.is_err() {
                     // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
-                    reservation.lock().shrink(size);
+                    reservation
+                        .lock()
+                        .shrink("RepartitionExec::pull_from_input", size);
                     output_channels.remove(&partition);
                 }
             }
@@ -908,9 +912,10 @@ impl Stream for RepartitionStream {
             match self.input.recv().poll_unpin(cx) {
                 Poll::Ready(Some(Some(v))) => {
                     if let Ok(batch) = &v {
-                        self.reservation
-                            .lock()
-                            .shrink(batch.get_array_memory_size());
+                        self.reservation.lock().shrink(
+                            "repartition::poll_next",
+                            batch.get_array_memory_size(),
+                        );
                     }
 
                     return Poll::Ready(Some(v));
@@ -973,7 +978,7 @@ impl Stream for PerPartitionStream {
                 if let Ok(batch) = &v {
                     self.reservation
                         .lock()
-                        .shrink(batch.get_array_memory_size());
+                        .shrink("repartition::poll_next", batch.get_array_memory_size());
                 }
                 Poll::Ready(Some(v))
             }
diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
index 3527d5738223..b7225b96d3d6 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -68,7 +68,8 @@ impl BatchBuilder {
 
     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        self.reservation
+            .try_grow("sorts::BatchBuilder", batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
@@ -140,7 +141,8 @@ impl BatchBuilder {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                self.reservation
+                    .shrink("sorts::BatchBuilder", batch.get_array_memory_size());
             }
             retain
         });
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 2a4862534590..121233ad60f0 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -291,7 +291,11 @@ impl ExternalSorter {
         self.reserve_memory_for_merge()?;
 
         let size = input.get_array_memory_size();
-        if self.reservation.try_grow(size).is_err() {
+        if self
+            .reservation
+            .try_grow("ExternalSorter::insert_batch", size)
+            .is_err()
+        {
             let before = self.reservation.size();
             self.in_mem_sort().await?;
             // Sorting may have freed memory, especially if fetch is `Some`
@@ -304,10 +308,14 @@ impl ExternalSorter {
             // memory required for `fetch` is just under the memory available,
             // causing repeated re-sorting of data
             if self.reservation.size() > before / 2
-                || self.reservation.try_grow(size).is_err()
+                || self
+                    .reservation
+                    .try_grow("ExternalSorter::insert_batch", size)
+                    .is_err()
             {
                 self.spill().await?;
-                self.reservation.try_grow(size)?
+                self.reservation
+                    .try_grow("ExternalSorter::insert_batch", size)?
             }
         }
 
@@ -356,7 +364,7 @@ impl ExternalSorter {
                 self.metrics.baseline.clone(),
                 self.batch_size,
                 self.fetch,
-                self.reservation.new_empty(),
+                self.reservation.new_empty("ExternalSorter::sort"),
             )
         } else if !self.in_mem_batches.is_empty() {
             self.in_mem_sort_stream(self.metrics.baseline.clone())
@@ -436,7 +444,8 @@ impl ExternalSorter {
 
         // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;
-        self.reservation.try_resize(size)?;
+        self.reservation
+            .try_resize("ExternalSorter::in_mem_sort", size)?;
         self.in_mem_batches_sorted = true;
         Ok(())
     }
@@ -506,7 +515,7 @@ impl ExternalSorter {
         assert_ne!(self.in_mem_batches.len(), 0);
 
         if self.in_mem_batches.len() == 1 {
             let batch = self.in_mem_batches.remove(0);
-            let reservation = self.reservation.take();
+            let reservation = self.reservation.take("ExternalSorter::in_mem_sort_stream");
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
@@ -515,8 +524,11 @@ impl ExternalSorter {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            self.reservation.try_resize(batch.get_array_memory_size())?;
-            let reservation = self.reservation.take();
+            self.reservation.try_resize(
+                "ExternalSorter::in_mem_sort_stream",
+                batch.get_array_memory_size(),
+            )?;
+            let reservation = self.reservation.take("ExternalSorter::in_mem_sort_stream");
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
@@ -524,7 +536,10 @@ impl ExternalSorter {
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                let reservation = self.reservation.split(batch.get_array_memory_size());
+                let reservation = self.reservation.split(
+                    "ExternalSorter::in_mem_sort_stream",
+                    batch.get_array_memory_size(),
+                );
                 let input = self.sort_batch_stream(batch, metrics, reservation)?;
                 Ok(spawn_buffered(input, 1))
             })
@@ -537,7 +552,8 @@ impl ExternalSorter {
             metrics,
             self.batch_size,
             self.fetch,
-            self.merge_reservation.new_empty(),
+            self.merge_reservation
+                .new_empty("ExternalSorter::in_mem_sort_stream"),
         )
     }
 
@@ -574,7 +590,8 @@ impl ExternalSorter {
         if self.runtime.disk_manager.tmp_files_enabled() {
             let size = self.sort_spill_reservation_bytes;
             if self.merge_reservation.size() != size {
-                self.merge_reservation.try_resize(size)?;
+                self.merge_reservation
+                    .try_resize("ExternalSorter::reserve_memory_for_merge", size)?;
             }
         }
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs
index 135b4fbdece4..be8a77207473 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -122,11 +122,13 @@ impl RowCursorStream {
             .collect::<Result<Vec<_>>>()?;
 
         let rows = self.converter.convert_columns(&cols)?;
-        self.reservation.try_resize(self.converter.size())?;
+        self.reservation
+            .try_resize("RowCursorStream::convert_batch", self.converter.size())?;
 
         // track the memory in the newly created Rows.
-        let mut rows_reservation = self.reservation.new_empty();
-        rows_reservation.try_grow(rows.size())?;
+        let mut rows_reservation =
+            self.reservation.new_empty("RowCursorStream::convert_batch");
+        rows_reservation.try_grow("RowCursorStream::convert_batch", rows.size())?;
         Ok(RowValues::new(rows, rows_reservation))
     }
 }
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index 9e6618dd1af5..8a2d1eccbd8e 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -83,7 +83,7 @@ pub fn streaming_merge(
                 schema.as_ref(),
                 expressions,
                 streams,
-                reservation.new_empty(),
+                reservation.new_empty("sorts::streaming_merge"),
             )?;
 
             Ok(Box::pin(SortPreservingMergeStream::new(
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 6a77bfaf3ccd..401740f99922 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -184,7 +184,8 @@ impl TopK {
         self.heap.maybe_compact()?;
 
         // update memory reservation
-        self.reservation.try_resize(self.size())?;
+        self.reservation
+            .try_resize("TopK::insert_batch", self.size())?;
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index 003957947fec..50c32f19458e 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -234,7 +234,7 @@ mod tests {
         // update batch to work_table
         let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
         let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
-        reservation.try_grow(100).unwrap();
+        reservation.try_grow("test", 100).unwrap();
         work_table.update(ReservedBatches::new(vec![batch.clone()], reservation));
         // take from work_table
         let reserved_batches = work_table.take().unwrap();
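
Usage sketch (reviewer note, not part of the patch): the snippet below shows how the instrumented API reads at a call site. `GreedyMemoryPool`, `MemoryConsumer`, and the `grow`/`try_grow`/`shrink` signatures come from the diff above; the crate paths, pool size, consumer name, and the "example::scan" caller tag are illustrative assumptions.

    use std::sync::Arc;
    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn main() -> datafusion_common::Result<()> {
        // Hypothetical 1 MiB pool; any `MemoryPool` implementation works the same way.
        let pool = Arc::new(GreedyMemoryPool::new(1024 * 1024)) as Arc<dyn MemoryPool>;

        // With this patch, `register` captures the consumer name as the
        // reservation's `root`, so every trace line names its originating consumer.
        let mut reservation = MemoryConsumer::new("ExampleOperator").register(&pool);

        // Each call site now passes a `caller` tag; per the println! format added
        // above, this should print something like:
        //   MemoryReservation::try_grow, ExampleOperator, example::scan, 4096, 4096
        reservation.try_grow("example::scan", 4096)?;
        reservation.shrink("example::scan", 1024);

        // `free` reports itself as the caller ("MemoryReservation::free").
        reservation.free();
        Ok(())
    }

The `root` plus `caller` pair is what makes the trace useful: `root` identifies which operator's reservation is changing, while `caller` pinpoints the code path that changed it.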