Skip to content

Commit

Permalink
feat: add parquet metadata to cache (#3097)
Browse files Browse the repository at this point in the history
* feat: parquet metadata to sst meta cache

* chore: clippy

* refactor: move code to access_layer

* chore: clone()
  • Loading branch information
QuenKar authored Jan 9, 2024
1 parent 2c1b1ce commit 225ae95
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 10 deletions.
26 changes: 20 additions & 6 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ impl AccessLayer {
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = location::sst_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;

if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
// Write to the write cache.
return write_cache
write_cache
.write_and_upload_sst(
SstUploadRequest {
file_id: request.file_id,
Expand All @@ -104,12 +105,25 @@ impl AccessLayer {
},
write_opts,
)
.await;
.await?
} else {
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
}
}

// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await
Ok(sst_info)
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ impl TwcsCompactionTask {
let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
let region_id = self.region_id;
let file_id = output.output_file_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
futs.push(async move {
Expand All @@ -314,7 +315,7 @@ impl TwcsCompactionTask {
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
file_id: output.output_file_id,
file_id,
metadata,
source: Source::Reader(reader),
cache_manager,
Expand All @@ -325,7 +326,7 @@ impl TwcsCompactionTask {
.await?
.map(|sst_info| FileMeta {
region_id,
file_id: output.output_file_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ impl RegionFlushTask {
// No data written.
continue;
};

flushed_bytes += sst_info.file_size;
let file_meta = FileMeta {
region_id: self.region_id,
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub mod row_group;
mod stats;
pub mod writer;

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

Expand Down Expand Up @@ -62,7 +64,7 @@ pub struct SstInfo {
/// Number of rows.
pub num_rows: usize,
/// File Meta Data
pub file_metadata: Option<ParquetMetaData>,
pub file_metadata: Option<Arc<ParquetMetaData>>,
}

#[cfg(test)]
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Parquet writer.
use std::sync::Arc;

use common_datasource::file_format::parquet::BufferedWriter;
use common_telemetry::debug;
use common_time::Timestamp;
Expand Down Expand Up @@ -121,7 +123,7 @@ impl ParquetWriter {
time_range,
file_size,
num_rows: stats.num_rows,
file_metadata: Some(parquet_metadata),
file_metadata: Some(Arc::new(parquet_metadata)),
}))
}

Expand Down

0 comments on commit 225ae95

Please sign in to comment.