Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into refactor/schema-trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Dec 16, 2024
2 parents 75a8008 + e3e5349 commit 3751742
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
38 changes: 20 additions & 18 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,35 +65,37 @@ where
pub(crate) async fn check_then_compaction(
&mut self,
parquet_lru: ParquetLru,
is_manual: bool,
) -> Result<(), CompactionError<R>> {
let mut guard = self.schema.write().await;

guard.trigger.reset();

if guard.mutable.is_empty() {
if !guard.mutable.is_empty() {
let trigger_clone = guard.trigger.clone();

let mutable = mem::replace(
&mut guard.mutable,
Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?,

Check failure on line 79 in src/compaction/mod.rs

View workflow job for this annotation

GitHub Actions / check

this function takes 4 arguments but 3 arguments were supplied

Check failure on line 79 in src/compaction/mod.rs

View workflow job for this annotation

GitHub Actions / check

this function takes 4 arguments but 3 arguments were supplied
);
let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?;

Check failure on line 81 in src/compaction/mod.rs

View workflow job for this annotation

GitHub Actions / check

no field `record_instance` on type `async_lock::RwLockWriteGuard<'_, Schema<R>>`

Check failure on line 81 in src/compaction/mod.rs

View workflow job for this annotation

GitHub Actions / check

this method takes 0 arguments but 1 argument was supplied

Check failure on line 81 in src/compaction/mod.rs

View workflow job for this annotation

GitHub Actions / check

no field `record_instance` on type `async_lock::RwLockWriteGuard<'_, Schema<R>>`

Check failure on line 81 in src/compaction/mod.rs

View workflow job for this annotation

GitHub Actions / check

this method takes 0 arguments but 1 argument was supplied
guard.immutables.push((file_id, immutable));
} else if !is_manual {
return Ok(());
}

let trigger_clone = guard.trigger.clone();
let mutable = mem::replace(
&mut guard.mutable,
Mutable::new(
&self.option,
trigger_clone,
self.manager.base_fs(),
self.record_schema.clone(),
)
.await?,
);
let (file_id, immutable) = mutable.into_immutable().await?;

guard.immutables.push((file_id, immutable));
if guard.immutables.len() > self.option.immutable_chunk_max_num {
if (is_manual && !guard.immutables.is_empty())
|| guard.immutables.len() > self.option.immutable_chunk_max_num
{
let recover_wal_ids = guard.recover_wal_ids.take();
drop(guard);

let guard = self.schema.upgradable_read().await;
let chunk_num = self.option.immutable_chunk_num;
let chunk_num = if is_manual {
guard.immutables.len()
} else {
self.option.immutable_chunk_num
};
let excess = &guard.immutables[0..chunk_num];

if let Some(scope) = Self::minor_compaction(
Expand Down Expand Up @@ -313,7 +315,7 @@ where
level: (level + 1) as u8,
gen: scope.gen,
});
delete_gens.push((scope.gen, level));
delete_gens.push((scope.gen, level + 1));
}
level += 1;
}
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,12 @@ where
if let Err(err) = match task {
CompactTask::Freeze => {
compactor
.check_then_compaction(compact_task_cache.clone())
.check_then_compaction(compact_task_cache.clone(), false)
.await
}
CompactTask::Flush(option_tx) => {
let mut result = compactor
.check_then_compaction(compact_task_cache.clone())
.check_then_compaction(compact_task_cache.clone(), true)
.await;
if let Some(tx) = option_tx {
if result.is_ok() {
Expand Down Expand Up @@ -1350,12 +1350,12 @@ pub(crate) mod tests {
if let Err(err) = match task {
CompactTask::Freeze => {
compactor
.check_then_compaction(Arc::new(NoCache::default()))
.check_then_compaction(Arc::new(NoCache::default()), false)
.await
}
CompactTask::Flush(option_tx) => {
let mut result = compactor
.check_then_compaction(Arc::new(NoCache::default()))
.check_then_compaction(Arc::new(NoCache::default()), true)
.await;
if let Some(tx) = option_tx {
let channel_result =
Expand Down

0 comments on commit 3751742

Please sign in to comment.