fix: fixes issues related to the single shard merge tree #18

Merged
14 commits merged on Feb 19, 2024
4 changes: 4 additions & 0 deletions src/mito2/src/config.rs
@@ -100,6 +100,9 @@ pub struct MitoConfig {
     /// Whether to allow stale entries read during replay.
     pub allow_stale_entries: bool,
 
+    /// Experimental merge tree memtable.
+    pub experimental_merge_tree: bool,
+
     /// Inverted index configs.
     pub inverted_index: InvertedIndexConfig,
 }
@@ -126,6 +129,7 @@ impl Default for MitoConfig {
             scan_parallelism: divide_num_cpus(4),
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             allow_stale_entries: false,
+            experimental_merge_tree: false,
             inverted_index: InvertedIndexConfig::default(),
         };
 
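For context, the new flag can be toggled like any other MitoConfig field. A minimal sketch, assuming the crate is consumed as `mito2` (the workspace path above suggests this name, but it is an assumption):

use mito2::config::MitoConfig;

fn main() {
    // Opt in to the experimental merge tree memtable; every other option,
    // including this one's `false` default, comes from the Default impl above.
    let config = MitoConfig {
        experimental_merge_tree: true,
        ..MitoConfig::default()
    };
    assert!(config.experimental_merge_tree);
}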
2 changes: 1 addition & 1 deletion src/mito2/src/flush.rs
@@ -464,7 +464,7 @@ impl FlushScheduler {
         }
 
         // Now we can flush the region directly.
-        version_control.freeze_mutable().inspect(|e| {
+        version_control.freeze_mutable().inspect_err(|e| {
             error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
         })?;
         // Submit a flush job.
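This one-word fix matters because on `Result`, `inspect` runs its closure on the `Ok` value while `inspect_err` runs it on the `Err`, so the old call could never log the failure it was written for. A standalone sketch (both methods are stable since Rust 1.76; `freeze` is a hypothetical stand-in for `freeze_mutable`):

fn freeze() -> Result<(), String> {
    Err("cannot freeze mutable memtable".to_string())
}

fn main() {
    // `inspect` taps the Ok value, so this closure never runs on failure.
    let _ = freeze().inspect(|_ok| println!("only runs on success"));
    // `inspect_err` taps the Err value, which is what the logging intends.
    let _ = freeze().inspect_err(|e| eprintln!("failed to freeze: {e}"));
}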
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
@@ -15,7 +15,7 @@
 //! Memtables are write buffers for regions.
 
 pub mod key_values;
-#[allow(unused)]
+#[allow(dead_code)]
 pub mod merge_tree;
 pub mod time_series;
 pub(crate) mod version;
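Narrowing `#[allow(unused)]` to `#[allow(dead_code)]` tightens what gets silenced: `unused` is a whole lint group (unused imports, unused variables, and more), while `dead_code` covers only items that are never used. A small illustration with hypothetical modules:

#[allow(dead_code)] // silences only "never used" items inside the module...
mod narrow {
    fn helper() {} // no dead_code warning
    // a `use std::collections::HashMap;` here would still warn as an unused import
}

#[allow(unused)] // ...whereas the `unused` group swallows far more
mod broad {
    use std::collections::HashMap; // unused import: silenced
    fn helper() {
        let x = 1; // unused variable: silenced
    }
}

fn main() {}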
9 changes: 4 additions & 5 deletions src/mito2/src/memtable/merge_tree.rs
@@ -26,15 +26,14 @@ use std::fmt;
 use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
 use std::sync::Arc;
 
-use common_base::readable_size::ReadableSize;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
 use crate::error::Result;
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::merge_tree::mutable::WriteMetrics;
-use crate::memtable::merge_tree::tree::{MergeTree, MergeTreeRef};
+use crate::memtable::merge_tree::tree::MergeTree;
 use crate::memtable::{
     AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
     MemtableRef, MemtableStats,
@@ -65,15 +64,15 @@ impl Default for MergeTreeConfig {
         Self {
             // TODO(yingwen): Use 4096 or find a proper value.
             index_max_keys_per_shard: 8192,
-            freeze_threshold: 4096,
+            freeze_threshold: 409600,
         }
     }
 }
 
 /// Memtable based on a merge tree.
 pub struct MergeTreeMemtable {
     id: MemtableId,
-    tree: MergeTreeRef,
+    tree: MergeTree,
     alloc_tracker: AllocTracker,
     max_timestamp: AtomicI64,
     min_timestamp: AtomicI64,
@@ -190,7 +189,7 @@ impl MergeTreeMemtable {
 
         Self {
             id,
-            tree: Arc::new(tree),
+            tree,
             alloc_tracker,
             max_timestamp: AtomicI64::new(i64::MIN),
             min_timestamp: AtomicI64::new(i64::MAX),
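Dropping `MergeTreeRef` (an `Arc<MergeTree>` alias, removed in tree.rs below) means the memtable now owns its tree directly; since the tree already guards its state behind a lock, sharing can happen one level up. A compressed sketch of the ownership shape, with a `Vec` standing in for the real `TreeParts`:

use std::sync::{Arc, RwLock};

struct MergeTree {
    // Interior mutability lives inside the tree, so `&MergeTree` suffices for writes.
    parts: RwLock<Vec<u32>>,
}

struct MergeTreeMemtable {
    tree: MergeTree, // previously `tree: MergeTreeRef`, i.e. Arc<MergeTree>
}

fn main() {
    let memtable = MergeTreeMemtable {
        tree: MergeTree { parts: RwLock::new(Vec::new()) },
    };
    // Sharing, when needed, wraps the whole memtable rather than the inner tree.
    let shared = Arc::new(memtable);
    shared.tree.parts.write().unwrap().push(42);
    assert_eq!(shared.tree.parts.read().unwrap().len(), 1);
}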
50 changes: 27 additions & 23 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -25,7 +25,7 @@ use datatypes::arrow;
 use datatypes::arrow::array::{Array, RecordBatch, UInt16Array, UInt32Array};
 use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
 use datatypes::data_type::DataType;
-use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, Vector, VectorRef};
+use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
 use datatypes::schema::ColumnSchema;
 use datatypes::types::TimestampType;
 use datatypes::vectors::{
@@ -209,19 +209,15 @@ impl Iterator for Iter {
     type Item = Result<DataBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        while let Some(mut top) = self.heap.pop() {
-            if top.source.is_valid() {
-                let top_batch = top.source.current_batch();
-                if let Err(e) = top.source.next() {
-                    return Some(Err(e));
-                }
-                if top.source.is_valid() {
-                    self.heap.push(top);
-                }
-                return Some(Ok(top_batch));
-            }
-        }
-        None
+        let mut top = self.heap.pop()?;
+        let top_batch = top.source.current_batch();
+        if let Err(e) = top.source.next() {
+            return Some(Err(e));
+        }
+        if top.source.is_valid() {
+            self.heap.push(top);
+        }
+        Some(Ok(top_batch))
     }
 }
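The rewritten `next` is the classic heap-driven k-way merge step: pop the smallest source, hand out its current batch, advance it, and push it back while it stays valid. A self-contained sketch of the same pattern over plain sorted integer streams (simplified: the real `Iter` orders sources by primary-key weight and timestamp, not raw values):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn merge_sorted(sources: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries are (head value, source index, position); Reverse makes
    // the std max-heap behave as the min-heap the merge needs.
    let mut heap: BinaryHeap<Reverse<(i32, usize, usize)>> = sources
        .iter()
        .enumerate()
        .filter(|(_, s)| !s.is_empty())
        .map(|(i, s)| Reverse((s[0], i, 0)))
        .collect();

    let mut out = Vec::new();
    // Pop the smallest head, emit it, then advance and re-push its source.
    while let Some(Reverse((value, i, pos))) = heap.pop() {
        out.push(value);
        if let Some(&next) = sources[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7]);
}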

@@ -257,7 +253,7 @@ impl DataParts {
     }
 
     pub(crate) fn is_empty(&self) -> bool {
-        unimplemented!()
+        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
     }
 }

@@ -351,7 +347,7 @@ impl DataBuffer {
     pub fn iter(&mut self, pk_weights: &[u16]) -> Result<DataBufferIter> {
         let batch =
             data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?;
-        Ok(DataBufferIter::new(batch))
+        DataBufferIter::new(batch)
     }
 
     /// Returns num of rows in data buffer.
@@ -373,14 +369,14 @@ pub(crate) struct DataBufferIter {
 }
 
 impl DataBufferIter {
-    pub(crate) fn new(batch: RecordBatch) -> Self {
+    pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
         let mut iter = Self {
             batch,
             offset: 0,
             current_data_batch: None,
         };
-        iter.next(); // fill data batch for comparison and merge.
-        iter
+        iter.next()?; // fill data batch for comparison and merge.
+        Ok(iter)
     }
 
     pub(crate) fn is_valid(&self) -> bool {
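Because the constructor primes the iterator by advancing once (so `current_data_batch` is ready for comparison and merge), that first advance can itself fail; returning `Result<Self>` lets `?` surface the error the old signature had to swallow. A minimal sketch of the priming pattern with a simplified error type:

struct Primed {
    remaining: Vec<Result<i32, String>>,
    current: Option<i32>,
}

impl Primed {
    // Advance once during construction; a failed first read becomes an Err
    // from `new` instead of being silently dropped.
    fn new(remaining: Vec<Result<i32, String>>) -> Result<Self, String> {
        let mut iter = Self { remaining, current: None };
        iter.advance()?; // mirrors the diff's `iter.next()?`
        Ok(iter)
    }

    fn is_valid(&self) -> bool {
        self.current.is_some()
    }

    fn advance(&mut self) -> Result<(), String> {
        self.current = self.remaining.pop().transpose()?;
        Ok(())
    }
}

fn main() {
    assert!(Primed::new(vec![Ok(1), Ok(2)]).unwrap().is_valid());
    assert!(Primed::new(vec![Err("corrupt batch".to_string())]).is_err());
}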
@@ -650,6 +646,14 @@ pub enum DataPart {
     Parquet(Bytes),
 }
 
+impl DataPart {
+    fn is_empty(&self) -> bool {
+        match self {
+            DataPart::Parquet(data) => data.is_empty(),
+        }
+    }
+}
+
 pub struct DataPartIter {
     inner: ParquetRecordBatchReader,
     current_range: Range<usize>,
@@ -673,7 +677,7 @@ impl DataPartIter {
         if let Some(batch_size) = batch_size {
             builder = builder.with_batch_size(batch_size);
         }
-        let mut reader = builder.build().context(error::ReadDataPartSnafu)?;
+        let reader = builder.build().context(error::ReadDataPartSnafu)?;
         let mut iter = Self {
             inner: reader,
             current_pk_index: None,
@@ -901,13 +905,13 @@ mod tests {
 
         assert_eq!(4, buffer.num_rows());
 
-        let mut encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
+        let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
         let encoded = encoder.write(&mut buffer).unwrap();
         let s = String::from_utf8_lossy(encoded.as_bytes());
         assert!(s.starts_with("PAR1"));
         assert!(s.ends_with("PAR1"));
 
-        let mut builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
         let mut reader = builder.build().unwrap();
         let batch = reader.next().unwrap().unwrap();
         assert_eq!(3, batch.num_rows());
@@ -975,7 +979,7 @@ mod tests {
             3,
         );
 
-        let mut encoder = DataPartEncoder::new(&meta, weights, Some(4));
+        let encoder = DataPartEncoder::new(&meta, weights, Some(4));
         let encoded = encoder.write(&mut buffer).unwrap();
 
         let mut iter = DataPartIter::new(encoded, Some(4)).unwrap();
@@ -1094,7 +1098,7 @@ mod tests {
             active: buffer,
             frozen: vec![part_0, part_1],
         };
-        let mut iter = parts.iter(pk_weights.to_vec()).unwrap();
+        let iter = parts.iter(pk_weights.to_vec()).unwrap();
         let mut res = Vec::with_capacity(expected_values.len());
         for b in iter {
             let batch = b.unwrap().as_record_batch();
7 changes: 2 additions & 5 deletions src/mito2/src/memtable/merge_tree/index.rs
@@ -14,15 +14,12 @@
 
 //! Primary key index of the merge tree.
 
-use std::cmp::Ordering;
-use std::collections::{BTreeMap, BinaryHeap};
+use std::collections::BTreeMap;
 use std::sync::{Arc, RwLock};
 
 use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
-use datatypes::arrow::compute;
-use snafu::ResultExt;
 
-use crate::error::{ComputeArrowSnafu, Result};
+use crate::error::Result;
 use crate::memtable::merge_tree::mutable::WriteMetrics;
 use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
 
10 changes: 5 additions & 5 deletions src/mito2/src/memtable/merge_tree/mutable.rs
@@ -214,19 +214,19 @@ impl Default for WriteMetrics {
 #[derive(Debug, Default)]
 pub(crate) struct ReadMetrics {
     /// Time used to initialize the iter.
-    init_cost: Duration,
+    pub(crate) init_cost: Duration,
     /// Time used to prune rows.
-    prune_cost: Duration,
+    pub(crate) prune_cost: Duration,
     /// Time used to sort and dedup rows.
     sort_dedup_cost: Duration,
     /// Time used to invoke next.
-    next_cost: Duration,
+    pub(crate) next_cost: Duration,
     /// Number of batches returned by the iter.
-    num_batches: usize,
+    pub(crate) num_batches: usize,
     /// Number of rows before pruning.
     num_rows_before_prune: usize,
     /// Number of rows returned.
-    num_rows_returned: usize,
+    pub(crate) num_rows_returned: usize,
     /// Failures during evaluating expressions.
     eval_failure_total: u32,
 }
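The fields that tree.rs now updates directly are widened to `pub(crate)`; the rest stay private to this module. A compressed sketch of that visibility boundary (module names mirror the diff, contents simplified):

mod mutable {
    #[derive(Debug, Default)]
    pub(crate) struct ReadMetrics {
        pub(crate) num_batches: usize, // writable from sibling modules in the crate
        sort_dedup_cost: u64,          // still private to `mutable`
    }
}

mod tree {
    use crate::mutable::ReadMetrics;

    pub(crate) fn record(metrics: &mut ReadMetrics) {
        metrics.num_batches += 1; // allowed: the field is pub(crate)
        // metrics.sort_dedup_cost += 1; // error[E0616]: field is private
    }
}

fn main() {
    let mut metrics = mutable::ReadMetrics::default();
    tree::record(&mut metrics);
    println!("{metrics:?}");
}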
28 changes: 23 additions & 5 deletions src/mito2/src/memtable/merge_tree/tree.rs
@@ -16,10 +16,10 @@
 
 use std::collections::HashSet;
 use std::sync::{Arc, RwLock};
+use std::time::Instant;
 
 use api::v1::OpType;
 use common_time::Timestamp;
-use datatypes::arrow::record_batch::RecordBatch;
 use snafu::ensure;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
@@ -31,7 +31,7 @@ use crate::memtable::merge_tree::data::{self, DataBatch, DataParts};
 use crate::memtable::merge_tree::index::{
     compute_pk_weights, IndexConfig, IndexReader, KeyIndex, KeyIndexRef, ShardReader,
 };
-use crate::memtable::merge_tree::mutable::WriteMetrics;
+use crate::memtable::merge_tree::mutable::{ReadMetrics, WriteMetrics};
 use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex};
 use crate::memtable::{BoxedBatchIterator, KeyValues};
 use crate::read::{Batch, BatchBuilder};
@@ -51,8 +51,6 @@ pub(crate) struct MergeTree {
     pub(crate) parts: RwLock<TreeParts>,
 }
 
-pub(crate) type MergeTreeRef = Arc<MergeTree>;
-
 impl MergeTree {
     /// Creates a new merge tree.
     pub(crate) fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> MergeTree {
@@ -139,6 +137,9 @@ impl MergeTree {
         projection: Option<&[ColumnId]>,
         predicate: Option<Predicate>,
     ) -> Result<BoxedBatchIterator> {
+        let mut metrics = ReadMetrics::default();
+        let init_start = Instant::now();
+
         assert!(predicate.is_none(), "Predicate is unsupported");
         // Creates the projection set.
         let projection: HashSet<_> = if let Some(projection) = projection {
@@ -170,11 +171,13 @@ impl MergeTree {
             parts.data.iter(pk_weights)?
         };
 
+        metrics.init_cost = init_start.elapsed();
         let iter = ShardIter {
             metadata: self.metadata.clone(),
             projection,
             index_reader,
             data_reader: DataReader::new(data_iter)?,
+            metrics,
         };
 
         Ok(Box::new(iter))
@@ -303,17 +306,22 @@ struct ShardIter {
     projection: HashSet<ColumnId>,
     index_reader: Option<ShardReader>,
     data_reader: DataReader,
+    metrics: ReadMetrics,
 }
 
 impl Iterator for ShardIter {
     type Item = Result<Batch>;
 
     fn next(&mut self) -> Option<Self::Item> {
+        let start = Instant::now();
         if !self.data_reader.is_valid() {
+            self.metrics.next_cost += start.elapsed();
             return None;
         }
 
-        self.next_batch().transpose()
+        let ret = self.next_batch().transpose();
+        self.metrics.next_cost += start.elapsed();
+        ret
     }
 }
@@ -330,6 +338,8 @@ impl ShardIter {
         )?;
         // Advances the data reader.
         self.data_reader.next()?;
+        self.metrics.num_batches += 1;
+        self.metrics.num_rows_returned += batch.num_rows();
         return Ok(Some(batch));
     };
 
@@ -352,10 +362,18 @@ impl ShardIter {
         )?;
         // Advances the data reader.
         self.data_reader.next()?;
+        self.metrics.num_batches += 1;
+        self.metrics.num_rows_returned += batch.num_rows();
         Ok(Some(batch))
     }
 }
 
+impl Drop for ShardIter {
+    fn drop(&mut self) {
+        common_telemetry::info!("Shard iter drop, metrics: {:?}", self.metrics);
+    }
+}
+
 struct DataReader {
     current: Option<DataBatch>,
     iter: data::Iter,
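The instrumentation added here follows a common iterator-metrics pattern: accumulate timings inside `next`, then report once in `Drop` so the consumer pays no extra API cost. A self-contained sketch of the same idea (the PR logs through `common_telemetry::info!`; `println!` stands in below):

use std::time::{Duration, Instant};

struct TimedIter<I> {
    inner: I,
    next_cost: Duration,
    num_items: usize,
}

impl<I: Iterator> Iterator for TimedIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Time every call to next, including the final one that yields None.
        let start = Instant::now();
        let item = self.inner.next();
        self.next_cost += start.elapsed();
        if item.is_some() {
            self.num_items += 1;
        }
        item
    }
}

impl<I> Drop for TimedIter<I> {
    fn drop(&mut self) {
        // Reported exactly once, when the iterator goes out of scope.
        println!("iter dropped: {} items in {:?}", self.num_items, self.next_cost);
    }
}

fn main() {
    let iter = TimedIter { inner: 0..1000, next_cost: Duration::ZERO, num_items: 0 };
    let sum: i32 = iter.sum();
    println!("sum = {sum}");
}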
15 changes: 12 additions & 3 deletions src/mito2/src/worker.rs
@@ -49,6 +49,7 @@ use crate::config::MitoConfig;
 use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::manifest::action::RegionEdit;
+use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
 use crate::memtable::MemtableBuilderRef;
 use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
@@ -321,6 +322,16 @@ impl<S: LogStore> WorkerStarter<S> {
     fn start(self) -> RegionWorker {
         let regions = Arc::new(RegionMap::default());
         let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
+        let memtable_builder = if self.config.experimental_merge_tree {
+            info!("Use experimental merge tree memtable");
+            Arc::new(MergeTreeMemtableBuilder::new(Some(
+                self.write_buffer_manager.clone(),
+            ))) as _
+        } else {
+            Arc::new(TimeSeriesMemtableBuilder::new(Some(
+                self.write_buffer_manager.clone(),
+            ))) as _
+        };
 
         let running = Arc::new(AtomicBool::new(true));
         let mut worker_thread = RegionWorkerLoop {
@@ -333,9 +344,7 @@ impl<S: LogStore> WorkerStarter<S> {
             wal: Wal::new(self.log_store),
             object_store_manager: self.object_store_manager.clone(),
             running: running.clone(),
-            memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
-                self.write_buffer_manager.clone(),
-            ))),
+            memtable_builder,
             scheduler: self.scheduler.clone(),
             write_buffer_manager: self.write_buffer_manager,
             flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
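A detail worth noting in the branch above: the two arms build different concrete builder types, so each `Arc::new(...)` is cast with `as _`, letting both coerce to the trait-object type of the `memtable_builder` field (`MemtableBuilderRef`, presumably `Arc<dyn MemtableBuilder>` by the crate's `Ref` naming convention). A compressed sketch of the same trick:

use std::sync::Arc;

trait Builder {
    fn name(&self) -> &'static str;
}

struct TimeSeries;
struct MergeTree;

impl Builder for TimeSeries {
    fn name(&self) -> &'static str { "time_series" }
}

impl Builder for MergeTree {
    fn name(&self) -> &'static str { "merge_tree" }
}

fn main() {
    let experimental = true;
    // Each arm yields a different Arc<T>; `as _` performs the unsized coercion
    // to the Arc<dyn Builder> demanded by the annotated (or inferred) type.
    let builder: Arc<dyn Builder> = if experimental {
        Arc::new(MergeTree) as _
    } else {
        Arc::new(TimeSeries) as _
    };
    println!("using the {} memtable builder", builder.name());
}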