diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 32376aa802ed..cc39d8213f7d 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -100,6 +100,9 @@ pub struct MitoConfig { /// Whether to allow stale entries read during replay. pub allow_stale_entries: bool, + /// Experimental merge tree memtable. + pub experimental_merge_tree: bool, + /// Inverted index configs. pub inverted_index: InvertedIndexConfig, } @@ -126,6 +129,7 @@ impl Default for MitoConfig { scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, + experimental_merge_tree: false, inverted_index: InvertedIndexConfig::default(), }; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 361ebf40b6c9..69e3335c28b4 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -464,7 +464,7 @@ impl FlushScheduler { } // Now we can flush the region directly. - version_control.freeze_mutable().inspect(|e| { + version_control.freeze_mutable().inspect_err(|e| { error!(e; "Failed to freeze the mutable memtable for region {}", region_id); })?; // Submit a flush job. diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index bdc52f56c951..73f9c9cf0e46 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -15,7 +15,7 @@ //! Memtables are write buffers for regions. pub mod key_values; -#[allow(unused)] +#[allow(dead_code)] pub mod merge_tree; pub mod time_series; pub(crate) mod version; diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index bc7e8daa0293..5ac6d1a79004 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -26,7 +26,6 @@ use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::Arc; -use common_base::readable_size::ReadableSize; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; @@ -34,7 +33,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::flush::WriteBufferManagerRef; use crate::memtable::merge_tree::mutable::WriteMetrics; -use crate::memtable::merge_tree::tree::{MergeTree, MergeTreeRef}; +use crate::memtable::merge_tree::tree::MergeTree; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, @@ -65,7 +64,7 @@ impl Default for MergeTreeConfig { Self { // TODO(yingwen): Use 4096 or find a proper value. index_max_keys_per_shard: 8192, - freeze_threshold: 4096, + freeze_threshold: 409600, } } } @@ -73,7 +72,7 @@ impl Default for MergeTreeConfig { /// Memtable based on a merge tree. pub struct MergeTreeMemtable { id: MemtableId, - tree: MergeTreeRef, + tree: MergeTree, alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, @@ -190,7 +189,7 @@ impl MergeTreeMemtable { Self { id, - tree: Arc::new(tree), + tree, alloc_tracker, max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 3226aae12611..3acdf76e9166 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -25,7 +25,7 @@ use datatypes::arrow; use datatypes::arrow::array::{Array, RecordBatch, UInt16Array, UInt32Array}; use datatypes::arrow::datatypes::{Field, Schema, SchemaRef}; use datatypes::data_type::DataType; -use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, Vector, VectorRef}; +use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::ColumnSchema; use datatypes::types::TimestampType; use datatypes::vectors::{ @@ -209,19 +209,15 @@ impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { - while let Some(mut top) = self.heap.pop() { - if top.source.is_valid() { - let top_batch = top.source.current_batch(); - if let Err(e) = top.source.next() { - return Some(Err(e)); - } - if top.source.is_valid() { - self.heap.push(top); - } - return Some(Ok(top_batch)); - } + let mut top = self.heap.pop()?; + let top_batch = top.source.current_batch(); + if let Err(e) = top.source.next() { + return Some(Err(e)); } - None + if top.source.is_valid() { + self.heap.push(top); + } + Some(Ok(top_batch)) } } @@ -257,7 +253,7 @@ impl DataParts { } pub(crate) fn is_empty(&self) -> bool { - unimplemented!() + self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty()) } } @@ -351,7 +347,7 @@ impl DataBuffer { pub fn iter(&mut self, pk_weights: &[u16]) -> Result { let batch = data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?; - Ok(DataBufferIter::new(batch)) + DataBufferIter::new(batch) } /// Returns num of rows in data buffer. @@ -373,14 +369,14 @@ pub(crate) struct DataBufferIter { } impl DataBufferIter { - pub(crate) fn new(batch: RecordBatch) -> Self { + pub(crate) fn new(batch: RecordBatch) -> Result { let mut iter = Self { batch, offset: 0, current_data_batch: None, }; - iter.next(); // fill data batch for comparison and merge. - iter + iter.next()?; // fill data batch for comparison and merge. + Ok(iter) } pub(crate) fn is_valid(&self) -> bool { @@ -650,6 +646,14 @@ pub enum DataPart { Parquet(Bytes), } +impl DataPart { + fn is_empty(&self) -> bool { + match self { + DataPart::Parquet(data) => data.is_empty(), + } + } +} + pub struct DataPartIter { inner: ParquetRecordBatchReader, current_range: Range, @@ -673,7 +677,7 @@ impl DataPartIter { if let Some(batch_size) = batch_size { builder = builder.with_batch_size(batch_size); } - let mut reader = builder.build().context(error::ReadDataPartSnafu)?; + let reader = builder.build().context(error::ReadDataPartSnafu)?; let mut iter = Self { inner: reader, current_pk_index: None, @@ -901,13 +905,13 @@ mod tests { assert_eq!(4, buffer.num_rows()); - let mut encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); + let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); let encoded = encoder.write(&mut buffer).unwrap(); let s = String::from_utf8_lossy(encoded.as_bytes()); assert!(s.starts_with("PAR1")); assert!(s.ends_with("PAR1")); - let mut builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap(); let mut reader = builder.build().unwrap(); let batch = reader.next().unwrap().unwrap(); assert_eq!(3, batch.num_rows()); @@ -975,7 +979,7 @@ mod tests { 3, ); - let mut encoder = DataPartEncoder::new(&meta, weights, Some(4)); + let encoder = DataPartEncoder::new(&meta, weights, Some(4)); let encoded = encoder.write(&mut buffer).unwrap(); let mut iter = DataPartIter::new(encoded, Some(4)).unwrap(); @@ -1094,7 +1098,7 @@ mod tests { active: buffer, frozen: vec![part_0, part_1], }; - let mut iter = parts.iter(pk_weights.to_vec()).unwrap(); + let iter = parts.iter(pk_weights.to_vec()).unwrap(); let mut res = Vec::with_capacity(expected_values.len()); for b in iter { let batch = b.unwrap().as_record_batch(); diff --git a/src/mito2/src/memtable/merge_tree/index.rs b/src/mito2/src/memtable/merge_tree/index.rs index 1fe0921e2aaf..9f2775fa3c8d 100644 --- a/src/mito2/src/memtable/merge_tree/index.rs +++ b/src/mito2/src/memtable/merge_tree/index.rs @@ -14,15 +14,12 @@ //! Primary key index of the merge tree. -use std::cmp::Ordering; -use std::collections::{BTreeMap, BinaryHeap}; +use std::collections::BTreeMap; use std::sync::{Arc, RwLock}; use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder}; -use datatypes::arrow::compute; -use snafu::ResultExt; -use crate::error::{ComputeArrowSnafu, Result}; +use crate::error::Result; use crate::memtable::merge_tree::mutable::WriteMetrics; use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; diff --git a/src/mito2/src/memtable/merge_tree/mutable.rs b/src/mito2/src/memtable/merge_tree/mutable.rs index d7c04d784e4e..bacfe3548e43 100644 --- a/src/mito2/src/memtable/merge_tree/mutable.rs +++ b/src/mito2/src/memtable/merge_tree/mutable.rs @@ -214,19 +214,19 @@ impl Default for WriteMetrics { #[derive(Debug, Default)] pub(crate) struct ReadMetrics { /// Time used to initialize the iter. - init_cost: Duration, + pub(crate) init_cost: Duration, /// Time used to prune rows. - prune_cost: Duration, + pub(crate) prune_cost: Duration, /// Time used to sort and dedup rows. sort_dedup_cost: Duration, /// Time used to invoke next. - next_cost: Duration, + pub(crate) next_cost: Duration, /// Number of batches returned by the iter. - num_batches: usize, + pub(crate) num_batches: usize, /// Number of rows before prunning. num_rows_before_prune: usize, /// Number of rows returned. - num_rows_returned: usize, + pub(crate) num_rows_returned: usize, /// Failures during evaluating expressions. eval_failure_total: u32, } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index e7f0ea1f51d7..0bd09a58a31b 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -16,10 +16,10 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock}; +use std::time::Instant; use api::v1::OpType; use common_time::Timestamp; -use datatypes::arrow::record_batch::RecordBatch; use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -31,7 +31,7 @@ use crate::memtable::merge_tree::data::{self, DataBatch, DataParts}; use crate::memtable::merge_tree::index::{ compute_pk_weights, IndexConfig, IndexReader, KeyIndex, KeyIndexRef, ShardReader, }; -use crate::memtable::merge_tree::mutable::WriteMetrics; +use crate::memtable::merge_tree::mutable::{ReadMetrics, WriteMetrics}; use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex}; use crate::memtable::{BoxedBatchIterator, KeyValues}; use crate::read::{Batch, BatchBuilder}; @@ -51,8 +51,6 @@ pub(crate) struct MergeTree { pub(crate) parts: RwLock, } -pub(crate) type MergeTreeRef = Arc; - impl MergeTree { /// Creates a new merge tree. pub(crate) fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> MergeTree { @@ -139,6 +137,9 @@ impl MergeTree { projection: Option<&[ColumnId]>, predicate: Option, ) -> Result { + let mut metrics = ReadMetrics::default(); + let init_start = Instant::now(); + assert!(predicate.is_none(), "Predicate is unsupported"); // Creates the projection set. let projection: HashSet<_> = if let Some(projection) = projection { @@ -170,11 +171,13 @@ impl MergeTree { parts.data.iter(pk_weights)? }; + metrics.init_cost = init_start.elapsed(); let iter = ShardIter { metadata: self.metadata.clone(), projection, index_reader, data_reader: DataReader::new(data_iter)?, + metrics, }; Ok(Box::new(iter)) @@ -303,17 +306,22 @@ struct ShardIter { projection: HashSet, index_reader: Option, data_reader: DataReader, + metrics: ReadMetrics, } impl Iterator for ShardIter { type Item = Result; fn next(&mut self) -> Option { + let start = Instant::now(); if !self.data_reader.is_valid() { + self.metrics.next_cost += start.elapsed(); return None; } - self.next_batch().transpose() + let ret = self.next_batch().transpose(); + self.metrics.next_cost += start.elapsed(); + ret } } @@ -330,6 +338,8 @@ impl ShardIter { )?; // Advances the data reader. self.data_reader.next()?; + self.metrics.num_batches += 1; + self.metrics.num_rows_returned += batch.num_rows(); return Ok(Some(batch)); }; @@ -352,10 +362,18 @@ impl ShardIter { )?; // Advances the data reader. self.data_reader.next()?; + self.metrics.num_batches += 1; + self.metrics.num_rows_returned += batch.num_rows(); Ok(Some(batch)) } } +impl Drop for ShardIter { + fn drop(&mut self) { + common_telemetry::info!("Shard iter drop, metrics: {:?}", self.metrics); + } +} + struct DataReader { current: Option, iter: data::Iter, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3f960f416a73..b3f435c01f11 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -49,6 +49,7 @@ use crate::config::MitoConfig; use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::manifest::action::RegionEdit; +use crate::memtable::merge_tree::MergeTreeMemtableBuilder; use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::memtable::MemtableBuilderRef; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; @@ -321,6 +322,16 @@ impl WorkerStarter { fn start(self) -> RegionWorker { let regions = Arc::new(RegionMap::default()); let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); + let memtable_builder = if self.config.experimental_merge_tree { + info!("Use experimental merge tree memtable"); + Arc::new(MergeTreeMemtableBuilder::new(Some( + self.write_buffer_manager.clone(), + ))) as _ + } else { + Arc::new(TimeSeriesMemtableBuilder::new(Some( + self.write_buffer_manager.clone(), + ))) as _ + }; let running = Arc::new(AtomicBool::new(true)); let mut worker_thread = RegionWorkerLoop { @@ -333,9 +344,7 @@ impl WorkerStarter { wal: Wal::new(self.log_store), object_store_manager: self.object_store_manager.clone(), running: running.clone(), - memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some( - self.write_buffer_manager.clone(), - ))), + memtable_builder, scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()),