fix: Update region Version in the worker loop (#4114)
* feat: handle region edit result

* feat: handle edit result

* feat: handle truncate result

* feat: flush compaction

* feat: invoke in worker

* feat: remove unused fields

* style: fix clippy

* feat: remove applier

---------

Co-authored-by: Weny Xu <[email protected]>
Authored by evenyag and WenyXu on Jun 7, 2024
1 parent 09e0e1b, commit e982d2e
Showing 12 changed files with 210 additions and 142 deletions.
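
In short: flush and compaction tasks no longer mutate the region's in-memory `Version` themselves. Each task persists its manifest change and returns the resulting `RegionEdit`; the edit travels back to the region worker inside `FlushFinished` or `CompactionFinished`, and the worker loop applies it. The sketch below illustrates the pattern with simplified stand-in types; it is not the actual GreptimeDB code (the worker-side handlers live in files this page does not show).

```rust
// Stand-in types only. In the real code (src/mito2), handle_compaction and
// flush_memtables are async and return error::Result<RegionEdit>.
#[derive(Debug)]
struct RegionEdit {
    files_to_add: Vec<String>, // stand-in for Vec<FileMeta>
}

struct VersionControl {
    level0_files: Vec<String>,
}

impl VersionControl {
    // Stand-in for VersionControl::apply_edit. The worker loop is now the
    // only caller, so in-memory version updates are serialized by design.
    fn apply_edit(&mut self, edit: RegionEdit) {
        self.level0_files.extend(edit.files_to_add);
    }
}

fn update_manifest(_edit: &RegionEdit) -> Result<(), String> {
    Ok(()) // pretend the manifest write succeeded
}

// A background task persists the manifest change, then returns the edit
// instead of applying it under the manifest write lock (the old behavior).
fn background_task() -> Result<RegionEdit, String> {
    let edit = RegionEdit {
        files_to_add: vec!["sst-0001".to_string()],
    };
    update_manifest(&edit)?; // durable change first
    Ok(edit) // the in-memory update is deferred to the worker
}

fn main() -> Result<(), String> {
    let mut version_control = VersionControl { level0_files: vec![] };
    // The worker receives the finished notification and applies the edit.
    let edit = background_task()?;
    version_control.apply_edit(edit);
    println!("level 0 files: {:?}", version_control.level0_files);
    Ok(())
}
```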
src/mito2/src/compaction.rs (2 additions, 22 deletions)
```diff
@@ -58,7 +58,6 @@ use crate::region::ManifestContextRef;
 use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
 use crate::schedule::scheduler::SchedulerRef;
 use crate::sst::file::{FileHandle, FileId, Level};
-use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::version::LevelMeta;
 use crate::worker::WorkerListener;
 
@@ -71,12 +70,10 @@ pub struct CompactionRequest {
     pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
     /// Waiters of the compaction request.
     pub(crate) waiters: Vec<OutputTx>,
-    pub(crate) file_purger: FilePurgerRef,
     /// Start time of compaction task.
     pub(crate) start_time: Instant,
     pub(crate) cache_manager: CacheManagerRef,
     pub(crate) manifest_ctx: ManifestContextRef,
-    pub(crate) version_control: VersionControlRef,
     pub(crate) listener: WorkerListener,
 }
 
@@ -142,7 +139,6 @@ impl CompactionScheduler {
         compact_options: compact_request::Options,
         version_control: &VersionControlRef,
         access_layer: &AccessLayerRef,
-        file_purger: &FilePurgerRef,
         waiter: OptionOutputTx,
         manifest_ctx: &ManifestContextRef,
     ) -> Result<()> {
@@ -153,12 +149,8 @@ impl CompactionScheduler {
         }
 
         // The region can compact directly.
-        let mut status = CompactionStatus::new(
-            region_id,
-            version_control.clone(),
-            access_layer.clone(),
-            file_purger.clone(),
-        );
+        let mut status =
+            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
         let request = status.new_compaction_request(
             self.request_sender.clone(),
             waiter,
@@ -330,8 +322,6 @@ struct CompactionStatus {
     version_control: VersionControlRef,
     /// Access layer of the region.
     access_layer: AccessLayerRef,
-    /// File purger of the region.
-    file_purger: FilePurgerRef,
     /// Compaction pending to schedule.
     ///
     /// For simplicity, we merge all pending compaction requests into one.
@@ -344,13 +334,11 @@ impl CompactionStatus {
         region_id: RegionId,
         version_control: VersionControlRef,
         access_layer: AccessLayerRef,
-        file_purger: FilePurgerRef,
     ) -> CompactionStatus {
         CompactionStatus {
             region_id,
             version_control,
             access_layer,
-            file_purger,
             pending_compaction: None,
         }
     }
@@ -392,11 +380,9 @@ impl CompactionStatus {
             access_layer: self.access_layer.clone(),
             request_sender: request_sender.clone(),
             waiters: Vec::new(),
-            file_purger: self.file_purger.clone(),
             start_time,
             cache_manager,
             manifest_ctx: manifest_ctx.clone(),
-            version_control: self.version_control.clone(),
             listener,
         };
 
@@ -547,7 +533,6 @@ mod tests {
         let (tx, _rx) = mpsc::channel(4);
         let mut scheduler = env.mock_compaction_scheduler(tx);
         let mut builder = VersionControlBuilder::new();
-        let purger = builder.file_purger();
 
         // Nothing to compact.
        let version_control = Arc::new(builder.build());
@@ -562,7 +547,6 @@
                 compact_request::Options::Regular(Default::default()),
                 &version_control,
                 &env.access_layer,
-                &purger,
                 waiter,
                 &manifest_ctx,
             )
@@ -581,7 +565,6 @@
                 compact_request::Options::Regular(Default::default()),
                 &version_control,
                 &env.access_layer,
-                &purger,
                 waiter,
                 &manifest_ctx,
             )
@@ -644,7 +627,6 @@
                 compact_request::Options::Regular(Default::default()),
                 &version_control,
                 &env.access_layer,
-                &purger,
                 OptionOutputTx::none(),
                 &manifest_ctx,
             )
@@ -673,7 +655,6 @@
                 compact_request::Options::Regular(Default::default()),
                 &version_control,
                 &env.access_layer,
-                &purger,
                 OptionOutputTx::none(),
                 &manifest_ctx,
             )
@@ -705,7 +686,6 @@
                 compact_request::Options::Regular(Default::default()),
                 &version_control,
                 &env.access_layer,
-                &purger,
                 OptionOutputTx::none(),
                 &manifest_ctx,
             )
```
src/mito2/src/compaction/task.rs (7 additions, 12 deletions)
```diff
@@ -34,13 +34,11 @@ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList
 use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
 use crate::read::Source;
 use crate::region::options::IndexOptions;
-use crate::region::version::VersionControlRef;
 use crate::region::{ManifestContextRef, RegionState};
 use crate::request::{
     BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
 };
 use crate::sst::file::{FileHandle, FileMeta, IndexType};
-use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::parquet::WriteOptions;
 use crate::worker::WorkerListener;
 
@@ -54,7 +52,6 @@ pub(crate) struct CompactionTaskImpl {
     pub outputs: Vec<CompactionOutput>,
     pub expired_ssts: Vec<FileHandle>,
     pub compaction_time_window: Option<i64>,
-    pub file_purger: FilePurgerRef,
     /// Request sender to notify the worker.
     pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
     /// Senders that are used to notify waiters waiting for pending compaction tasks.
@@ -70,8 +67,6 @@ pub(crate) struct CompactionTaskImpl {
     pub(crate) append_mode: bool,
     /// Manifest context.
     pub(crate) manifest_ctx: ManifestContextRef,
-    /// Version control to update.
-    pub(crate) version_control: VersionControlRef,
     /// Event listener.
     pub(crate) listener: WorkerListener,
 }
@@ -216,7 +211,7 @@ impl CompactionTaskImpl {
         Ok((output_files, inputs))
     }
 
-    async fn handle_compaction(&mut self) -> error::Result<()> {
+    async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
         self.mark_files_compacting(true);
         let merge_timer = COMPACTION_STAGE_ELAPSED
             .with_label_values(&["merge"])
@@ -260,11 +255,10 @@ impl CompactionTaskImpl {
         // We might leak files if we fail to update manifest. We can add a cleanup task to
         // remove them later.
         self.manifest_ctx
-            .update_manifest(RegionState::Writable, action_list, || {
-                self.version_control
-                    .apply_edit(edit, &[], self.file_purger.clone());
-            })
-            .await
+            .update_manifest(RegionState::Writable, action_list)
+            .await?;
+
+        Ok(edit)
     }
 
     /// Handles compaction failure, notifies all waiters.
@@ -292,10 +286,11 @@
 impl CompactionTask for CompactionTaskImpl {
     async fn run(&mut self) {
         let notify = match self.handle_compaction().await {
-            Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished {
+            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
                 region_id: self.region_id,
                 senders: std::mem::take(&mut self.waiters),
                 start_time: self.start_time,
+                edit,
             }),
             Err(e) => {
                 error!(e; "Failed to compact region, region id: {}", self.region_id);
```
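
`CompactionFinished` now carries the edit, making the worker that receives the `BackgroundNotify` responsible for the in-memory update. That handler lives in the worker module, which this page does not load; a hypothetical shape, with stand-in types:

```rust
// Hypothetical worker-side handling of CompactionFinished; stand-in types,
// not the real src/mito2/src/worker code (not shown on this page).
struct RegionEdit;
struct OutputTx;
struct CompactionFinished {
    senders: Vec<OutputTx>,
    edit: RegionEdit,
}

struct VersionControl;
impl VersionControl {
    fn apply_edit(&self, _edit: RegionEdit) {
        // Remove the compacted input files, add the outputs, bump the version.
    }
}

fn handle_compaction_finished(version_control: &VersionControl, mut finished: CompactionFinished) {
    // Applying here, on the worker thread, replaces the old applier closure
    // that ran inside update_manifest while holding the manifest write lock.
    version_control.apply_edit(finished.edit);
    for _sender in finished.senders.drain(..) {
        // ...notify each waiter that the compaction succeeded.
    }
}
```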
src/mito2/src/compaction/twcs.rs (0 additions, 4 deletions)
```diff
@@ -117,11 +117,9 @@ impl Picker for TwcsPicker {
             access_layer,
             request_sender,
             waiters,
-            file_purger,
             start_time,
             cache_manager,
             manifest_ctx,
-            version_control,
             listener,
             ..
         } = req;
@@ -175,14 +173,12 @@ impl Picker for TwcsPicker {
             compaction_time_window: Some(time_window_size),
             request_sender,
             waiters,
-            file_purger,
             start_time,
             cache_manager,
             storage: current_version.options.storage.clone(),
             index_options: current_version.options.index_options.clone(),
             append_mode: current_version.options.append_mode,
             manifest_ctx,
-            version_control,
             listener,
         };
         Some(Box::new(task))
```
src/mito2/src/compaction/window.rs (0 additions, 4 deletions)
```diff
@@ -109,11 +109,9 @@ impl Picker for WindowedCompactionPicker {
             access_layer,
             request_sender,
             waiters,
-            file_purger,
             start_time,
             cache_manager,
             manifest_ctx,
-            version_control,
             listener,
         } = req;
 
@@ -130,14 +128,12 @@ impl Picker for WindowedCompactionPicker {
             compaction_time_window: Some(time_window),
             request_sender,
             waiters,
-            file_purger,
             start_time,
             cache_manager,
             storage: current_version.options.storage.clone(),
             index_options: current_version.options.index_options.clone(),
             append_mode: current_version.options.append_mode,
             manifest_ctx,
-            version_control,
             listener,
         };
         Some(Box::new(task))
```
src/mito2/src/flush.rs (21 additions, 29 deletions)
```diff
@@ -43,7 +43,6 @@ use crate::request::{
 };
 use crate::schedule::scheduler::{Job, SchedulerRef};
 use crate::sst::file::{FileId, FileMeta, IndexType};
-use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::parquet::WriteOptions;
 use crate::worker::WorkerListener;
 
@@ -201,7 +200,6 @@ pub(crate) struct RegionFlushTask {
     pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
 
     pub(crate) access_layer: AccessLayerRef,
-    pub(crate) file_purger: FilePurgerRef,
     pub(crate) listener: WorkerListener,
     pub(crate) engine_config: Arc<MitoConfig>,
     pub(crate) row_group_size: Option<usize>,
@@ -243,31 +241,34 @@ impl RegionFlushTask {
         // Get a version of this region before creating a job to get current
         // wal entry id, sequence and immutable memtables.
         let version_data = version_control.current();
-        // This is used to update the version.
-        let version_control = version_control.clone();
 
         Box::pin(async move {
-            self.do_flush(version_data, &version_control).await;
+            self.do_flush(version_data).await;
         })
     }
 
     /// Runs the flush task.
-    async fn do_flush(
-        &mut self,
-        version_data: VersionControlData,
-        version_control: &VersionControlRef,
-    ) {
+    async fn do_flush(&mut self, version_data: VersionControlData) {
         let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
         self.listener.on_flush_begin(self.region_id).await;
 
-        let worker_request = match self.flush_memtables(&version_data, version_control).await {
-            Ok(()) => {
+        let worker_request = match self.flush_memtables(&version_data).await {
+            Ok(edit) => {
+                let memtables_to_remove = version_data
+                    .version
+                    .memtables
+                    .immutables()
+                    .iter()
+                    .map(|m| m.id())
+                    .collect();
                 let flush_finished = FlushFinished {
                     region_id: self.region_id,
                     // The last entry has been flushed.
                     flushed_entry_id: version_data.last_entry_id,
                     senders: std::mem::take(&mut self.senders),
                     _timer: timer,
+                    edit,
+                    memtables_to_remove,
                 };
                 WorkerRequest::Background {
                     region_id: self.region_id,
@@ -291,11 +292,10 @@ impl RegionFlushTask {
     }
 
     /// Flushes memtables to level 0 SSTs and updates the manifest.
-    async fn flush_memtables(
-        &self,
-        version_data: &VersionControlData,
-        version_control: &VersionControlRef,
-    ) -> Result<()> {
+    /// Returns the [RegionEdit] to apply.
+    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
+        // We must use the immutable memtables list and entry ids from the `version_data`
+        // for consistency as others might already modify the version in the `version_control`.
         let version = &version_data.version;
         let timer = FLUSH_ELAPSED
             .with_label_values(&["flush_memtables"])
@@ -384,13 +384,6 @@ impl RegionFlushTask {
             timer.stop_and_record(),
         );
 
-        let memtables_to_remove: SmallVec<[_; 2]> = version_data
-            .version
-            .memtables
-            .immutables()
-            .iter()
-            .map(|m| m.id())
-            .collect();
         let edit = RegionEdit {
             files_to_add: file_metas,
             files_to_remove: Vec::new(),
@@ -405,10 +398,10 @@ impl RegionFlushTask {
         // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
         // add a cleanup job to remove them later.
         self.manifest_ctx
-            .update_manifest(RegionState::Writable, action_list, || {
-                version_control.apply_edit(edit, &memtables_to_remove, self.file_purger.clone());
-            })
-            .await
+            .update_manifest(RegionState::Writable, action_list)
+            .await?;
+
+        Ok(edit)
     }
 
     /// Notify flush job status.
@@ -796,7 +789,6 @@ mod tests {
             senders: Vec::new(),
             request_sender: tx,
             access_layer: env.access_layer.clone(),
-            file_purger: builder.file_purger(),
             listener: WorkerListener::default(),
             engine_config: Arc::new(MitoConfig::default()),
             row_group_size: None,
```
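
On the flush side, `memtables_to_remove` is now computed in `do_flush` from the same `version_data` snapshot whose entries were flushed, and `FlushFinished` carries it alongside the edit. How the worker consumes it is likewise outside this diff; a hypothetical sketch with stand-in types:

```rust
// Hypothetical worker-side handling of FlushFinished; stand-in types only.
struct RegionEdit;
type MemtableId = u32;

struct FlushFinished {
    flushed_entry_id: u64,
    edit: RegionEdit,
    memtables_to_remove: Vec<MemtableId>,
}

struct VersionControl;
impl VersionControl {
    // Mirrors the spirit of VersionControl::apply_edit(edit, memtables, purger):
    // install the new level-0 files and drop the flushed immutable memtables.
    fn apply_edit(&self, _edit: RegionEdit, _memtables_to_remove: &[MemtableId]) {}
}

fn handle_flush_finished(version_control: &VersionControl, finished: FlushFinished) {
    version_control.apply_edit(finished.edit, &finished.memtables_to_remove);
    // Entries up to finished.flushed_entry_id are durable in SSTs now, so the
    // region's WAL could be truncated past that point.
    let _ = finished.flushed_entry_id;
}
```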
src/mito2/src/region.rs (1 addition, 6 deletions)
```diff
@@ -301,13 +301,11 @@ impl ManifestContext {
         self.manifest_manager.read().await.has_update().await
     }
 
-    /// Updates the manifest if current state is `expect_state` and executes
-    /// the `applier` if the manifest is updated.
+    /// Updates the manifest if current state is `expect_state`.
     pub(crate) async fn update_manifest(
         &self,
         expect_state: RegionState,
         action_list: RegionMetaActionList,
-        applier: impl FnOnce(),
     ) -> Result<()> {
         // Acquires the write lock of the manifest manager.
         let mut manager = self.manifest_manager.write().await;
@@ -365,9 +363,6 @@ impl ManifestContext {
                 |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
             )?;
 
-        // Executes the applier. We MUST hold the write lock.
-        applier();
-
         if self.state.load() == RegionState::ReadOnly {
             warn!(
                 "Region {} becomes read-only while updating manifest which may cause inconsistency",
```
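
With the `applier` parameter gone, `update_manifest` guards only the manifest write itself; consistency between the manifest and the in-memory `Version` now rests on the worker loop being the sole writer. In miniature, the call-site shape changes as below (stand-in types and synchronous signatures; the real methods are async):

```rust
// Miniature of the update_manifest call-site change; stand-ins, not the real API.
struct RegionMetaActionList;
struct RegionEdit;

struct ManifestContext;
impl ManifestContext {
    fn update_manifest(&self, _actions: RegionMetaActionList) -> Result<(), String> {
        Ok(()) // pretend the manifest write succeeded
    }
}

// Old shape (removed by this commit) applied the edit under the lock:
//   ctx.update_manifest(state, actions, || version_control.apply_edit(edit)).await
// New shape: persist first, then hand the edit back for the worker to apply.
fn persist_then_return_edit(
    ctx: &ManifestContext,
    actions: RegionMetaActionList,
    edit: RegionEdit,
) -> Result<RegionEdit, String> {
    ctx.update_manifest(actions)?; // durable manifest change
    Ok(edit) // applied to the in-memory Version by the worker later
}
```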
(Diff truncated: the remaining 6 changed files are not loaded on this page.)
