From 8c23488f03c030c0427a9df9c32e8d6ad858ef70 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 5 Jan 2024 11:23:05 +0800 Subject: [PATCH] refactor: move code to access_layer --- src/mito2/src/access_layer.rs | 26 ++++++++++++++++++++------ src/mito2/src/compaction/twcs.rs | 23 ++++++----------------- src/mito2/src/flush.rs | 9 +-------- src/mito2/src/sst/parquet.rs | 4 +++- src/mito2/src/sst/parquet/writer.rs | 4 +++- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 2e22da087c6b..4bb1dede0192 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -87,10 +87,11 @@ impl AccessLayer { write_opts: &WriteOptions, ) -> Result> { let path = location::sst_file_path(&self.region_dir, request.file_id); + let region_id = request.metadata.region_id; - if let Some(write_cache) = request.cache_manager.write_cache() { + let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { // Write to the write cache. - return write_cache + write_cache .write_and_upload_sst( SstUploadRequest { file_id: request.file_id, @@ -102,12 +103,25 @@ impl AccessLayer { }, write_opts, ) - .await; + .await? + } else { + // Write cache is disabled. + let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + writer.write_all(request.source, write_opts).await? + }; + + // Put parquet metadata to cache manager. + if let Some(sst_info) = &sst_info { + if let Some(parquet_metadata) = &sst_info.file_metadata { + request.cache_manager.put_parquet_meta_data( + region_id, + request.file_id, + parquet_metadata.clone(), + ) + } } - // Write cache is disabled. - let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); - writer.write_all(request.source, write_opts).await + Ok(sst_info) } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 5ae80f028966..4a25b3bef03a 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -324,23 +324,12 @@ impl TwcsCompactionTask { &write_opts, ) .await? - .map(|sst_info| { - // Add parquet metadata to cache - if let Some(metadata) = sst_info.file_metadata { - cache_manager.put_parquet_meta_data( - region_id, - file_id, - Arc::new(metadata), - ); - } - - FileMeta { - region_id, - file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - } + .map(|sst_info| FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 81fa9e8e0661..6c528a392d61 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -331,14 +331,7 @@ impl RegionFlushTask { // No data written. continue; }; - // Add parquet file metadata to cache - if let Some(metadata) = sst_info.file_metadata { - self.cache_manager.put_parquet_meta_data( - self.region_id, - file_id, - Arc::new(metadata), - ); - } + flushed_bytes += sst_info.file_size; let file_meta = FileMeta { region_id: self.region_id, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 20259672e3bb..acee4f0232cc 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -22,6 +22,8 @@ pub mod row_group; mod stats; pub mod writer; +use std::sync::Arc; + use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; @@ -62,7 +64,7 @@ pub struct SstInfo { /// Number of rows. pub num_rows: usize, /// File Meta Data - pub file_metadata: Option, + pub file_metadata: Option>, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 5d8392b6d58d..2ed226791dca 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,6 +14,8 @@ //! Parquet writer. +use std::sync::Arc; + use common_datasource::file_format::parquet::BufferedWriter; use common_telemetry::debug; use common_time::Timestamp; @@ -121,7 +123,7 @@ impl ParquetWriter { time_range, file_size, num_rows: stats.num_rows, - file_metadata: Some(parquet_metadata), + file_metadata: Some(Arc::new(parquet_metadata)), })) }