diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index f7d05c621f62..6adc6eb96aec 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -110,6 +110,15 @@ impl MemtableStats { pub type BoxedBatchIterator = Box> + Send>; +/// Ranges in a memtable. +#[derive(Default)] +pub struct MemtableRanges { + /// Range IDs and ranges. + pub ranges: BTreeMap, + /// Statistics of the memtable at the query time. + pub stats: MemtableStats, +} + /// In memory write buffer. pub trait Memtable: Send + Sync + fmt::Debug { /// Returns the id of this memtable. @@ -139,7 +148,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { &self, projection: Option<&[ColumnId]>, predicate: Option, - ) -> BTreeMap; + ) -> MemtableRanges; /// Returns true if the memtable is empty. fn is_empty(&self) -> bool; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 46e757f3df16..96e6c70acdf9 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -14,7 +14,6 @@ //! Memtable implementation for bulk load -use std::collections::BTreeMap; use std::sync::{Arc, RwLock}; use store_api::metadata::RegionMetadataRef; @@ -25,7 +24,7 @@ use crate::error::Result; use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; use crate::memtable::{ - BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRange, MemtableRef, MemtableStats, + BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats, }; #[allow(unused)] @@ -68,7 +67,7 @@ impl Memtable for BulkMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, - ) -> BTreeMap { + ) -> MemtableRanges { todo!() } diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 4c4b471643bd..1376f923316c 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -23,7 +23,6 @@ mod shard; mod shard_builder; mod tree; -use std::collections::BTreeMap; use std::fmt; use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; use std::sync::Arc; @@ -41,7 +40,7 @@ use crate::memtable::partition_tree::tree::PartitionTree; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, - MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats, + MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, }; use crate::region::options::MergeMode; @@ -176,7 +175,7 @@ impl Memtable for PartitionTreeMemtable { &self, projection: Option<&[ColumnId]>, predicate: Option, - ) -> BTreeMap { + ) -> MemtableRanges { let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { tree: self.tree.clone(), @@ -185,7 +184,10 @@ impl Memtable for PartitionTreeMemtable { }); let context = Arc::new(MemtableRangeContext::new(self.id, builder)); - [(0, MemtableRange::new(context))].into() + MemtableRanges { + ranges: [(0, MemtableRange::new(context))].into(), + stats: self.stats(), + } } fn is_empty(&self) -> bool { diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 4959c468b6db..8ef6f4412120 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -45,7 +45,7 @@ use crate::memtable::key_values::KeyValue; use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, - MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats, + MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, }; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::dedup::LastNonNullIter; @@ -250,7 +250,7 @@ impl Memtable for TimeSeriesMemtable { &self, projection: Option<&[ColumnId]>, predicate: Option, - ) -> BTreeMap { + ) -> MemtableRanges { let projection = if let Some(projection) = projection { projection.iter().copied().collect() } else { @@ -268,7 +268,10 @@ impl Memtable for TimeSeriesMemtable { }); let context = Arc::new(MemtableRangeContext::new(self.id, builder)); - [(0, MemtableRange::new(context))].into() + MemtableRanges { + ranges: [(0, MemtableRange::new(context))].into(), + stats: self.stats(), + } } fn is_empty(&self) -> bool { diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index bdad5f8fef0c..1b29e196a2fe 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -24,7 +24,7 @@ use store_api::region_engine::PartitionRange; use crate::cache::CacheManager; use crate::error::Result; -use crate::memtable::{MemtableRange, MemtableRef}; +use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats}; use crate::read::scan_region::ScanInput; use crate::sst::file::{overlaps, FileHandle, FileTimeRange}; use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef}; @@ -175,7 +175,7 @@ impl RangeMeta { } } - fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec) { + fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec) { // For append mode, we can parallelize reading memtables. for (memtable_index, memtable) in memtables.iter().enumerate() { let stats = memtable.stats(); @@ -270,7 +270,7 @@ impl RangeMeta { } } - fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec) { + fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec) { // For non append-only mode, each range only contains one memtable by default. for (i, memtable) in memtables.iter().enumerate() { let stats = memtable.stats(); @@ -421,29 +421,38 @@ impl FileRangeBuilder { /// Builder to create mem ranges. pub(crate) struct MemRangeBuilder { /// Ranges of a memtable. - row_groups: BTreeMap, + ranges: MemtableRanges, } impl MemRangeBuilder { /// Builds a mem range builder from row groups. - pub(crate) fn new(row_groups: BTreeMap) -> Self { - Self { row_groups } + pub(crate) fn new(ranges: MemtableRanges) -> Self { + Self { ranges } } /// Builds mem ranges to read in the memtable. /// Negative `row_group_index` indicates all row groups. - fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) { + pub(crate) fn build_ranges( + &self, + row_group_index: i64, + ranges: &mut SmallVec<[MemtableRange; 2]>, + ) { if row_group_index >= 0 { let row_group_index = row_group_index as usize; // Scans one row group. - let Some(range) = self.row_groups.get(&row_group_index) else { + let Some(range) = self.ranges.ranges.get(&row_group_index) else { return; }; ranges.push(range.clone()); } else { - ranges.extend(self.row_groups.values().cloned()); + ranges.extend(self.ranges.ranges.values().cloned()); } } + + /// Returns the statistics of the memtable. + pub(crate) fn stats(&self) -> &MemtableStats { + &self.ranges.stats + } } /// List to manages the builders to create file ranges. @@ -451,18 +460,15 @@ impl MemRangeBuilder { /// the list to different streams in the same partition. pub(crate) struct RangeBuilderList { num_memtables: usize, - mem_builders: Mutex>>, file_builders: Mutex>>>, } impl RangeBuilderList { /// Creates a new [ReaderBuilderList] with the given number of memtables and files. pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self { - let mem_builders = (0..num_memtables).map(|_| None).collect(); let file_builders = (0..num_files).map(|_| None).collect(); Self { num_memtables, - mem_builders: Mutex::new(mem_builders), file_builders: Mutex::new(file_builders), } } @@ -488,26 +494,6 @@ impl RangeBuilderList { Ok(ranges) } - /// Builds mem ranges to read the row group at `index`. - pub(crate) fn build_mem_ranges( - &self, - input: &ScanInput, - index: RowGroupIndex, - ) -> SmallVec<[MemtableRange; 2]> { - let mut ranges = SmallVec::new(); - let mut mem_builders = self.mem_builders.lock().unwrap(); - match &mut mem_builders[index.index] { - Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges), - None => { - let builder = input.prune_memtable(index.index); - builder.build_ranges(index.row_group_index, &mut ranges); - mem_builders[index.index] = Some(builder); - } - } - - ranges - } - fn get_file_builder(&self, index: usize) -> Option> { let file_builders = self.file_builders.lock().unwrap(); file_builders[index].clone() diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 32b8c90cda02..946ef2884132 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -24,6 +24,7 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; use datafusion_expr::utils::expr_to_columns; +use smallvec::SmallVec; use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ScanRequest, TimeSeriesRowSelector}; use table::predicate::{build_time_range_predicate, Predicate}; @@ -35,7 +36,7 @@ use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheManagerRef; use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; -use crate::memtable::MemtableRef; +use crate::memtable::MemtableRange; use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, CompatBatch}; use crate::read::projection::ProjectionMapper; @@ -328,6 +329,14 @@ impl ScanRegion { Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?, None => ProjectionMapper::all(&self.version.metadata)?, }; + // Get memtable ranges to scan. + let memtables = memtables + .into_iter() + .map(|mem| { + let ranges = mem.ranges(Some(mapper.column_ids()), Some(predicate.clone())); + MemRangeBuilder::new(ranges) + }) + .collect(); let input = ScanInput::new(self.access_layer, mapper) .with_time_range(Some(time_range)) @@ -484,8 +493,8 @@ pub(crate) struct ScanInput { time_range: Option, /// Predicate to push down. pub(crate) predicate: Option, - /// Memtables to scan. - pub(crate) memtables: Vec, + /// Memtable range builders for memtables in the time range.. + pub(crate) memtables: Vec, /// Handles to SST files to scan. pub(crate) files: Vec, /// Cache. @@ -547,9 +556,9 @@ impl ScanInput { self } - /// Sets memtables to read. + /// Sets memtable range builders. #[must_use] - pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { + pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } @@ -667,11 +676,12 @@ impl ScanInput { Ok(sources) } - /// Prunes a memtable to scan and returns the builder to build readers. - pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder { - let memtable = &self.memtables[mem_index]; - let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone()); - MemRangeBuilder::new(row_groups) + /// Builds memtable ranges to scan by `index`. + pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> { + let memtable = &self.memtables[index.index]; + let mut ranges = SmallVec::new(); + memtable.build_ranges(index.row_group_index, &mut ranges); + ranges } /// Prunes a file to scan and returns the builder to build readers. diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 0bdf62e77e03..77a9bb161254 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -137,10 +137,9 @@ pub(crate) fn scan_mem_ranges( part_metrics: PartitionMetrics, index: RowGroupIndex, time_range: FileTimeRange, - range_builder_list: Arc, ) -> impl Stream> { try_stream! { - let ranges = range_builder_list.build_mem_ranges(&stream_ctx.input, index); + let ranges = stream_ctx.input.build_mem_ranges(index); part_metrics.inc_num_mem_ranges(ranges.len()); for range in ranges { let build_reader_start = Instant::now(); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index bdf3a7d6b8bb..ca9291c0f6ed 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -403,7 +403,6 @@ fn build_sources( part_metrics.clone(), *index, range_meta.time_range, - range_builder_list.clone(), ); Box::pin(stream) as _ } else { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 60e5ca5c7cdb..28e7d64addd8 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -97,7 +97,6 @@ impl UnorderedScan { part_metrics.clone(), *index, range_meta.time_range, - range_builder_list.clone(), ); for await batch in stream { yield batch; diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index f1cc57aa3b51..1a0eacecf823 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -35,7 +35,7 @@ use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer}; use crate::memtable::{ BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, - MemtableRef, MemtableStats, + MemtableRanges, MemtableRef, MemtableStats, }; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -93,8 +93,8 @@ impl Memtable for EmptyMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, - ) -> BTreeMap { - BTreeMap::new() + ) -> MemtableRanges { + MemtableRanges::default() } fn is_empty(&self) -> bool {