Skip to content

Commit

Permalink
feat: Implement memtable range (#4162)
Browse files Browse the repository at this point in the history
* refactor: RangeBase

* feat: memtable range

* feat: scanner use mem range

* feat: remove base from mem range context

* feat: impl ranges for memtables

* chore: fix warnings

* refactor: make predicate cheap to clone

* refactor: MemRange -> MemtableRange

* feat: pub empty memtable to fix warnings

* test: fix sqlness result
  • Loading branch information
evenyag authored Jun 18, 2024
1 parent cd9705c commit fe74efd
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 111 deletions.
58 changes: 58 additions & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ pub trait Memtable: Send + Sync + fmt::Debug {
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator>;

/// Returns the ranges in the memtable.
///
/// `projection` optionally lists the ids of the columns to read and `predicate`
/// is an optional filter for the read. How a `None` projection is interpreted
/// is left to each implementation.
///
/// Call [MemtableRange::build_iter] on a returned range to read it.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange>;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;

Expand Down Expand Up @@ -278,6 +285,57 @@ impl MemtableBuilderProvider {
}
}

/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
///
/// Implementations must be `Send + Sync`. `build` borrows the builder (`&self`),
/// so the same builder can be asked for an iterator more than once.
pub trait IterBuilder: Send + Sync {
/// Returns the iterator to read the range.
///
/// Returns an error if the iterator cannot be created.
fn build(&self) -> Result<BoxedBatchIterator>;
}

/// A boxed [IterBuilder] trait object.
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
///
/// It holds the memtable id and the builder used to create iterators.
pub struct MemtableRangeContext {
/// Id of the memtable.
id: MemtableId,
/// Iterator builder.
builder: BoxedIterBuilder,
}

/// Reference-counted pointer to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
/// Creates a new [MemtableRangeContext].
pub fn new(id: MemtableId, builder: BoxedIterBuilder) -> Self {
Self { id, builder }
}
}

/// A range in the memtable.
///
/// Cloning a range clones the reference-counted pointer to the shared context,
/// not the context itself.
#[derive(Clone)]
pub struct MemtableRange {
/// Shared context.
context: MemtableRangeContextRef,
// TODO(yingwen): Id to identify the range in the memtable.
}

impl MemtableRange {
/// Creates a new range from context.
pub fn new(context: MemtableRangeContextRef) -> Self {
Self { context }
}

/// Returns the id of the memtable to read.
pub fn id(&self) -> MemtableId {
self.context.id
}

/// Builds an iterator to read the range.
pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
self.context.builder.build()
}
}

#[cfg(test)]
mod tests {
use common_base::readable_size::ReadableSize;
Expand Down
37 changes: 33 additions & 4 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
Expand Down Expand Up @@ -105,7 +105,7 @@ impl Default for PartitionTreeConfig {
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
id: MemtableId,
tree: PartitionTree,
tree: Arc<PartitionTree>,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
Expand Down Expand Up @@ -156,6 +156,22 @@ impl Memtable for PartitionTreeMemtable {
self.tree.read(projection, predicate)
}

/// Returns a single range that covers the whole partition tree.
///
/// The projection is copied into an owned vector and stored, together with
/// the predicate and a shared handle to the tree, in the iterator builder.
fn ranges(
    &self,
    projection: Option<&[ColumnId]>,
    predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
    let iter_builder = PartitionTreeIterBuilder {
        tree: Arc::clone(&self.tree),
        projection: projection.map(<[ColumnId]>::to_vec),
        predicate,
    };
    let shared = MemtableRangeContext::new(self.id, Box::new(iter_builder));

    vec![MemtableRange::new(Arc::new(shared))]
}

/// Returns whether the memtable is empty, as reported by the underlying tree.
fn is_empty(&self) -> bool {
self.tree.is_empty()
}
Expand Down Expand Up @@ -224,7 +240,7 @@ impl PartitionTreeMemtable {

Self {
id,
tree,
tree: Arc::new(tree),
alloc_tracker,
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
Expand Down Expand Up @@ -309,6 +325,19 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder {
}
}

/// [IterBuilder] that reads from a shared [PartitionTree].
struct PartitionTreeIterBuilder {
/// Tree to read.
tree: Arc<PartitionTree>,
/// Ids of columns to read; passed to the tree as is when building an iterator.
projection: Option<Vec<ColumnId>>,
/// Optional predicate; cloned for every iterator built.
predicate: Option<Predicate>,
}

impl IterBuilder for PartitionTreeIterBuilder {
    /// Reads the tree with the stored projection and a clone of the stored predicate.
    fn build(&self) -> Result<BoxedBatchIterator> {
        let column_ids = self.projection.as_deref();
        let filter = self.predicate.clone();
        self.tree.read(column_ids, filter)
    }
}

#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
Expand Down
47 changes: 45 additions & 2 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismat
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::{Batch, BatchBuilder, BatchColumn};
Expand Down Expand Up @@ -244,6 +244,30 @@ impl Memtable for TimeSeriesMemtable {
Ok(Box::new(iter))
}

/// Returns a single range that covers the whole series set.
///
/// When no projection is given, the ids of all field columns in the region
/// metadata are used instead.
fn ranges(
    &self,
    projection: Option<&[ColumnId]>,
    predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
    let column_ids = match projection {
        Some(ids) => ids.iter().copied().collect(),
        None => self
            .region_metadata
            .field_columns()
            .map(|column| column.column_id)
            .collect(),
    };
    let iter_builder = TimeSeriesIterBuilder {
        series_set: self.series_set.clone(),
        projection: column_ids,
        predicate,
        dedup: self.dedup,
    };
    let shared = MemtableRangeContext::new(self.id, Box::new(iter_builder));

    vec![MemtableRange::new(Arc::new(shared))]
}

/// Returns whether the series map contains no series.
///
/// Acquires the read lock on the series map and panics (via `unwrap`) if
/// acquiring it fails.
// NOTE(review): presumably failure means a poisoned `std::sync::RwLock` — confirm.
fn is_empty(&self) -> bool {
self.series_set.series.read().unwrap().is_empty()
}
Expand Down Expand Up @@ -308,6 +332,7 @@ impl Default for LocalStats {

/// Lock-protected ordered map from a byte key to a shared, individually locked [Series].
// NOTE(review): the byte key is presumably the encoded primary key — confirm.
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;

#[derive(Clone)]
struct SeriesSet {
region_metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
Expand Down Expand Up @@ -816,6 +841,24 @@ impl From<ValueBuilder> for Values {
}
}

/// [IterBuilder] that reads from a [SeriesSet].
struct TimeSeriesIterBuilder {
/// Series set to read.
series_set: SeriesSet,
/// Ids of columns to read; cloned for every iterator built.
projection: HashSet<ColumnId>,
/// Optional predicate; cloned for every iterator built.
predicate: Option<Predicate>,
/// Dedup flag passed through to `iter_series`.
dedup: bool,
}

impl IterBuilder for TimeSeriesIterBuilder {
    /// Creates a boxed iterator over the series set from clones of the stored
    /// projection and predicate and the stored dedup flag.
    fn build(&self) -> Result<BoxedBatchIterator> {
        let column_ids = self.projection.clone();
        let filter = self.predicate.clone();
        let series_iter = self
            .series_set
            .iter_series(column_ids, filter, self.dedup)?;
        Ok(Box::new(series_iter))
    }
}

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
Expand Down
27 changes: 14 additions & 13 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
Expand Down Expand Up @@ -631,9 +631,8 @@ pub(crate) type FileRangesGroup = SmallVec<[Vec<FileRange>; 4]>;
/// It contains memtables and file ranges to scan.
#[derive(Default)]
pub(crate) struct ScanPart {
/// Memtables to scan.
/// We scan the whole memtable now. We might scan a range of the memtable in the future.
pub(crate) memtables: Vec<MemtableRef>,
/// Memtable ranges to scan.
pub(crate) memtable_ranges: Vec<MemtableRange>,
/// File ranges to scan.
pub(crate) file_ranges: FileRangesGroup,
/// Optional time range of the part (inclusive).
Expand All @@ -644,8 +643,8 @@ impl fmt::Debug for ScanPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ScanPart({} memtables, {} file ranges",
self.memtables.len(),
"ScanPart({} memtable ranges, {} file ranges",
self.memtable_ranges.len(),
self.file_ranges
.iter()
.map(|ranges| ranges.len())
Expand All @@ -671,7 +670,7 @@ impl ScanPart {

/// Merges given `part` to this part.
pub(crate) fn merge(&mut self, mut part: ScanPart) {
self.memtables.append(&mut part.memtables);
self.memtable_ranges.append(&mut part.memtable_ranges);
self.file_ranges.append(&mut part.file_ranges);
let Some(part_range) = part.time_range else {
return;
Expand All @@ -688,7 +687,9 @@ impl ScanPart {
/// Returns true if we can split the part into multiple parts
/// while preserving order.
pub(crate) fn can_split_preserve_order(&self) -> bool {
self.memtables.is_empty() && self.file_ranges.len() == 1 && self.file_ranges[0].len() > 1
self.memtable_ranges.is_empty()
&& self.file_ranges.len() == 1
&& self.file_ranges[0].len() > 1
}
}

Expand Down Expand Up @@ -739,10 +740,10 @@ impl ScanPartList {
self.0.as_ref().map_or(0, |parts| parts.len())
}

/// Returns the number of memtables.
pub(crate) fn num_memtables(&self) -> usize {
/// Returns the number of memtable ranges.
pub(crate) fn num_mem_ranges(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.memtables.len()).sum()
parts.iter().map(|part| part.memtable_ranges.len()).sum()
})
}

Expand Down Expand Up @@ -792,9 +793,9 @@ impl StreamContext {
Ok(inner) => match t {
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} memtables, {} file ranges)",
"partition_count={} ({} memtable ranges, {} file ranges)",
inner.len(),
inner.num_memtables(),
inner.num_mem_ranges(),
inner.num_file_ranges()
),
DisplayFormatType::Verbose => write!(f, "{:?}", &*inner),
Expand Down
Loading

0 comments on commit fe74efd

Please sign in to comment.