diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index ad1f51cf1dc3..e152956714f8 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -28,9 +28,6 @@ use crate::memtable::BoxedBatchIterator; use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; -/// Minimum batch size to output. -const MIN_BATCH_SIZE: usize = 64; - /// Reader to merge sorted batches. /// /// The merge reader merges [Batch]es from multiple sources that yield sorted batches. @@ -49,11 +46,8 @@ pub struct MergeReader { /// /// `Node` in this heap **must** not be empty. cold: BinaryHeap, - /// Batches to output. - batch_merger: BatchMerger, - /// Suggested size of each batch. The batch returned by the reader can have more rows than the - /// batch size. - batch_size: usize, + /// Batch to output. + output_batch: Option, /// Local metrics. metrics: Metrics, } @@ -62,15 +56,7 @@ pub struct MergeReader { impl BatchReader for MergeReader { async fn next_batch(&mut self) -> Result> { let start = Instant::now(); - while !self.hot.is_empty() && self.batch_merger.num_rows() < self.batch_size { - if let Some(current_key) = self.batch_merger.primary_key() { - // If the hottest node has a different key, we have finish collecting current key. - // Safety: hot is not empty. - if self.hot.peek().unwrap().primary_key() != current_key { - break; - } - } - + while !self.hot.is_empty() && self.output_batch.is_none() { if self.hot.len() == 1 { // No need to do merge sort if only one batch in the hot heap. self.fetch_batch_from_hottest().await?; @@ -82,17 +68,14 @@ impl BatchReader for MergeReader { } } - if self.batch_merger.is_empty() { - // Nothing fetched. + if let Some(batch) = self.output_batch.take() { self.metrics.scan_cost += start.elapsed(); - // Update deleted rows num. - self.metrics.num_deleted_rows = self.batch_merger.num_deleted_rows(); - Ok(None) + self.metrics.num_output_rows += batch.num_rows(); + Ok(Some(batch)) } else { - let batch = self.batch_merger.merge_batches()?; + // Nothing fetched. self.metrics.scan_cost += start.elapsed(); - self.metrics.num_output_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); - Ok(batch) + Ok(None) } } } @@ -115,7 +98,7 @@ impl Drop for MergeReader { impl MergeReader { /// Creates and initializes a new [MergeReader]. - pub async fn new(sources: Vec, batch_size: usize) -> Result { + pub async fn new(sources: Vec) -> Result { let start = Instant::now(); let mut metrics = Metrics::default(); @@ -132,8 +115,7 @@ impl MergeReader { let mut reader = MergeReader { hot, cold, - batch_merger: BatchMerger::new(), - batch_size, + output_batch: None, metrics, }; // Initializes the reader. @@ -168,7 +150,7 @@ impl MergeReader { let mut hottest = self.hot.pop().unwrap(); let batch = hottest.fetch_batch(&mut self.metrics).await?; - self.batch_merger.push(batch)?; + Self::maybe_output_batch(batch, &mut self.output_batch, &mut self.metrics)?; self.reheap(hottest) } @@ -199,7 +181,11 @@ impl MergeReader { // They have duplicate timestamps. Outputs timestamps before the duplicated timestamp. // Batch itself doesn't contain duplicate timestamps so timestamps before `pos` // must be less than `next_min_ts`. - self.batch_merger.push(top.slice(0, pos))?; + Self::maybe_output_batch( + top.slice(0, pos), + &mut self.output_batch, + &mut self.metrics, + )?; // This keep the duplicate timestamp in the node. top_node.skip_rows(pos, &mut self.metrics).await?; // The merge window should contain this timestamp so only nodes in the hot heap @@ -209,7 +195,11 @@ impl MergeReader { } Err(pos) => { // No duplicate timestamp. Outputs timestamp before `pos`. - self.batch_merger.push(top.slice(0, pos))?; + Self::maybe_output_batch( + top.slice(0, pos), + &mut self.output_batch, + &mut self.metrics, + )?; top_node.skip_rows(pos, &mut self.metrics).await?; self.reheap(top_node)?; } @@ -300,16 +290,37 @@ impl MergeReader { Ok(()) } + + /// Removeds deleted entries and sets the `batch` to the `output_batch`. + /// + /// Ignores the `batch` if it is empty. + fn maybe_output_batch( + mut batch: Batch, + output_batch: &mut Option, + metrics: &mut Metrics, + ) -> Result<()> { + debug_assert!(output_batch.is_none()); + + let num_rows = batch.num_rows(); + batch.filter_deleted()?; + // Update deleted rows metrics. + metrics.num_deleted_rows += num_rows - batch.num_rows(); + if batch.is_empty() { + return Ok(()); + } + *output_batch = Some(batch); + + Ok(()) + } } /// Builder to build and initialize a [MergeReader]. +#[derive(Default)] pub struct MergeReaderBuilder { /// Input sources. /// /// All source must yield batches with the same schema. sources: Vec, - /// Batch size of the reader. - batch_size: usize, } impl MergeReaderBuilder { @@ -330,25 +341,10 @@ impl MergeReaderBuilder { self } - /// Sets the batch size of the reader. - pub fn batch_size(&mut self, size: usize) -> &mut Self { - self.batch_size = if size == 0 { MIN_BATCH_SIZE } else { size }; - self - } - /// Builds and initializes the reader, then resets the builder. pub async fn build(&mut self) -> Result { let sources = mem::take(&mut self.sources); - MergeReader::new(sources, self.batch_size).await - } -} - -impl Default for MergeReaderBuilder { - fn default() -> Self { - MergeReaderBuilder { - sources: Vec::new(), - batch_size: MIN_BATCH_SIZE, - } + MergeReader::new(sources).await } } @@ -371,89 +367,6 @@ struct Metrics { num_deleted_rows: usize, } -/// Helper to collect and merge small batches for same primary key. -struct BatchMerger { - /// Buffered non-empty batches to merge. - batches: Vec, - /// Number of rows in the batch. - num_rows: usize, - /// Number of rows deleted. - num_deleted_rows: usize, -} - -impl BatchMerger { - /// Returns a empty merger. - fn new() -> BatchMerger { - BatchMerger { - batches: Vec::new(), - num_rows: 0, - num_deleted_rows: 0, - } - } - - /// Returns the number of rows. - fn num_rows(&self) -> usize { - self.num_rows - } - - /// Returns the number of rows deleted. - fn num_deleted_rows(&self) -> usize { - self.num_deleted_rows - } - - /// Returns true if the merger is empty. - fn is_empty(&self) -> bool { - self.num_rows() == 0 - } - - /// Returns the primary key of current merger and `None` if the merger is empty. - fn primary_key(&self) -> Option<&[u8]> { - self.batches.first().map(|batch| batch.primary_key()) - } - - /// Removeds deleted entries and pushes a `batch` into the merger. - /// - /// Ignores the `batch` if it is empty. - /// - /// # Panics - /// Panics if the `batch` has another primary key. - fn push(&mut self, mut batch: Batch) -> Result<()> { - debug_assert!(self - .batches - .last() - .map(|b| b.primary_key() == batch.primary_key()) - .unwrap_or(true)); - - let num_rows = batch.num_rows(); - batch.filter_deleted()?; - self.num_deleted_rows += num_rows - batch.num_rows(); - if batch.is_empty() { - return Ok(()); - } - - self.num_rows += batch.num_rows(); - self.batches.push(batch); - - Ok(()) - } - - /// Merge all buffered batches and returns the merged batch. Then - /// reset the buffer. - fn merge_batches(&mut self) -> Result> { - if self.batches.is_empty() { - return Ok(None); - } - - // Reset number of rows. - self.num_rows = 0; - if self.batches.len() == 1 { - return Ok(self.batches.pop()); - } - let batches = mem::take(&mut self.batches); - Batch::concat(batches).map(Some) - } -} - /// A `Node` represent an individual input data source to be merged. struct Node { /// Data source of this `Node`. @@ -669,17 +582,19 @@ mod tests { &[ new_batch( b"k1", - &[1, 2, 4, 5, 7], - &[11, 12, 14, 15, 17], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - ], - &[21, 22, 24, 25, 27], + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], ), + new_batch( + b"k1", + &[4, 5], + &[14, 15], + &[OpType::Put, OpType::Put], + &[24, 25], + ), + new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]), new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), ], ) @@ -718,13 +633,10 @@ mod tests { check_reader_result( &mut reader, &[ - new_batch( - b"k1", - &[1, 2, 3, 4], - &[10, 11, 10, 11], - &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], - &[21, 32, 23, 34], - ), + new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), + new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]), + new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]), + new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]), new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]), ], ) @@ -785,18 +697,16 @@ mod tests { &[ new_batch( b"k1", - &[1, 2, 3, 4], - &[11, 12, 10, 14], - &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], - &[21, 22, 33, 24], - ), - new_batch( - b"k2", - &[1, 3, 10], - &[11, 13, 20], - &[OpType::Put, OpType::Put, OpType::Put], - &[21, 23, 30], + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], ), + new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]), + new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]), + new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]), + new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), + new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]), ], ) .await; @@ -900,13 +810,16 @@ mod tests { .unwrap(); check_reader_result( &mut reader, - &[new_batch( - b"k1", - &[1, 2, 3], - &[10, 11, 11], - &[OpType::Put, OpType::Put, OpType::Put], - &[21, 32, 33], - )], + &[ + new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), + new_batch( + b"k1", + &[2, 3], + &[11, 11], + &[OpType::Put, OpType::Put], + &[32, 33], + ), + ], ) .await; } @@ -945,19 +858,18 @@ mod tests { .unwrap(); check_reader_result( &mut reader, - &[new_batch( - b"k1", - &[1, 6, 8, 10, 20], - &[11, 11, 11, 10, 11], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - ], - &[31, 36, 38, 30, 40], - )], + &[ + new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]), + new_batch( + b"k1", + &[6, 8], + &[11, 11], + &[OpType::Put, OpType::Put], + &[36, 38], + ), + new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]), + new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]), + ], ) .await; } @@ -965,7 +877,6 @@ mod tests { #[tokio::test] async fn test_merge_many_duplicates() { let mut builder = MergeReaderBuilder::new(); - builder.batch_size(3); for i in 0..10 { let batches: Vec<_> = (0..8) .map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100])) @@ -974,184 +885,9 @@ mod tests { builder.push_batch_reader(Box::new(reader)); } let mut reader = builder.build().await.unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[0, 1, 2], - &[9, 9, 9], - &[OpType::Put, OpType::Put, OpType::Put], - &[100, 100, 100], - ), - new_batch( - b"k1", - &[3, 4, 5], - &[9, 9, 9], - &[OpType::Put, OpType::Put, OpType::Put], - &[100, 100, 100], - ), - new_batch( - b"k1", - &[6, 7], - &[9, 9], - &[OpType::Put, OpType::Put], - &[100, 100], - ), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_more_than_batch_size() { - let batches: Vec<_> = (0..MIN_BATCH_SIZE as i64 * 2) - .map(|ts| new_batch(b"k1", &[ts], &[10], &[OpType::Put], &[100])) + let expect: Vec<_> = (0..8) + .map(|ts| new_batch(b"k1", &[ts], &[9], &[OpType::Put], &[100])) .collect(); - let reader = VecBatchReader::new(&batches); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader)) - // Still use the default batch size. - .batch_size(0) - .build() - .await - .unwrap(); - let ts1: Vec<_> = (0..MIN_BATCH_SIZE as i64).collect(); - let ts2: Vec<_> = (MIN_BATCH_SIZE as i64..MIN_BATCH_SIZE as i64 * 2).collect(); - let seqs = vec![10; MIN_BATCH_SIZE]; - let op_types = vec![OpType::Put; MIN_BATCH_SIZE]; - let fields = vec![100; MIN_BATCH_SIZE]; - check_reader_result( - &mut reader, - &[ - new_batch(b"k1", &ts1, &seqs, &op_types, &fields), - new_batch(b"k1", &ts2, &seqs, &op_types, &fields), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_more_than_batch_size_overlapping() { - let reader1 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 2, 3, 4, 5, 6, 7, 8, 9], - &[11, 10, 11, 10, 11, 10, 11, 10, 11], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - ], - &[21, 22, 23, 24, 25, 26, 27, 28, 29], - )]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 2, 3, 4, 5, 6, 7, 8, 9], - &[10, 11, 10, 11, 10, 11, 10, 11, 10], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - ], - &[31, 32, 33, 34, 35, 36, 37, 38, 39], - )]); - let mut reader = MergeReaderBuilder::new() - .push_batch_iter(Box::new(reader1)) - .push_batch_reader(Box::new(reader2)) - .batch_size(3) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[1, 2, 3], - &[11, 11, 11], - &[OpType::Put, OpType::Put, OpType::Put], - &[21, 32, 23], - ), - new_batch( - b"k1", - &[4, 5, 6], - &[11, 11, 11], - &[OpType::Put, OpType::Put, OpType::Put], - &[34, 25, 36], - ), - new_batch( - b"k1", - &[7, 8, 9], - &[11, 11, 11], - &[OpType::Put, OpType::Put, OpType::Put], - &[27, 38, 29], - ), - ], - ) - .await; - } - - #[test] - fn test_batch_merger_empty() { - let mut merger = BatchMerger::new(); - assert!(merger.is_empty()); - assert!(merger.merge_batches().unwrap().is_none()); - assert!(merger.primary_key().is_none()); - } - - #[test] - fn test_merge_one_batch() { - let mut merger = BatchMerger::new(); - let expect = new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]); - merger.push(expect.clone()).unwrap(); - let batch = merger.merge_batches().unwrap().unwrap(); - assert_eq!(1, batch.num_rows()); - assert_eq!(expect, batch,); - assert!(merger.is_empty()); - } - - #[test] - fn test_merge_batches() { - let mut merger = BatchMerger::new(); - merger - .push(new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21])) - .unwrap(); - assert_eq!(1, merger.num_rows()); - assert!(!merger.is_empty()); - merger - .push(new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22])) - .unwrap(); - assert_eq!(2, merger.num_rows()); - merger - .push(new_batch(b"k1", &[3], &[10], &[OpType::Delete], &[23])) - .unwrap(); - assert_eq!(2, merger.num_rows()); - - let batch = merger.merge_batches().unwrap().unwrap(); - assert_eq!(2, batch.num_rows()); - assert_eq!( - batch, - new_batch( - b"k1", - &[1, 2], - &[10, 10], - &[OpType::Put, OpType::Put,], - &[21, 22] - ) - ); - assert!(merger.is_empty()); - assert_eq!(1, merger.num_deleted_rows()); + check_reader_result(&mut reader, &expect).await; } }