From 61448480442ec1a8de38d9f6e89161d7c2fe797e Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 17:37:33 +0800 Subject: [PATCH 01/13] fix: error log --- src/mito2/src/flush.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 33551099df39..8385a57e94dc 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -459,7 +459,7 @@ impl FlushScheduler { } // Now we can flush the region directly. - version_control.freeze_mutable().inspect(|e| { + version_control.freeze_mutable().inspect_err(|e| { error!(e; "Failed to freeze the mutable memtable for region {}", region_id); })?; // Submit a flush job. From d94d32ba2cc79f13fb59ef1a2f72f74e56139c8d Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 17:45:44 +0800 Subject: [PATCH 02/13] feat: add config for merge tree --- src/mito2/src/config.rs | 4 ++++ src/mito2/src/worker.rs | 15 ++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index cc25be263bf6..f62550e8fc6b 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -100,6 +100,9 @@ pub struct MitoConfig { /// Whether to allow stale entries read during replay. pub allow_stale_entries: bool, + /// Experimental merge tree memtable. + pub experimental_merge_tree: bool, + /// Inverted index configs. pub inverted_index: InvertedIndexConfig, } @@ -126,6 +129,7 @@ impl Default for MitoConfig { scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, + experimental_merge_tree: false, inverted_index: InvertedIndexConfig::default(), }; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3f960f416a73..b3f435c01f11 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -49,6 +49,7 @@ use crate::config::MitoConfig; use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::manifest::action::RegionEdit; +use crate::memtable::merge_tree::MergeTreeMemtableBuilder; use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::memtable::MemtableBuilderRef; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; @@ -321,6 +322,16 @@ impl WorkerStarter { fn start(self) -> RegionWorker { let regions = Arc::new(RegionMap::default()); let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); + let memtable_builder = if self.config.experimental_merge_tree { + info!("Use experimental merge tree memtable"); + Arc::new(MergeTreeMemtableBuilder::new(Some( + self.write_buffer_manager.clone(), + ))) as _ + } else { + Arc::new(TimeSeriesMemtableBuilder::new(Some( + self.write_buffer_manager.clone(), + ))) as _ + }; let running = Arc::new(AtomicBool::new(true)); let mut worker_thread = RegionWorkerLoop { @@ -333,9 +344,7 @@ impl WorkerStarter { wal: Wal::new(self.log_store), object_store_manager: self.object_store_manager.clone(), running: running.clone(), - memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some( - self.write_buffer_manager.clone(), - ))), + memtable_builder, scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()), From cc29b7ab10934dfacfc20288f92c38ca4487f9de Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 18:00:00 +0800 Subject: [PATCH 03/13] fix: is empty --- src/mito2/src/memtable/merge_tree/data.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 3226aae12611..2196101fb562 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -257,7 +257,7 @@ impl DataParts { } pub(crate) fn is_empty(&self) -> bool { - unimplemented!() + self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty()) } } @@ -650,6 +650,14 @@ pub enum DataPart { Parquet(Bytes), } +impl DataPart { + fn is_empty(&self) -> bool { + match self { + DataPart::Parquet(data) => data.is_empty(), + } + } +} + pub struct DataPartIter { inner: ParquetRecordBatchReader, current_range: Range, From 481185617417c2a19023edfe627f26e5ec25fffe Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 20:14:42 +0800 Subject: [PATCH 04/13] feat: metrics --- src/mito2/src/memtable/merge_tree/mutable.rs | 10 ++++---- src/mito2/src/memtable/merge_tree/tree.rs | 25 ++++++++++++++++++-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/mutable.rs b/src/mito2/src/memtable/merge_tree/mutable.rs index d7c04d784e4e..bacfe3548e43 100644 --- a/src/mito2/src/memtable/merge_tree/mutable.rs +++ b/src/mito2/src/memtable/merge_tree/mutable.rs @@ -214,19 +214,19 @@ impl Default for WriteMetrics { #[derive(Debug, Default)] pub(crate) struct ReadMetrics { /// Time used to initialize the iter. - init_cost: Duration, + pub(crate) init_cost: Duration, /// Time used to prune rows. - prune_cost: Duration, + pub(crate) prune_cost: Duration, /// Time used to sort and dedup rows. sort_dedup_cost: Duration, /// Time used to invoke next. - next_cost: Duration, + pub(crate) next_cost: Duration, /// Number of batches returned by the iter. - num_batches: usize, + pub(crate) num_batches: usize, /// Number of rows before prunning. num_rows_before_prune: usize, /// Number of rows returned. - num_rows_returned: usize, + pub(crate) num_rows_returned: usize, /// Failures during evaluating expressions. eval_failure_total: u32, } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index e7f0ea1f51d7..d9c052a810ae 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock}; +use std::time::Instant; use api::v1::OpType; use common_time::Timestamp; @@ -31,7 +32,7 @@ use crate::memtable::merge_tree::data::{self, DataBatch, DataParts}; use crate::memtable::merge_tree::index::{ compute_pk_weights, IndexConfig, IndexReader, KeyIndex, KeyIndexRef, ShardReader, }; -use crate::memtable::merge_tree::mutable::WriteMetrics; +use crate::memtable::merge_tree::mutable::{ReadMetrics, WriteMetrics}; use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex}; use crate::memtable::{BoxedBatchIterator, KeyValues}; use crate::read::{Batch, BatchBuilder}; @@ -139,6 +140,9 @@ impl MergeTree { projection: Option<&[ColumnId]>, predicate: Option, ) -> Result { + let mut metrics = ReadMetrics::default(); + let init_start = Instant::now(); + assert!(predicate.is_none(), "Predicate is unsupported"); // Creates the projection set. let projection: HashSet<_> = if let Some(projection) = projection { @@ -170,11 +174,13 @@ impl MergeTree { parts.data.iter(pk_weights)? }; + metrics.init_cost = init_start.elapsed(); let iter = ShardIter { metadata: self.metadata.clone(), projection, index_reader, data_reader: DataReader::new(data_iter)?, + metrics, }; Ok(Box::new(iter)) @@ -303,17 +309,22 @@ struct ShardIter { projection: HashSet, index_reader: Option, data_reader: DataReader, + metrics: ReadMetrics, } impl Iterator for ShardIter { type Item = Result; fn next(&mut self) -> Option { + let start = Instant::now(); if !self.data_reader.is_valid() { + self.metrics.next_cost += start.elapsed(); return None; } - self.next_batch().transpose() + let ret = self.next_batch().transpose(); + self.metrics.next_cost += start.elapsed(); + ret } } @@ -330,6 +341,8 @@ impl ShardIter { )?; // Advances the data reader. self.data_reader.next()?; + self.metrics.num_batches += 1; + self.metrics.num_rows_returned += batch.num_rows(); return Ok(Some(batch)); }; @@ -352,10 +365,18 @@ impl ShardIter { )?; // Advances the data reader. self.data_reader.next()?; + self.metrics.num_batches += 1; + self.metrics.num_rows_returned += batch.num_rows(); Ok(Some(batch)) } } +impl Drop for ShardIter { + fn drop(&mut self) { + common_telemetry::info!("Shard iter drop, metrics: {:?}", self.metrics); + } +} + struct DataReader { current: Option, iter: data::Iter, From c573b9c62f8887bcdcc7e2937947a10fa6f8c504 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 20:26:33 +0800 Subject: [PATCH 05/13] feat: larger freeze threshold --- src/mito2/src/memtable/merge_tree.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index bc7e8daa0293..a9b9fc95d08d 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -65,7 +65,7 @@ impl Default for MergeTreeConfig { Self { // TODO(yingwen): Use 4096 or find a proper value. index_max_keys_per_shard: 8192, - freeze_threshold: 4096, + freeze_threshold: 40960, } } } From a17f86266347acbe2a34c8956a78796f673c1c78 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 20:37:37 +0800 Subject: [PATCH 06/13] feat: logs --- src/mito2/src/memtable/merge_tree/data.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 2196101fb562..01af7803bf5c 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -515,6 +515,8 @@ fn data_buffer_to_record_batches( pk_weights: &[u16], keep_data: bool, ) -> Result { + let start = std::time::Instant::now(); + let num_rows = buffer.ts_builder.len(); let (pk_index_v, ts_v, sequence_v, op_type_v) = if keep_data { @@ -584,6 +586,8 @@ fn data_buffer_to_record_batches( ); } + common_telemetry::info!("data buffer to rb cost: {:?}", start.elapsed()); + RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu) } From 759a3d2bddaae67a8c150019b8a29eef32ebaded Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 16 Feb 2024 20:46:28 +0800 Subject: [PATCH 07/13] feat: larger threshold --- src/mito2/src/memtable/merge_tree.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index a9b9fc95d08d..510ffae9055c 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -65,7 +65,7 @@ impl Default for MergeTreeConfig { Self { // TODO(yingwen): Use 4096 or find a proper value. index_max_keys_per_shard: 8192, - freeze_threshold: 40960, + freeze_threshold: 409600, } } } From e390b301972a7569f05d5776edbaeb3970f11457 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 18 Feb 2024 10:49:19 +0800 Subject: [PATCH 08/13] chore: update comment --- src/mito2/src/memtable/merge_tree.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 510ffae9055c..8995b175e9ca 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -73,6 +73,7 @@ impl Default for MergeTreeConfig { /// Memtable based on a merge tree. pub struct MergeTreeMemtable { id: MemtableId, + // FIXME(yingwen): No need to use Arc. tree: MergeTreeRef, alloc_tracker: AllocTracker, max_timestamp: AtomicI64, From a3bac068b5c22c0ccb6ee2d3ee39bd3279ed1b49 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 18 Feb 2024 12:20:43 +0800 Subject: [PATCH 09/13] chore: fix unused --- src/mito2/src/memtable.rs | 2 +- src/mito2/src/memtable/merge_tree.rs | 1 - src/mito2/src/memtable/merge_tree/data.rs | 20 ++++++++++---------- src/mito2/src/memtable/merge_tree/index.rs | 7 ++----- src/mito2/src/memtable/merge_tree/tree.rs | 1 - 5 files changed, 13 insertions(+), 18 deletions(-) diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index bdc52f56c951..73f9c9cf0e46 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -15,7 +15,7 @@ //! Memtables are write buffers for regions. pub mod key_values; -#[allow(unused)] +#[allow(dead_code)] pub mod merge_tree; pub mod time_series; pub(crate) mod version; diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 8995b175e9ca..a708f2074ca6 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -26,7 +26,6 @@ use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::Arc; -use common_base::readable_size::ReadableSize; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 01af7803bf5c..6c75e61ab88d 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -25,7 +25,7 @@ use datatypes::arrow; use datatypes::arrow::array::{Array, RecordBatch, UInt16Array, UInt32Array}; use datatypes::arrow::datatypes::{Field, Schema, SchemaRef}; use datatypes::data_type::DataType; -use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, Vector, VectorRef}; +use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::ColumnSchema; use datatypes::types::TimestampType; use datatypes::vectors::{ @@ -351,7 +351,7 @@ impl DataBuffer { pub fn iter(&mut self, pk_weights: &[u16]) -> Result { let batch = data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?; - Ok(DataBufferIter::new(batch)) + DataBufferIter::new(batch) } /// Returns num of rows in data buffer. @@ -373,14 +373,14 @@ pub(crate) struct DataBufferIter { } impl DataBufferIter { - pub(crate) fn new(batch: RecordBatch) -> Self { + pub(crate) fn new(batch: RecordBatch) -> Result { let mut iter = Self { batch, offset: 0, current_data_batch: None, }; - iter.next(); // fill data batch for comparison and merge. - iter + iter.next()?; // fill data batch for comparison and merge. + Ok(iter) } pub(crate) fn is_valid(&self) -> bool { @@ -685,7 +685,7 @@ impl DataPartIter { if let Some(batch_size) = batch_size { builder = builder.with_batch_size(batch_size); } - let mut reader = builder.build().context(error::ReadDataPartSnafu)?; + let reader = builder.build().context(error::ReadDataPartSnafu)?; let mut iter = Self { inner: reader, current_pk_index: None, @@ -913,13 +913,13 @@ mod tests { assert_eq!(4, buffer.num_rows()); - let mut encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); + let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); let encoded = encoder.write(&mut buffer).unwrap(); let s = String::from_utf8_lossy(encoded.as_bytes()); assert!(s.starts_with("PAR1")); assert!(s.ends_with("PAR1")); - let mut builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap(); let mut reader = builder.build().unwrap(); let batch = reader.next().unwrap().unwrap(); assert_eq!(3, batch.num_rows()); @@ -987,7 +987,7 @@ mod tests { 3, ); - let mut encoder = DataPartEncoder::new(&meta, weights, Some(4)); + let encoder = DataPartEncoder::new(&meta, weights, Some(4)); let encoded = encoder.write(&mut buffer).unwrap(); let mut iter = DataPartIter::new(encoded, Some(4)).unwrap(); @@ -1106,7 +1106,7 @@ mod tests { active: buffer, frozen: vec![part_0, part_1], }; - let mut iter = parts.iter(pk_weights.to_vec()).unwrap(); + let iter = parts.iter(pk_weights.to_vec()).unwrap(); let mut res = Vec::with_capacity(expected_values.len()); for b in iter { let batch = b.unwrap().as_record_batch(); diff --git a/src/mito2/src/memtable/merge_tree/index.rs b/src/mito2/src/memtable/merge_tree/index.rs index 1fe0921e2aaf..9f2775fa3c8d 100644 --- a/src/mito2/src/memtable/merge_tree/index.rs +++ b/src/mito2/src/memtable/merge_tree/index.rs @@ -14,15 +14,12 @@ //! Primary key index of the merge tree. -use std::cmp::Ordering; -use std::collections::{BTreeMap, BinaryHeap}; +use std::collections::BTreeMap; use std::sync::{Arc, RwLock}; use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder}; -use datatypes::arrow::compute; -use snafu::ResultExt; -use crate::error::{ComputeArrowSnafu, Result}; +use crate::error::Result; use crate::memtable::merge_tree::mutable::WriteMetrics; use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index d9c052a810ae..2e17acb7763e 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -20,7 +20,6 @@ use std::time::Instant; use api::v1::OpType; use common_time::Timestamp; -use datatypes::arrow::record_batch::RecordBatch; use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; From c90ce248decacc8db1850dd05e67d5a5dba9b23e Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 18 Feb 2024 20:27:01 +0800 Subject: [PATCH 10/13] feat: remove log --- src/mito2/src/memtable/merge_tree/data.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 6c75e61ab88d..9abc4572f810 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -515,8 +515,6 @@ fn data_buffer_to_record_batches( pk_weights: &[u16], keep_data: bool, ) -> Result { - let start = std::time::Instant::now(); - let num_rows = buffer.ts_builder.len(); let (pk_index_v, ts_v, sequence_v, op_type_v) = if keep_data { @@ -586,8 +584,6 @@ fn data_buffer_to_record_batches( ); } - common_telemetry::info!("data buffer to rb cost: {:?}", start.elapsed()); - RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu) } From 7d83cde79d6e88fbc9c6f36eb2ef15896e14b508 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 18 Feb 2024 20:29:12 +0800 Subject: [PATCH 11/13] chore: remove unnecessary check --- src/mito2/src/memtable/merge_tree/data.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index 9abc4572f810..eb6b064dabc5 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -210,16 +210,14 @@ impl Iterator for Iter { fn next(&mut self) -> Option { while let Some(mut top) = self.heap.pop() { + let top_batch = top.source.current_batch(); + if let Err(e) = top.source.next() { + return Some(Err(e)); + } if top.source.is_valid() { - let top_batch = top.source.current_batch(); - if let Err(e) = top.source.next() { - return Some(Err(e)); - } - if top.source.is_valid() { - self.heap.push(top); - } - return Some(Ok(top_batch)); + self.heap.push(top); } + return Some(Ok(top_batch)); } None } From 169c6552fad7f275e243a706ccb28edeea20387d Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 18 Feb 2024 20:30:26 +0800 Subject: [PATCH 12/13] feat: remove unnecessary arc --- src/mito2/src/memtable/merge_tree.rs | 7 +++---- src/mito2/src/memtable/merge_tree/tree.rs | 2 -- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index a708f2074ca6..5ac6d1a79004 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -33,7 +33,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::flush::WriteBufferManagerRef; use crate::memtable::merge_tree::mutable::WriteMetrics; -use crate::memtable::merge_tree::tree::{MergeTree, MergeTreeRef}; +use crate::memtable::merge_tree::tree::MergeTree; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, @@ -72,8 +72,7 @@ impl Default for MergeTreeConfig { /// Memtable based on a merge tree. pub struct MergeTreeMemtable { id: MemtableId, - // FIXME(yingwen): No need to use Arc. - tree: MergeTreeRef, + tree: MergeTree, alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, @@ -190,7 +189,7 @@ impl MergeTreeMemtable { Self { id, - tree: Arc::new(tree), + tree, alloc_tracker, max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 2e17acb7763e..0bd09a58a31b 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -51,8 +51,6 @@ pub(crate) struct MergeTree { pub(crate) parts: RwLock, } -pub(crate) type MergeTreeRef = Arc; - impl MergeTree { /// Creates a new merge tree. pub(crate) fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> MergeTree { From 82416f26c6aaa26ea62e5f9a8d5b966c130a7a69 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 18 Feb 2024 21:50:21 +0800 Subject: [PATCH 13/13] style: fix clippy --- src/mito2/src/memtable/merge_tree/data.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index eb6b064dabc5..3acdf76e9166 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -209,17 +209,15 @@ impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { - while let Some(mut top) = self.heap.pop() { - let top_batch = top.source.current_batch(); - if let Err(e) = top.source.next() { - return Some(Err(e)); - } - if top.source.is_valid() { - self.heap.push(top); - } - return Some(Ok(top_batch)); + let mut top = self.heap.pop()?; + let top_batch = top.source.current_batch(); + if let Err(e) = top.source.next() { + return Some(Err(e)); + } + if top.source.is_valid() { + self.heap.push(top); } - None + Some(Ok(top_batch)) } }