Skip to content

Commit

Permalink
refactor: use ? instead of map_err in compaction mod
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 21, 2024
1 parent 48cc40f commit 0c31d94
Showing 1 changed file with 18 additions and 38 deletions.
56 changes: 18 additions & 38 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ where

self.version_set
.apply_edits(version_edits, Some(delete_gens), false)
.await
.map_err(CompactionError::Version)?;
.await?;
}
}
// TODO
Expand All @@ -108,14 +107,10 @@ where
// let mut wal_ids = Vec::with_capacity(batches.len());

let mut writer = AsyncArrowWriter::try_new(
FP::open(option.table_path(&gen))
.await
.map_err(CompactionError::Io)?
.compat(),
FP::open(option.table_path(&gen)).await?.compat(),
R::arrow_schema().clone(),
option.write_parquet_option.clone(),
)
.map_err(CompactionError::Parquet)?;
)?;

for batch in batches {
if let (Some(batch_min), Some(batch_max)) = batch.scope() {
Expand All @@ -126,14 +121,11 @@ where
max = Some(batch_max.clone())
}
}
writer
.write(batch.as_record_batch())
.await
.map_err(CompactionError::Parquet)?;
writer.write(batch.as_record_batch()).await?;
// TODO: WAL CLEAN
// wal_ids.push(wal_id);
}
writer.close().await.map_err(CompactionError::Parquet)?;
writer.close().await?;
return Ok(Some(Scope {
min: min.ok_or(CompactionError::EmptyLevel)?,
max: max.ok_or(CompactionError::EmptyLevel)?,
Expand Down Expand Up @@ -211,15 +203,12 @@ where
// This Level
if level == 0 {
for scope in meet_scopes_l.iter() {
let file = FP::open(option.table_path(&scope.gen))
.await
.map_err(CompactionError::Io)?;
let file = FP::open(option.table_path(&scope.gen)).await?;

streams.push(ScanStream::SsTable {
inner: SsTable::open(file)
.scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into(), None)
.await
.map_err(CompactionError::Parquet)?,
.await?,
});
}
} else {
Expand Down Expand Up @@ -255,9 +244,7 @@ where
streams.push(ScanStream::Level {
inner: level_scan_ll,
});
let mut stream = MergeStream::<R, FP>::from_vec(streams)
.await
.map_err(CompactionError::Parquet)?;
let mut stream = MergeStream::<R, FP>::from_vec(streams).await?;

// Kould: is the capacity parameter necessary?
let mut builder = R::Columns::builder(8192);
Expand All @@ -266,7 +253,7 @@ where
let mut max = None;

while let Some(result) = Pin::new(&mut stream).next().await {
let entry = result.map_err(CompactionError::Parquet)?;
let entry = result?;
let key = entry.key();

if min.is_none() {
Expand Down Expand Up @@ -337,25 +324,18 @@ where
min: &mut Option<R::Key>,
max: &mut Option<R::Key>,
) -> Result<(), CompactionError<R>> {
assert!(min.is_some());
assert!(max.is_some());
debug_assert!(min.is_some());
debug_assert!(max.is_some());

let gen = Ulid::new();
let columns = builder.finish();
let mut writer = AsyncArrowWriter::try_new(
FP::open(option.table_path(&gen))
.await
.map_err(CompactionError::Io)?
.compat(),
FP::open(option.table_path(&gen)).await?.compat(),
R::arrow_schema().clone(),
option.write_parquet_option.clone(),
)
.map_err(CompactionError::Parquet)?;
writer
.write(columns.as_record_batch())
.await
.map_err(CompactionError::Parquet)?;
writer.close().await.map_err(CompactionError::Parquet)?;
)?;
writer.write(columns.as_record_batch()).await?;
writer.close().await?;
version_edits.push(VersionEdit::Add {
level: (level + 1) as u8,
scope: Scope {
Expand All @@ -375,11 +355,11 @@ where
R: Record,
{
#[error("compaction io error: {0}")]
Io(#[source] std::io::Error),
Io(#[from] std::io::Error),
#[error("compaction parquet error: {0}")]
Parquet(#[source] parquet::errors::ParquetError),
Parquet(#[from] parquet::errors::ParquetError),
#[error("compaction version error: {0}")]
Version(#[source] VersionError<R>),
Version(#[from] VersionError<R>),
#[error("the level being compacted does not have a table")]
EmptyLevel,
}
Expand Down

0 comments on commit 0c31d94

Please sign in to comment.