Skip to content

Commit

Permalink
fix: remove memtables after flush
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Sep 7, 2023
1 parent 576ba89 commit 4419a90
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 12 deletions.
6 changes: 5 additions & 1 deletion src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async fn test_manual_flush() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -98,7 +99,7 @@ async fn test_flush_engine() {

write_buffer_manager.set_should_flush(true);

// Writes and triggers flush.
// Writes to the mutable memtable and triggers flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
Expand All @@ -110,6 +111,7 @@ async fn test_flush_engine() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -174,6 +176,7 @@ async fn test_write_stall() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -209,6 +212,7 @@ async fn test_flush_empty() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,11 @@ impl FlushScheduler {
}

/// Returns true if the region has pending DDLs.
pub(crate) fn has_pending_ddls(&self, _region_id: RegionId) -> bool {
unimplemented!()
pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
self.region_status
.get(&region_id)
.map(|status| !status.pending_ddls.is_empty())
.unwrap_or(false)
}

/// Schedules a new flush task when the scheduler can submit next task.
Expand Down
14 changes: 11 additions & 3 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use std::sync::Arc;

use smallvec::SmallVec;

use crate::memtable::MemtableRef;
use crate::memtable::{MemtableId, MemtableRef};

/// A version of current memtables in a region.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
pub(crate) mutable: MemtableRef,
Expand Down Expand Up @@ -85,7 +85,15 @@ impl MemtableVersion {
})
}

// TODO(yingwen): Remove memtables on flush done.
/// Removes memtables by ids from immutable memtables.
pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) {
self.immutables = self
.immutables
.iter()
.filter(|mem| !ids.contains(&mem.id()))
.cloned()
.collect();
}

/// Returns the memory usage of the mutable memtable.
pub(crate) fn mutable_usage(&self) -> usize {
Expand Down
12 changes: 12 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ impl Scanner {
Scanner::Seq(seq_scan) => seq_scan.num_files(),
}
}

/// Returns number of memtables to scan.
pub(crate) fn num_memtables(&self) -> usize {
match self {
Scanner::Seq(seq_scan) => seq_scan.num_memtables(),
}
}
}

#[cfg_attr(doc, aquamarine::aquamarine)]
Expand Down Expand Up @@ -139,6 +146,11 @@ impl ScanRegion {
}

let memtables = self.version.memtables.list_memtables();
// Skip empty memtables.
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| !mem.is_empty())
.collect();

debug!(
"Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ impl SeqScan {

#[cfg(test)]
impl SeqScan {
/// Returns number of memtables to scan.
pub(crate) fn num_memtables(&self) -> usize {
self.memtables.len()
}

/// Returns number of SST files to scan.
pub(crate) fn num_files(&self) -> usize {
self.files.len()
Expand Down
20 changes: 18 additions & 2 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use store_api::storage::SequenceNumber;

use crate::manifest::action::RegionEdit;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;
Expand Down Expand Up @@ -90,11 +90,17 @@ impl VersionControl {
}

/// Apply edit to current version.
pub(crate) fn apply_edit(&self, edit: RegionEdit, purger: FilePurgerRef) {
pub(crate) fn apply_edit(
&self,
edit: RegionEdit,
memtables_to_remove: &[MemtableId],
purger: FilePurgerRef,
) {
let version = self.current().version;
let new_version = Arc::new(
VersionBuilder::from_version(version)
.apply_edit(edit, purger)
.remove_memtables(memtables_to_remove)
.build(),
);

Expand Down Expand Up @@ -208,6 +214,16 @@ impl VersionBuilder {
self
}

/// Remove memtables from the builder.
pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> VersionBuilder {
if !ids.is_empty() {
let mut memtables = (*self.memtables).clone();
memtables.remove_memtables(ids);
self.memtables = Arc::new(memtables);
}
self
}

/// Builds a new [Version] from the builder.
pub(crate) fn build(self) -> Version {
Version {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<S> RegionWorkerLoop<S> {
return;
}

// TODO(yingwen): Maybe assert in add_ddl_request_to_pending instead returning result.
// Safety: We have requested flush.
self.flush_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
Expand Down
8 changes: 5 additions & 3 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}

// Apply edit to region's version.
region
.version_control
.apply_edit(edit, region.file_purger.clone());
region.version_control.apply_edit(
edit,
&request.memtables_to_remove,
region.file_purger.clone(),
);
region.update_flush_millis();

// Delete wal.
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl<S> RegionWorkerLoop<S> {
// If region is waiting for alteration, add requests to pending writes.
if self.flush_scheduler.has_pending_ddls(region_id) {
// TODO(yingwen): consider adding some metrics for this.
// Safety: The region has pending ddls.
self.flush_scheduler
.add_write_request_to_pending(sender_req);
continue;
Expand Down

0 comments on commit 4419a90

Please sign in to comment.