Skip to content

Commit

Permalink
refactor: move code to access_layer
Browse files Browse the repository at this point in the history
  • Loading branch information
QuenKar committed Jan 5, 2024
1 parent ce3e281 commit 8c23488
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 33 deletions.
26 changes: 20 additions & 6 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ impl AccessLayer {
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = location::sst_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;

if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
// Write to the write cache.
return write_cache
write_cache
.write_and_upload_sst(
SstUploadRequest {
file_id: request.file_id,
Expand All @@ -102,12 +103,25 @@ impl AccessLayer {
},
write_opts,
)
.await;
.await?
} else {
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
}
}

// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await
Ok(sst_info)
}
}

Expand Down
23 changes: 6 additions & 17 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,23 +324,12 @@ impl TwcsCompactionTask {
&write_opts,
)
.await?
.map(|sst_info| {
// Add parquet metadata to cache
if let Some(metadata) = sst_info.file_metadata {
cache_manager.put_parquet_meta_data(
region_id,
file_id,
Arc::new(metadata),
);
}

FileMeta {
region_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
}
.map(|sst_info| FileMeta {
region_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
});
Ok(file_meta_opt)
});
Expand Down
9 changes: 1 addition & 8 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,7 @@ impl RegionFlushTask {
// No data written.
continue;
};
// Add parquet file metadata to cache
if let Some(metadata) = sst_info.file_metadata {
self.cache_manager.put_parquet_meta_data(
self.region_id,
file_id,
Arc::new(metadata),
);
}

flushed_bytes += sst_info.file_size;
let file_meta = FileMeta {
region_id: self.region_id,
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub mod row_group;
mod stats;
pub mod writer;

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

Expand Down Expand Up @@ -62,7 +64,7 @@ pub struct SstInfo {
/// Number of rows.
pub num_rows: usize,
/// File Meta Data
pub file_metadata: Option<ParquetMetaData>,
pub file_metadata: Option<Arc<ParquetMetaData>>,
}

#[cfg(test)]
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Parquet writer.
use std::sync::Arc;

use common_datasource::file_format::parquet::BufferedWriter;
use common_telemetry::debug;
use common_time::Timestamp;
Expand Down Expand Up @@ -121,7 +123,7 @@ impl ParquetWriter {
time_range,
file_size,
num_rows: stats.num_rows,
file_metadata: Some(parquet_metadata),
file_metadata: Some(Arc::new(parquet_metadata)),
}))
}

Expand Down

0 comments on commit 8c23488

Please sign in to comment.