Skip to content

Commit

Permalink
feat: override __sequence on creating SST to save space and CPU (#5252
Browse files Browse the repository at this point in the history
)

* override memtable sequence

Signed-off-by: Ruihang Xia <[email protected]>

* override sst sequence

Signed-off-by: Ruihang Xia <[email protected]>

* chore changes per to CR comments

Signed-off-by: Ruihang Xia <[email protected]>

* use correct sequence number

Signed-off-by: Ruihang Xia <[email protected]>

* wrap a method to get max sequence

Signed-off-by: Ruihang Xia <[email protected]>

* fix typo

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 31, 2024
1 parent 75e4f30 commit 55b7656
Show file tree
Hide file tree
Showing 19 changed files with 163 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use object_store::util::{join_dir, with_instrument_layers};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
Expand Down Expand Up @@ -164,7 +165,9 @@ impl AccessLayer {
request.metadata,
indexer,
);
writer.write_all(request.source, write_opts).await?
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
};

// Put parquet metadata to cache manager.
Expand Down Expand Up @@ -194,6 +197,7 @@ pub(crate) struct SstWriteRequest {
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,

/// Configs for index
pub(crate) index_options: IndexOptions,
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ impl WriteCache {
indexer,
);

let sst_info = writer.write_all(write_request.source, write_opts).await?;
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
.await?;

timer.stop_and_record();

Expand Down Expand Up @@ -375,6 +377,7 @@ mod tests {
metadata,
source,
storage: None,
max_sequence: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
Expand Down Expand Up @@ -468,6 +471,7 @@ mod tests {
metadata,
source,
storage: None,
max_sequence: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -303,6 +304,12 @@ impl Compactor for DefaultCompactor {
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
compaction_region.engine_config.bloom_filter_index.clone();
let max_sequence = output
.inputs
.iter()
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
Expand All @@ -325,6 +332,7 @@ impl Compactor for DefaultCompactor {
source: Source::Reader(reader),
cache_manager,
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
inverted_index_config,
fulltext_index_config,
Expand All @@ -343,6 +351,7 @@ impl Compactor for DefaultCompactor {
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
});
Ok(file_meta_opt)
});
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/compaction/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn new_file_handle(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger,
)
Expand All @@ -63,6 +64,7 @@ pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger.clone(),
)
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ mod tests {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
Arc::new(NoopFilePurger),
)
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_statistic();
assert_eq!(region_stat.sst_size, 2790);
assert!(region_stat.sst_size > 0); // Chief says this assert can ensure the size is counted.
assert_eq!(region_stat.num_rows, 10);

// region total usage
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Flush related utilities and structs.
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -345,6 +346,7 @@ impl RegionFlushTask {
continue;
}

let max_sequence = mem.stats().max_sequence();
let file_id = FileId::random();
let iter = mem.iter(None, None)?;
let source = Source::Iter(iter);
Expand All @@ -357,6 +359,7 @@ impl RegionFlushTask {
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
Expand All @@ -382,6 +385,7 @@ impl RegionFlushTask {
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
};
file_metas.push(file_meta);
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ async fn checkpoint_with_different_compression_types() {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
Expand Down
9 changes: 8 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use bulk::part::BulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::config::MitoConfig;
Expand Down Expand Up @@ -77,6 +77,8 @@ pub struct MemtableStats {
num_rows: usize,
/// Total number of ranges in the memtable.
num_ranges: usize,
/// The maximum sequence number in the memtable.
max_sequence: SequenceNumber,
}

impl MemtableStats {
Expand Down Expand Up @@ -106,6 +108,11 @@ impl MemtableStats {
pub fn num_ranges(&self) -> usize {
self.num_ranges
}

/// Returns the maximum sequence number in the memtable.
pub fn max_sequence(&self) -> SequenceNumber {
self.max_sequence
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
Expand Down
19 changes: 19 additions & 0 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ impl KeyValues {
// Safety: rows is not None.
self.mutation.rows.as_ref().unwrap().rows.len()
}

/// Returns if this container is empty
pub fn is_empty(&self) -> bool {
self.mutation.rows.is_none()
}

/// Return the max sequence in this container.
///
/// When the mutation has no rows, the sequence is the same as the mutation sequence.
pub fn max_sequence(&self) -> SequenceNumber {
let mut sequence = self.mutation.sequence;
let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64;
sequence += num_rows;
if num_rows > 0 {
sequence -= 1;
}

sequence
}
}

/// Key value view of a mutation.
Expand Down
22 changes: 21 additions & 1 deletion src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
Expand Down Expand Up @@ -113,6 +113,7 @@ pub struct PartitionTreeMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
/// Total written rows in memtable. This also includes deleted and duplicated rows.
num_rows: AtomicUsize,
}
Expand All @@ -131,6 +132,10 @@ impl Memtable for PartitionTreeMemtable {
}

fn write(&self, kvs: &KeyValues) -> Result<()> {
if kvs.is_empty() {
return Ok(());
}

// TODO(yingwen): Validate schema while inserting rows.

let mut metrics = WriteMetrics::default();
Expand All @@ -140,6 +145,12 @@ impl Memtable for PartitionTreeMemtable {

self.update_stats(&metrics);

// update max_sequence
if res.is_ok() {
let sequence = kvs.max_sequence();
self.max_sequence.fetch_max(sequence, Ordering::Relaxed);
}

self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
res
}
Expand All @@ -152,6 +163,12 @@ impl Memtable for PartitionTreeMemtable {

self.update_stats(&metrics);

// update max_sequence
if res.is_ok() {
self.max_sequence
.fetch_max(key_value.sequence(), Ordering::Relaxed);
}

self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}
Expand Down Expand Up @@ -210,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}

Expand All @@ -229,6 +247,7 @@ impl Memtable for PartitionTreeMemtable {
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
}

Expand Down Expand Up @@ -267,6 +286,7 @@ impl PartitionTreeMemtable {
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
num_rows: AtomicUsize::new(0),
max_sequence: AtomicU64::new(0),
}
}

Expand Down
21 changes: 20 additions & 1 deletion src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -100,6 +100,7 @@ pub struct TimeSeriesMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
dedup: bool,
merge_mode: MergeMode,
/// Total written rows in memtable. This also includes deleted and duplicated rows.
Expand Down Expand Up @@ -134,6 +135,7 @@ impl TimeSeriesMemtable {
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: AtomicU64::new(0),
dedup,
merge_mode,
num_rows: Default::default(),
Expand Down Expand Up @@ -186,6 +188,10 @@ impl Memtable for TimeSeriesMemtable {
}

fn write(&self, kvs: &KeyValues) -> Result<()> {
if kvs.is_empty() {
return Ok(());
}

let mut local_stats = WriteMetrics::default();

for kv in kvs.iter() {
Expand All @@ -199,6 +205,10 @@ impl Memtable for TimeSeriesMemtable {
// so that we can ensure writing to memtable will succeed.
self.update_stats(local_stats);

// update max_sequence
let sequence = kvs.max_sequence();
self.max_sequence.fetch_max(sequence, Ordering::Relaxed);

self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
Ok(())
}
Expand All @@ -209,6 +219,13 @@ impl Memtable for TimeSeriesMemtable {
metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

self.update_stats(metrics);

// update max_sequence
if res.is_ok() {
self.max_sequence
.fetch_max(key_value.sequence(), Ordering::Relaxed);
}

self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}
Expand Down Expand Up @@ -294,6 +311,7 @@ impl Memtable for TimeSeriesMemtable {
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}
let ts_type = self
Expand All @@ -311,6 +329,7 @@ impl Memtable for TimeSeriesMemtable {
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
}

Expand Down
Loading

0 comments on commit 55b7656

Please sign in to comment.