diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index b82032bbc8d2..20ab1db69c8c 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -110,6 +110,13 @@ pub trait Memtable: Send + Sync + fmt::Debug {
         predicate: Option<Predicate>,
     ) -> Result<BoxedBatchIterator>;
 
+    /// Returns the ranges in the memtable.
+    fn ranges(
+        &self,
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) -> Vec<MemtableRange>;
+
     /// Returns true if the memtable is empty.
     fn is_empty(&self) -> bool;
 
@@ -278,6 +285,57 @@ impl MemtableBuilderProvider {
     }
 }
 
+/// Builder to build an iterator to read the range.
+/// The builder should know the projection and the predicate to build the iterator.
+pub trait IterBuilder: Send + Sync {
+    /// Returns the iterator to read the range.
+    fn build(&self) -> Result<BoxedBatchIterator>;
+}
+
+pub type BoxedIterBuilder = Box<dyn IterBuilder>;
+
+/// Context shared by ranges of the same memtable.
+pub struct MemtableRangeContext {
+    /// Id of the memtable.
+    id: MemtableId,
+    /// Iterator builder.
+    builder: BoxedIterBuilder,
+}
+
+pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
+
+impl MemtableRangeContext {
+    /// Creates a new [MemtableRangeContext].
+    pub fn new(id: MemtableId, builder: BoxedIterBuilder) -> Self {
+        Self { id, builder }
+    }
+}
+
+/// A range in the memtable.
+#[derive(Clone)]
+pub struct MemtableRange {
+    /// Shared context.
+    context: MemtableRangeContextRef,
+    // TODO(yingwen): Id to identify the range in the memtable.
+}
+
+impl MemtableRange {
+    /// Creates a new range from context.
+    pub fn new(context: MemtableRangeContextRef) -> Self {
+        Self { context }
+    }
+
+    /// Returns the id of the memtable to read.
+    pub fn id(&self) -> MemtableId {
+        self.context.id
+    }
+
+    /// Builds an iterator to read the range.
+    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
+        self.context.builder.build()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use common_base::readable_size::ReadableSize;
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index 280442968d32..541a34f70100 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -40,8 +40,8 @@ use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::metrics::WriteMetrics;
 use crate::memtable::partition_tree::tree::PartitionTree;
 use crate::memtable::{
-    AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
-    MemtableRef, MemtableStats,
+    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
+    MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
 };
 
 /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
@@ -105,7 +105,7 @@ impl Default for PartitionTreeConfig {
 
 /// Memtable based on a partition tree.
 pub struct PartitionTreeMemtable {
     id: MemtableId,
-    tree: PartitionTree,
+    tree: Arc<PartitionTree>,
     alloc_tracker: AllocTracker,
     max_timestamp: AtomicI64,
     min_timestamp: AtomicI64,
@@ -156,6 +156,22 @@ impl Memtable for PartitionTreeMemtable {
         self.tree.read(projection, predicate)
     }
 
+    fn ranges(
+        &self,
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) -> Vec<MemtableRange> {
+        let projection = projection.map(|ids| ids.to_vec());
+        let builder = Box::new(PartitionTreeIterBuilder {
+            tree: self.tree.clone(),
+            projection,
+            predicate,
+        });
+        let context = Arc::new(MemtableRangeContext::new(self.id, builder));
+
+        vec![MemtableRange::new(context)]
+    }
+
     fn is_empty(&self) -> bool {
         self.tree.is_empty()
     }
@@ -224,7 +240,7 @@ impl PartitionTreeMemtable {
 
         Self {
             id,
-            tree,
+            tree: Arc::new(tree),
            alloc_tracker,
             max_timestamp: AtomicI64::new(i64::MIN),
             min_timestamp: AtomicI64::new(i64::MAX),
@@ -309,6 +325,19 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder {
     }
 }
 
+struct PartitionTreeIterBuilder {
+    tree: Arc<PartitionTree>,
+    projection: Option<Vec<ColumnId>>,
+    predicate: Option<Predicate>,
+}
+
+impl IterBuilder for PartitionTreeIterBuilder {
+    fn build(&self) -> Result<BoxedBatchIterator> {
+        self.tree
+            .read(self.projection.as_deref(), self.predicate.clone())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use api::v1::value::ValueData;
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 79bd74b9ef84..52ec7f60cbf3 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -40,8 +40,8 @@ use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismat
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::{
-    AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
-    MemtableRef, MemtableStats,
+    AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
+    MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
 };
 use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
 use crate::read::{Batch, BatchBuilder, BatchColumn};
@@ -244,6 +244,30 @@ impl Memtable for TimeSeriesMemtable {
         Ok(Box::new(iter))
     }
 
+    fn ranges(
+        &self,
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) -> Vec<MemtableRange> {
+        let projection = if let Some(projection) = projection {
+            projection.iter().copied().collect()
+        } else {
+            self.region_metadata
+                .field_columns()
+                .map(|c| c.column_id)
+                .collect()
+        };
+        let builder = Box::new(TimeSeriesIterBuilder {
+            series_set: self.series_set.clone(),
+            projection,
+            predicate,
+            dedup: self.dedup,
+        });
+        let context = Arc::new(MemtableRangeContext::new(self.id, builder));
+
+        vec![MemtableRange::new(context)]
+    }
+
     fn is_empty(&self) -> bool {
         self.series_set.series.read().unwrap().is_empty()
     }
@@ -308,6 +332,7 @@ impl Default for LocalStats {
 
 type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
 
+#[derive(Clone)]
 struct SeriesSet {
     region_metadata: RegionMetadataRef,
     series: Arc<SeriesRwLockMap>,
}
@@ -816,6 +841,24 @@ impl From<ValueBuilder> for Values {
     }
 }
 
+struct TimeSeriesIterBuilder {
+    series_set: SeriesSet,
+    projection: HashSet<ColumnId>,
+    predicate: Option<Predicate>,
+    dedup: bool,
+}
+
+impl IterBuilder for TimeSeriesIterBuilder {
+    fn build(&self) -> Result<BoxedBatchIterator> {
+        let iter = self.series_set.iter_series(
+            self.projection.clone(),
+            self.predicate.clone(),
+            self.dedup,
+        )?;
+        Ok(Box::new(iter))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index a185584d4358..d4200fcb739f 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -35,7 +35,7 @@ use crate::access_layer::AccessLayerRef;
 use crate::cache::file_cache::FileCacheRef;
 use crate::cache::CacheManagerRef;
 use crate::error::Result;
-use crate::memtable::MemtableRef;
+use crate::memtable::{MemtableRange, MemtableRef};
 use crate::metrics::READ_SST_COUNT;
 use crate::read::compat::{self, CompatBatch};
 use crate::read::projection::ProjectionMapper;
@@ -631,9 +631,8 @@ pub(crate) type FileRangesGroup = SmallVec<[Vec<FileRange>; 4]>;
 /// It contains memtables and file ranges to scan.
 #[derive(Default)]
 pub(crate) struct ScanPart {
-    /// Memtables to scan.
-    /// We scan the whole memtable now. We might scan a range of the memtable in the future.
-    pub(crate) memtables: Vec<MemtableRef>,
+    /// Memtable ranges to scan.
+    pub(crate) memtable_ranges: Vec<MemtableRange>,
     /// File ranges to scan.
     pub(crate) file_ranges: FileRangesGroup,
     /// Optional time range of the part (inclusive).
@@ -644,8 +643,8 @@ impl fmt::Debug for ScanPart {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(
             f,
-            "ScanPart({} memtables, {} file ranges",
-            self.memtables.len(),
+            "ScanPart({} memtable ranges, {} file ranges",
+            self.memtable_ranges.len(),
             self.file_ranges
                 .iter()
                 .map(|ranges| ranges.len())
@@ -671,7 +670,7 @@ impl ScanPart {
     /// Merges given `part` to this part.
     pub(crate) fn merge(&mut self, mut part: ScanPart) {
-        self.memtables.append(&mut part.memtables);
+        self.memtable_ranges.append(&mut part.memtable_ranges);
         self.file_ranges.append(&mut part.file_ranges);
         let Some(part_range) = part.time_range else {
             return;
@@ -688,7 +687,9 @@ impl ScanPart {
     /// Returns true if the we can split the part into multiple parts
     /// and preserving order.
     pub(crate) fn can_split_preserve_order(&self) -> bool {
-        self.memtables.is_empty() && self.file_ranges.len() == 1 && self.file_ranges[0].len() > 1
+        self.memtable_ranges.is_empty()
+            && self.file_ranges.len() == 1
+            && self.file_ranges[0].len() > 1
     }
 }
@@ -739,10 +740,10 @@ impl ScanPartList {
         self.0.as_ref().map_or(0, |parts| parts.len())
     }
 
-    /// Returns the number of memtables.
-    pub(crate) fn num_memtables(&self) -> usize {
+    /// Returns the number of memtable ranges.
+    pub(crate) fn num_mem_ranges(&self) -> usize {
         self.0.as_ref().map_or(0, |parts| {
-            parts.iter().map(|part| part.memtables.len()).sum()
+            parts.iter().map(|part| part.memtable_ranges.len()).sum()
         })
     }
@@ -792,9 +793,9 @@ impl StreamContext {
             Ok(inner) => match t {
                 DisplayFormatType::Default => write!(
                     f,
-                    "partition_count={} ({} memtables, {} file ranges)",
+                    "partition_count={} ({} memtable ranges, {} file ranges)",
                     inner.len(),
-                    inner.num_memtables(),
+                    inner.num_mem_ranges(),
                     inner.num_file_ranges()
                 ),
                 DisplayFormatType::Verbose => write!(f, "{:?}", &*inner),
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index f957f2b04c09..532d051fa2d3 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -91,16 +91,11 @@ impl SeqScan {
     }
 
     /// Builds sources from a [ScanPart].
-    fn build_part_sources(
-        part: &ScanPart,
-        projection: Option<&[ColumnId]>,
-        predicate: Option<&Predicate>,
-        sources: &mut Vec<Source>,
-    ) -> Result<()> {
-        sources.reserve(part.memtables.len() + part.file_ranges.len());
+    fn build_part_sources(part: &ScanPart, sources: &mut Vec<Source>) -> Result<()> {
+        sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
         // Read memtables.
-        for mem in &part.memtables {
-            let iter = mem.iter(projection, predicate.cloned())?;
+        for mem in &part.memtable_ranges {
+            let iter = mem.build_iter()?;
             sources.push(Source::Iter(iter));
         }
         // Read files.
@@ -154,28 +149,17 @@ impl SeqScan {
         let mut parts = stream_ctx.parts.lock().await;
         maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
 
-        let input = &stream_ctx.input;
         let mut sources = Vec::new();
         if let Some(index) = partition {
             let Some(part) = parts.get_part(index) else {
                 return Ok(None);
             };
-            Self::build_part_sources(
-                part,
-                Some(input.mapper.column_ids()),
-                input.predicate.as_ref(),
-                &mut sources,
-            )?;
+            Self::build_part_sources(part, &mut sources)?;
         } else {
             // Safety: We initialized parts before.
             for part in parts.0.as_ref().unwrap() {
-                Self::build_part_sources(
-                    part,
-                    Some(input.mapper.column_ids()),
-                    input.predicate.as_ref(),
-                    &mut sources,
-                )?;
+                Self::build_part_sources(part, &mut sources)?;
             }
         }
 
@@ -308,8 +292,12 @@ async fn maybe_init_parts(
     let now = Instant::now();
     let mut distributor = SeqDistributor::default();
     input.prune_file_ranges(&mut distributor).await?;
-    part_list
-        .set_parts(distributor.build_parts(&input.memtables, input.parallelism.parallelism));
+    distributor.append_mem_ranges(
+        &input.memtables,
+        Some(input.mapper.column_ids()),
+        input.predicate.clone(),
+    );
+    part_list.set_parts(distributor.build_parts(input.parallelism.parallelism));
 
     metrics.observe_init_part(now.elapsed());
 }
@@ -335,7 +323,7 @@ impl FileRangeCollector for SeqDistributor {
             return;
         }
         let part = ScanPart {
-            memtables: Vec::new(),
+            memtable_ranges: Vec::new(),
             file_ranges: smallvec![ranges],
             time_range: Some(file_meta.time_range),
         };
@@ -344,22 +332,33 @@ impl FileRangeCollector for SeqDistributor {
 }
 
 impl SeqDistributor {
-    /// Groups file ranges and memtables by time ranges.
-    /// The output number of parts may be `<= parallelism`. If `parallelism` is 0, it will be set to 1.
-    ///
-    /// Output parts have non-overlapping time ranges.
-    fn build_parts(mut self, memtables: &[MemtableRef], parallelism: usize) -> Vec<ScanPart> {
-        // Creates a part for each memtable.
+    /// Appends memtable ranges to the distributor.
+    fn append_mem_ranges(
+        &mut self,
+        memtables: &[MemtableRef],
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) {
         for mem in memtables {
             let stats = mem.stats();
+            let mem_ranges = mem.ranges(projection, predicate.clone());
+            if mem_ranges.is_empty() {
+                continue;
+            }
             let part = ScanPart {
-                memtables: vec![mem.clone()],
+                memtable_ranges: mem_ranges,
                 file_ranges: smallvec![],
                 time_range: stats.time_range(),
             };
             self.parts.push(part);
         }
+    }
+
+    /// Groups file ranges and memtable ranges by time ranges.
+    /// The output number of parts may be `<= parallelism`. If `parallelism` is 0, it will be set to 1.
+    ///
+    /// Output parts have non-overlapping time ranges.
+    fn build_parts(self, parallelism: usize) -> Vec<ScanPart> {
         let parallelism = parallelism.max(1);
         let parts = group_parts_by_range(self.parts);
         let parts = maybe_split_parts(parts, parallelism);
@@ -418,9 +417,9 @@ fn maybe_merge_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart>
@@ fn maybe_split_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart>
         for ranges in part.file_ranges[0].chunks(ranges_per_part) {
             let new_part = ScanPart {
-                memtables: Vec::new(),
+                memtable_ranges: Vec::new(),
                 file_ranges: smallvec![ranges.to_vec()],
                 time_range: part.time_range,
             };
@@ -505,14 +504,12 @@ fn maybe_split_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart>
     type Output = (Vec<MemtableId>, i64, i64);
@@ -525,9 +522,7 @@ mod tests {
             Timestamp::new(*end, TimeUnit::Second),
         );
         ScanPart {
-            memtables: vec![Arc::new(
-                EmptyMemtable::new(*id).with_time_range(Some(range)),
-            )],
+            memtable_ranges: vec![mem_range_for_test(*id)],
             file_ranges: smallvec![],
             time_range: Some(range),
         }
@@ -537,7 +532,7 @@ mod tests {
         let actual: Vec<_> = output
             .iter()
             .map(|part| {
-                let ids: Vec<_> = part.memtables.iter().map(|mem| mem.id()).collect();
+                let ids: Vec<_> = part.memtable_ranges.iter().map(|mem| mem.id()).collect();
                 let range = part.time_range.unwrap();
                 (ids, range.0.value(), range.1.value())
             })
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index eccd8ec88c79..5950094b0d0d 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -29,10 +29,12 @@ use futures::StreamExt;
 use smallvec::smallvec;
 use snafu::ResultExt;
 use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties};
+use store_api::storage::ColumnId;
+use table::predicate::Predicate;
 
 use crate::cache::CacheManager;
 use crate::error::Result;
-use crate::memtable::MemtableRef;
+use crate::memtable::{MemtableRange, MemtableRef};
 use crate::read::compat::CompatBatch;
 use crate::read::projection::ProjectionMapper;
 use crate::read::scan_region::{
@@ -151,13 +153,10 @@ impl RegionScanner for UnorderedScan {
         let mapper = &stream_ctx.input.mapper;
         let memtable_sources = part
-            .memtables
+            .memtable_ranges
             .iter()
             .map(|mem| {
-                let iter = mem.iter(
-                    Some(mapper.column_ids()),
-                    stream_ctx.input.predicate.clone(),
-                )?;
+                let iter = mem.build_iter()?;
                 Ok(Source::Iter(iter))
             })
             .collect::<Result<Vec<_>>>()
@@ -240,8 +239,12 @@ async fn maybe_init_parts(
     let now = Instant::now();
     let mut distributor = UnorderedDistributor::default();
     input.prune_file_ranges(&mut distributor).await?;
-    part_list
-        .set_parts(distributor.build_parts(&input.memtables, input.parallelism.parallelism));
+    distributor.append_mem_ranges(
+        &input.memtables,
+        Some(input.mapper.column_ids()),
+        input.predicate.clone(),
+    );
+    part_list.set_parts(distributor.build_parts(input.parallelism.parallelism));
 
     metrics.observe_init_part(now.elapsed());
 }
@@ -253,6 +256,7 @@ async fn maybe_init_parts(
 /// is no output ordering guarantee of each partition.
 #[derive(Default)]
 struct UnorderedDistributor {
+    mem_ranges: Vec<MemtableRange>,
     file_ranges: Vec<FileRange>,
 }
 
@@ -267,35 +271,52 @@ impl FileRangeCollector for UnorderedDistributor {
     }
 }
 
 impl UnorderedDistributor {
+    /// Appends memtable ranges to the distributor.
+    fn append_mem_ranges(
+        &mut self,
+        memtables: &[MemtableRef],
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) {
+        for mem in memtables {
+            let mut mem_ranges = mem.ranges(projection, predicate.clone());
+            if mem_ranges.is_empty() {
+                continue;
+            }
+            self.mem_ranges.append(&mut mem_ranges);
+        }
+    }
+
     /// Distributes file ranges and memtables across partitions according to the `parallelism`.
     /// The output number of parts may be `<= parallelism`.
     ///
     /// [ScanPart] created by this distributor only contains one group of file ranges.
-    fn build_parts(self, memtables: &[MemtableRef], parallelism: usize) -> Vec<ScanPart> {
+    fn build_parts(self, parallelism: usize) -> Vec<ScanPart> {
         if parallelism <= 1 {
             // Returns a single part.
             let part = ScanPart {
-                memtables: memtables.to_vec(),
+                memtable_ranges: self.mem_ranges.clone(),
                 file_ranges: smallvec![self.file_ranges],
                 time_range: None,
             };
             return vec![part];
         }
 
-        let mems_per_part = ((memtables.len() + parallelism - 1) / parallelism).max(1);
+        let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1);
         let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1);
         common_telemetry::debug!(
-            "Parallel scan is enabled, parallelism: {}, {} memtables, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
+            "Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
             parallelism,
-            memtables.len(),
+            self.mem_ranges.len(),
             self.file_ranges.len(),
             mems_per_part,
             ranges_per_part
         );
-        let mut scan_parts = memtables
+        let mut scan_parts = self
+            .mem_ranges
             .chunks(mems_per_part)
             .map(|mems| ScanPart {
-                memtables: mems.to_vec(),
+                memtable_ranges: mems.to_vec(),
                 file_ranges: smallvec![Vec::new()], // Ensures there is always one group.
                 time_range: None,
             })
@@ -303,7 +324,7 @@ impl UnorderedDistributor {
         for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() {
             if i == scan_parts.len() {
                 scan_parts.push(ScanPart {
-                    memtables: Vec::new(),
+                    memtable_ranges: Vec::new(),
                     file_ranges: smallvec![ranges.to_vec()],
                     time_range: None,
                 });
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 30160e7562b3..1a9951fbb5ae 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -23,7 +23,7 @@ use crate::sst::file::FileTimeRange;
 use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 
 pub(crate) mod file_range;
-mod format;
+pub(crate) mod format;
 pub(crate) mod helper;
 pub(crate) mod metadata;
 mod page_reader;
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index 7723a996c062..a5fc417b1b00 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -82,16 +82,10 @@ impl FileRange {
 /// Context shared by ranges of the same parquet SST.
 pub(crate) struct FileRangeContext {
-    // Row group reader builder for the file.
+    /// Row group reader builder for the file.
     reader_builder: RowGroupReaderBuilder,
-    /// Filters pushed down.
-    filters: Vec<SimpleFilterContext>,
-    /// Helper to read the SST.
-    read_format: ReadFormat,
-    /// Decoder for primary keys
-    codec: McmpRowCodec,
-    /// Optional helper to compat batches.
-    compat_batch: Option<CompatBatch>,
+    /// Base of the context.
+    base: RangeBase,
 }
 
 pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
@@ -106,10 +100,12 @@ impl FileRangeContext {
     ) -> Self {
         Self {
             reader_builder,
-            filters,
-            read_format,
-            codec,
-            compat_batch: None,
+            base: RangeBase {
+                filters,
+                read_format,
+                codec,
+                compat_batch: None,
+            },
         }
     }
 
@@ -120,12 +116,12 @@ impl FileRangeContext {
     /// Returns filters pushed down.
     pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
-        &self.filters
+        &self.base.filters
     }
 
     /// Returns the format helper.
     pub(crate) fn read_format(&self) -> &ReadFormat {
-        &self.read_format
+        &self.base.read_format
     }
 
     /// Returns the reader builder.
@@ -135,14 +131,34 @@ impl FileRangeContext {
     /// Returns the helper to compat batches.
     pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
-        self.compat_batch.as_ref()
+        self.base.compat_batch.as_ref()
     }
 
     /// Sets the `CompatBatch` to the context.
     pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
-        self.compat_batch = compat;
+        self.base.compat_batch = compat;
+    }
+
+    /// TRY THE BEST to perform pushed down predicate precisely on the input batch.
+    /// Return the filtered batch. If the entire batch is filtered out, return None.
+    pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
+        self.base.precise_filter(input)
     }
+}
+
+/// Common fields for a range to read and filter batches.
+pub(crate) struct RangeBase {
+    /// Filters pushed down.
+    pub(crate) filters: Vec<SimpleFilterContext>,
+    /// Helper to read the SST.
+    pub(crate) read_format: ReadFormat,
+    /// Decoder for primary keys
+    pub(crate) codec: McmpRowCodec,
+    /// Optional helper to compat batches.
+    pub(crate) compat_batch: Option<CompatBatch>,
+}
 
+impl RangeBase {
     /// TRY THE BEST to perform pushed down predicate precisely on the input batch.
     /// Return the filtered batch. If the entire batch is filtered out, return None.
     ///
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index b3d1898c5bc6..69a9398975ae 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -33,14 +33,14 @@ use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
 use crate::memtable::{
-    BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
-    MemtableStats,
+    BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
+    MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
 };
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 
 /// Empty memtable for test.
 #[derive(Debug, Default)]
-pub(crate) struct EmptyMemtable {
+pub struct EmptyMemtable {
     /// Id of this memtable.
     id: MemtableId,
     /// Time range to return.
@@ -49,7 +49,7 @@ impl EmptyMemtable {
     /// Returns a new memtable with specific `id`.
-    pub(crate) fn new(id: MemtableId) -> EmptyMemtable {
+    pub fn new(id: MemtableId) -> EmptyMemtable {
         EmptyMemtable {
             id,
             time_range: None,
         }
     }
 
     /// Attaches the time range to the memtable.
-    pub(crate) fn with_time_range(
-        mut self,
-        time_range: Option<(Timestamp, Timestamp)>,
-    ) -> EmptyMemtable {
+    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> EmptyMemtable {
         self.time_range = time_range;
         self
     }
@@ -87,6 +84,14 @@ impl Memtable for EmptyMemtable {
         Ok(Box::new(std::iter::empty()))
     }
 
+    fn ranges(
+        &self,
+        _projection: Option<&[ColumnId]>,
+        _predicate: Option<Predicate>,
+    ) -> Vec<MemtableRange> {
+        vec![]
+    }
+
     fn is_empty(&self) -> bool {
         true
     }
@@ -114,6 +119,16 @@ impl MemtableBuilder for EmptyMemtableBuilder {
     }
 }
 
+/// Empty iterator builder.
+#[derive(Default)]
+pub(crate) struct EmptyIterBuilder {}
+
+impl IterBuilder for EmptyIterBuilder {
+    fn build(&self) -> Result<BoxedBatchIterator> {
+        Ok(Box::new(std::iter::empty()))
+    }
+}
+
 /// Creates a region metadata to test memtable with default pk.
 ///
 /// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
@@ -341,3 +356,11 @@ pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec<i64> {
         .map(|v| v.unwrap().0.value())
         .collect()
 }
+
+/// Builds a memtable range for test.
+pub(crate) fn mem_range_for_test(id: MemtableId) -> MemtableRange {
+    let builder = Box::new(EmptyIterBuilder::default());
+
+    let context = Arc::new(MemtableRangeContext::new(id, builder));
+    MemtableRange::new(context)
+}
diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs
index f338e06d7c8b..267f60b10834 100644
--- a/src/table/src/predicate.rs
+++ b/src/table/src/predicate.rs
@@ -50,10 +50,11 @@ macro_rules! return_none_if_utf8 {
     };
 }
 
+/// Reference-counted pointer to a list of logical exprs.
 #[derive(Debug, Clone)]
 pub struct Predicate {
     /// logical exprs
-    exprs: Vec<Expr>,
+    exprs: Arc<Vec<Expr>>,
 }
 
 impl Predicate {
@@ -61,7 +62,9 @@ impl Predicate {
     /// evaluated against record batches.
     /// Returns error when failed to convert exprs.
     pub fn new(exprs: Vec<Expr>) -> Self {
-        Self { exprs }
+        Self {
+            exprs: Arc::new(exprs),
+        }
     }
 
     /// Returns the logical exprs.
diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result
index 26f55d9a2470..b96883df984f 100644
--- a/tests/cases/distributed/explain/analyze.result
+++ b/tests/cases/distributed/explain/analyze.result
@@ -35,7 +35,7 @@ explain analyze SELECT count(*) FROM system_metrics;
 |_|_|_CoalescePartitionsExec REDACTED
 |_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
+|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 1_|
 +-+-+-+
diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result
index 59b12671aea8..af9a9824f986 100644
--- a/tests/cases/standalone/common/range/nest.result
+++ b/tests/cases/standalone/common/range/nest.result
@@ -74,7 +74,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
 | 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED
 |_|_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
+| 1_| 0_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 10_|
 +-+-+-+
diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
index 8fb7eb2144f0..091ca74c49bf 100644
--- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result
+++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result
@@ -30,7 +30,7 @@ TQL ANALYZE (0, 10, '5s') test;
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
+|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -59,7 +59,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
+|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -87,7 +87,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
+|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
@@ -117,7 +117,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
 |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
 |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
-|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
+|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 4_|
 +-+-+-+
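Taken together, the patch lets a scanner collect `MemtableRange`s up front (with the projection and predicate captured in the range's `IterBuilder`) and defer building the actual iterator until a partition is polled. The sketch below illustrates that call pattern only; it is not part of the patch, and the helper name `ranges_to_sources` plus the exact import paths (`crate::read::Source`, `crate::error::Result`) are assumptions for illustration.

use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::read::Source;

/// Hypothetical helper: collects every range from the given memtables and
/// turns each one into a `Source`, mirroring how the distributors and
/// scanners in this patch compose `ranges` and `build_iter`.
fn ranges_to_sources(
    memtables: &[MemtableRef],
    projection: Option<&[ColumnId]>,
    predicate: Option<Predicate>,
) -> Result<Vec<Source>> {
    // Collecting ranges is cheap: no iterator is created until `build_iter` runs.
    let ranges: Vec<MemtableRange> = memtables
        .iter()
        .flat_map(|mem| mem.ranges(projection, predicate.clone()))
        .collect();

    // Build one iterator per range, only for the ranges this scan actually reads.
    ranges
        .iter()
        .map(|range| Ok(Source::Iter(range.build_iter()?)))
        .collect()
}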