diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 2e22da087c6b..4bb1dede0192 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -87,10 +87,11 @@ impl AccessLayer { write_opts: &WriteOptions, ) -> Result> { let path = location::sst_file_path(&self.region_dir, request.file_id); + let region_id = request.metadata.region_id; - if let Some(write_cache) = request.cache_manager.write_cache() { + let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { // Write to the write cache. - return write_cache + write_cache .write_and_upload_sst( SstUploadRequest { file_id: request.file_id, @@ -102,12 +103,25 @@ impl AccessLayer { }, write_opts, ) - .await; + .await? + } else { + // Write cache is disabled. + let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + writer.write_all(request.source, write_opts).await? + }; + + // Put parquet metadata to cache manager. + if let Some(sst_info) = &sst_info { + if let Some(parquet_metadata) = &sst_info.file_metadata { + request.cache_manager.put_parquet_meta_data( + region_id, + request.file_id, + parquet_metadata.clone(), + ) + } } - // Write cache is disabled. - let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); - writer.write_all(request.source, write_opts).await + Ok(sst_info) } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 9cf45cdf9089..e97030ac383c 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -306,6 +306,7 @@ impl TwcsCompactionTask { let metadata = self.metadata.clone(); let sst_layer = self.sst_layer.clone(); let region_id = self.region_id; + let file_id = output.output_file_id; let cache_manager = self.cache_manager.clone(); let storage = self.storage.clone(); futs.push(async move { @@ -314,7 +315,7 @@ impl TwcsCompactionTask { let file_meta_opt = sst_layer .write_sst( SstWriteRequest { - file_id: output.output_file_id, + file_id, metadata, source: Source::Reader(reader), cache_manager, @@ -325,7 +326,7 @@ impl TwcsCompactionTask { .await? .map(|sst_info| FileMeta { region_id, - file_id: output.output_file_id, + file_id, time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 49c68e489fa3..6c528a392d61 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -331,6 +331,7 @@ impl RegionFlushTask { // No data written. continue; }; + flushed_bytes += sst_info.file_size; let file_meta = FileMeta { region_id: self.region_id, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 20259672e3bb..acee4f0232cc 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -22,6 +22,8 @@ pub mod row_group; mod stats; pub mod writer; +use std::sync::Arc; + use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; @@ -62,7 +64,7 @@ pub struct SstInfo { /// Number of rows. pub num_rows: usize, /// File Meta Data - pub file_metadata: Option, + pub file_metadata: Option>, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 5d8392b6d58d..2ed226791dca 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,6 +14,8 @@ //! Parquet writer. +use std::sync::Arc; + use common_datasource::file_format::parquet::BufferedWriter; use common_telemetry::debug; use common_time::Timestamp; @@ -121,7 +123,7 @@ impl ParquetWriter { time_range, file_size, num_rows: stats.num_rows, - file_metadata: Some(parquet_metadata), + file_metadata: Some(Arc::new(parquet_metadata)), })) }