Skip to content

Commit

Permalink
fix: clean recover wals
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Aug 1, 2024
1 parent 8be1f7d commit c07642e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
14 changes: 11 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ where
if guard.immutables.len() > self.option.immutable_chunk_num {
let excess = guard.immutables.split_off(self.option.immutable_chunk_num);

if let Some(scope) =
Self::minor_compaction(&self.option, mem::replace(&mut guard.immutables, excess))
.await?
if let Some(scope) = Self::minor_compaction(
&self.option,
guard.recover_wal_ids.take(),
mem::replace(&mut guard.immutables, excess),
)
.await?
{
let version_ref = self.version_set.current().await;
let mut version_edits = vec![];
Expand Down Expand Up @@ -118,6 +121,7 @@ where

pub(crate) async fn minor_compaction(
option: &DbOption,
recover_wal_ids: Option<Vec<FileId>>,
batches: VecDeque<(FileId, Immutable<R::Columns>)>,
) -> Result<Option<Scope<R::Key>>, CompactionError<R>> {
if !batches.is_empty() {
Expand All @@ -133,6 +137,9 @@ where
option.write_parquet_option.clone(),
)?;

if let Some(mut recover_wal_ids) = recover_wal_ids {
wal_ids.append(&mut recover_wal_ids);
}
for (file_id, batch) in batches {
if let (Some(batch_min), Some(batch_max)) = batch.scope() {
if matches!(min.as_ref().map(|min| min > batch_min), Some(true) | None) {
Expand Down Expand Up @@ -565,6 +572,7 @@ pub(crate) mod tests {

let scope = Compactor::<Test, TokioExecutor>::minor_compaction(
&DbOption::from(temp_dir.path()),
None,
VecDeque::from(vec![(FileId::new(), batch_2), (FileId::new(), batch_1)]),
)
.await
Expand Down
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ where
immutables: VecDeque<(FileId, Immutable<R::Columns>)>,
compaction_tx: Sender<CompactTask>,
option: Arc<DbOption>,
recover_wal_ids: Option<Vec<FileId>>,
}

impl<R, FP> Schema<R, FP>
Expand All @@ -187,19 +188,22 @@ where
option: Arc<DbOption>,
compaction_tx: Sender<CompactTask>,
) -> Result<Self, WriteError<R>> {
let schema = Schema {
let mut schema = Schema {
mutable: Mutable::new(&option).await?,
immutables: Default::default(),
compaction_tx,
option: option.clone(),
recover_wal_ids: None,
};

let mut transaction_map = HashMap::new();
let mut wal_stream = pin!(FP::list(option.wal_dir_path(), FileType::Wal)?);
let mut wal_ids = Vec::new();

while let Some(wal) = wal_stream.next().await {
let (file, wal_id) = wal?;
let mut wal = WalFile::<FP::File, R>::new(file, wal_id);
wal_ids.push(wal_id);

let mut recover_stream = pin!(wal.recover());
while let Some(record) = recover_stream.next().await {
Expand Down Expand Up @@ -239,6 +243,7 @@ where
}
}
}
schema.recover_wal_ids = Some(wal_ids);

Ok(schema)
}
Expand Down Expand Up @@ -770,6 +775,7 @@ pub(crate) mod tests {
immutables,
compaction_tx,
option,
recover_wal_ids: None,
},
compaction_rx,
))
Expand Down

0 comments on commit c07642e

Please sign in to comment.