Skip to content

Commit

Permalink
perf: avoid holding memtable during compaction (#5157)
Browse files Browse the repository at this point in the history
* perf/avoid-holding-memtable-during-compaction: Refactor Compaction Version Handling

 • Introduced CompactionVersion struct to encapsulate region version details for compaction, removing dependency on VersionRef.
 • Updated CompactionRequest and CompactionRegion to use CompactionVersion.
 • Modified open_compaction_region to construct CompactionVersion without memtables.
 • Adjusted WindowedCompactionPicker to work with CompactionVersion.
 • Enhanced flush logic in WriteBufferManager to improve memory usage checks and logging.

* reformat code

* chore: change log level

* reformat code

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
v0y4g3r and evenyag authored Dec 17, 2024
1 parent 421088a commit c33cf59
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 65 deletions.
8 changes: 4 additions & 4 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::compaction::compactor::{CompactionRegion, DefaultCompactor};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
Expand All @@ -59,7 +59,7 @@ use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::version::VersionControlRef;
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::remote_job_scheduler::{
Expand All @@ -73,7 +73,7 @@ use crate::worker::WorkerListener;
/// Region compaction request.
pub struct CompactionRequest {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) current_version: VersionRef,
pub(crate) current_version: CompactionVersion,
pub(crate) access_layer: AccessLayerRef,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
Expand Down Expand Up @@ -522,7 +522,7 @@ impl CompactionStatus {
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
let mut req = CompactionRequest {
engine_config,
Expand Down
66 changes: 38 additions & 28 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,45 @@ use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Res
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::read::Source;
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionRef};
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
use crate::schedule::scheduler::LocalScheduler;
use crate::sst::file::{FileMeta, IndexType};
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::parquet::WriteOptions;
use crate::sst::version::{SstVersion, SstVersionRef};

/// Region version for compaction that does not hold memtables.
#[derive(Clone)]
pub struct CompactionVersion {
/// Metadata of the region.
///
/// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
/// metadata and reuse metadata when creating a new `Version`.
pub(crate) metadata: RegionMetadataRef,
/// Options of the region.
pub(crate) options: RegionOptions,
/// SSTs of the region.
pub(crate) ssts: SstVersionRef,
/// Inferred compaction time window.
pub(crate) compaction_time_window: Option<Duration>,
}

impl From<VersionRef> for CompactionVersion {
fn from(value: VersionRef) -> Self {
Self {
metadata: value.metadata.clone(),
options: value.options.clone(),
ssts: value.ssts.clone(),
compaction_time_window: value.compaction_time_window,
}
}
}

/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
Expand All @@ -62,7 +88,7 @@ pub struct CompactionRegion {
pub(crate) cache_manager: CacheManagerRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) manifest_ctx: Arc<ManifestContext>,
pub(crate) current_version: VersionRef,
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<TimeToLive>,
}
Expand Down Expand Up @@ -147,30 +173,14 @@ pub async fn open_compaction_region(
};

let current_version = {
let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone()))
.builder_for_options(
req.region_options.memtable.as_ref(),
req.region_options.need_dedup(),
req.region_options.merge_mode(),
);

// Initial memtable id is 0.
let mutable = Arc::new(TimePartitions::new(
region_metadata.clone(),
memtable_builder.clone(),
0,
req.region_options.compaction.time_window(),
));

let version = VersionBuilder::new(region_metadata.clone(), mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(req.region_options.clone())
.build();
Arc::new(version)
let mut ssts = SstVersion::new();
ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
CompactionVersion {
metadata: region_metadata.clone(),
options: req.region_options.clone(),
ssts: Arc::new(ssts),
compaction_time_window: manifest.compaction_time_window,
}
};

let ttl = find_ttl(
Expand Down
40 changes: 15 additions & 25 deletions src/mito2/src/compaction/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ use common_time::Timestamp;
use store_api::storage::RegionId;

use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::compactor::{CompactionRegion, CompactionVersion};
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::region::version::VersionRef;
use crate::sst::file::{FileHandle, FileId};

/// Compaction picker that splits the time range of all involved files to windows, and merges
Expand All @@ -48,7 +47,11 @@ impl WindowedCompactionPicker {
// use persisted window. If persist window is not present, we check the time window
// provided while creating table. If all of those are absent, we infer the window
// from files in level0.
fn calculate_time_window(&self, region_id: RegionId, current_version: &VersionRef) -> i64 {
fn calculate_time_window(
&self,
region_id: RegionId,
current_version: &CompactionVersion,
) -> i64 {
self.compaction_time_window_seconds
.or(current_version
.compaction_time_window
Expand All @@ -67,7 +70,7 @@ impl WindowedCompactionPicker {
fn pick_inner(
&self,
region_id: RegionId,
current_version: &VersionRef,
current_version: &CompactionVersion,
current_time: Timestamp,
) -> (Vec<CompactionOutput>, Vec<FileHandle>, i64) {
let time_window = self.calculate_time_window(region_id, current_version);
Expand Down Expand Up @@ -205,28 +208,19 @@ mod tests {
use common_time::Timestamp;
use store_api::storage::RegionId;

use crate::compaction::compactor::CompactionVersion;
use crate::compaction::window::{file_time_bucket_span, WindowedCompactionPicker};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::version::MemtableVersion;
use crate::region::options::RegionOptions;
use crate::region::version::{Version, VersionRef};
use crate::sst::file::{FileId, FileMeta, Level};
use crate::sst::version::SstVersion;
use crate::test_util::memtable_util::metadata_for_test;
use crate::test_util::NoopFilePurger;

fn build_version(files: &[(FileId, i64, i64, Level)], ttl: Option<Duration>) -> VersionRef {
fn build_version(
files: &[(FileId, i64, i64, Level)],
ttl: Option<Duration>,
) -> CompactionVersion {
let metadata = metadata_for_test();
let memtables = Arc::new(MemtableVersion::new(Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(PartitionTreeMemtableBuilder::new(
PartitionTreeConfig::default(),
None,
)),
0,
None,
))));
let file_purger_ref = Arc::new(NoopFilePurger);

let mut ssts = SstVersion::new();
Expand All @@ -244,14 +238,9 @@ mod tests {
}),
);

Arc::new(Version {
CompactionVersion {
metadata,
memtables,
ssts: Arc::new(ssts),
flushed_entry_id: 0,
flushed_sequence: 0,
truncated_entry_id: None,
compaction_time_window: None,
options: RegionOptions {
ttl: ttl.map(|t| t.into()),
compaction: Default::default(),
Expand All @@ -262,7 +251,8 @@ mod tests {
memtable: None,
merge_mode: None,
},
})
compaction_time_window: None,
}
}

#[test]
Expand Down
21 changes: 13 additions & 8 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, trace};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -141,17 +141,22 @@ impl WriteBufferManager for WriteBufferManagerImpl {
// If the memory exceeds the buffer size, we trigger more aggressive
// flush. But if already more than half memory is being flushed,
// triggering more flush may not help. We will hold it instead.
if memory_usage >= self.global_write_buffer_size
&& mutable_memtable_memory_usage >= self.global_write_buffer_size / 2
{
debug!(
if memory_usage >= self.global_write_buffer_size {
if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
debug!(
"Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
mutable_usage: {}.",
memory_usage,
self.global_write_buffer_size,
mutable_memtable_memory_usage,
);
return true;
mutable_memtable_memory_usage);
return true;
} else {
trace!(
"Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
memory_usage,
self.global_write_buffer_size,
mutable_memtable_memory_usage);
}
}

false
Expand Down

0 comments on commit c33cf59

Please sign in to comment.