From 7c9161824d8edeb27a02b7d23aa6069d9f97eb9f Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 4 Jan 2024 19:50:52 +0800 Subject: [PATCH 1/4] feat: parquet metadata to sst meta cache --- src/mito2/src/compaction/twcs.rs | 28 ++++++++++++++++++++-------- src/mito2/src/flush.rs | 8 ++++++++ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 9cf45cdf9089..c8af1d089a26 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -306,6 +306,7 @@ impl TwcsCompactionTask { let metadata = self.metadata.clone(); let sst_layer = self.sst_layer.clone(); let region_id = self.region_id; + let file_id = output.output_file_id; let cache_manager = self.cache_manager.clone(); let storage = self.storage.clone(); futs.push(async move { @@ -314,21 +315,32 @@ impl TwcsCompactionTask { let file_meta_opt = sst_layer .write_sst( SstWriteRequest { - file_id: output.output_file_id, + file_id, metadata, source: Source::Reader(reader), - cache_manager, + cache_manager: cache_manager.clone(), storage, }, &write_opts, ) .await? - .map(|sst_info| FileMeta { - region_id, - file_id: output.output_file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, + .map(|sst_info| { + // Add parquet metadata to cache + sst_info.file_metadata.map(|metadata| { + cache_manager.put_parquet_meta_data( + region_id, + file_id, + Arc::new(metadata), + ); + }); + + FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + } }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 49c68e489fa3..81fa9e8e0661 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -331,6 +331,14 @@ impl RegionFlushTask { // No data written. continue; }; + // Add parquet file metadata to cache + if let Some(metadata) = sst_info.file_metadata { + self.cache_manager.put_parquet_meta_data( + self.region_id, + file_id, + Arc::new(metadata), + ); + } flushed_bytes += sst_info.file_size; let file_meta = FileMeta { region_id: self.region_id, From ce3e28108a95e5b1ff7b93f3c04a10567d1391fc Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 4 Jan 2024 20:07:34 +0800 Subject: [PATCH 2/4] chore: clippy --- src/mito2/src/compaction/twcs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index c8af1d089a26..5ae80f028966 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -326,13 +326,13 @@ impl TwcsCompactionTask { .await? .map(|sst_info| { // Add parquet metadata to cache - sst_info.file_metadata.map(|metadata| { + if let Some(metadata) = sst_info.file_metadata { cache_manager.put_parquet_meta_data( region_id, file_id, Arc::new(metadata), ); - }); + } FileMeta { region_id, From 8c23488f03c030c0427a9df9c32e8d6ad858ef70 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 5 Jan 2024 11:23:05 +0800 Subject: [PATCH 3/4] refactor: move code to access_layer --- src/mito2/src/access_layer.rs | 26 ++++++++++++++++++++------ src/mito2/src/compaction/twcs.rs | 23 ++++++----------------- src/mito2/src/flush.rs | 9 +-------- src/mito2/src/sst/parquet.rs | 4 +++- src/mito2/src/sst/parquet/writer.rs | 4 +++- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 2e22da087c6b..4bb1dede0192 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -87,10 +87,11 @@ impl AccessLayer { write_opts: &WriteOptions, ) -> Result> { let path = location::sst_file_path(&self.region_dir, request.file_id); + let region_id = request.metadata.region_id; - if let Some(write_cache) = request.cache_manager.write_cache() { + let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { // Write to the write cache. - return write_cache + write_cache .write_and_upload_sst( SstUploadRequest { file_id: request.file_id, @@ -102,12 +103,25 @@ impl AccessLayer { }, write_opts, ) - .await; + .await? + } else { + // Write cache is disabled. + let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + writer.write_all(request.source, write_opts).await? + }; + + // Put parquet metadata to cache manager. + if let Some(sst_info) = &sst_info { + if let Some(parquet_metadata) = &sst_info.file_metadata { + request.cache_manager.put_parquet_meta_data( + region_id, + request.file_id, + parquet_metadata.clone(), + ) + } } - // Write cache is disabled. - let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); - writer.write_all(request.source, write_opts).await + Ok(sst_info) } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 5ae80f028966..4a25b3bef03a 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -324,23 +324,12 @@ impl TwcsCompactionTask { &write_opts, ) .await? - .map(|sst_info| { - // Add parquet metadata to cache - if let Some(metadata) = sst_info.file_metadata { - cache_manager.put_parquet_meta_data( - region_id, - file_id, - Arc::new(metadata), - ); - } - - FileMeta { - region_id, - file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - } + .map(|sst_info| FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 81fa9e8e0661..6c528a392d61 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -331,14 +331,7 @@ impl RegionFlushTask { // No data written. continue; }; - // Add parquet file metadata to cache - if let Some(metadata) = sst_info.file_metadata { - self.cache_manager.put_parquet_meta_data( - self.region_id, - file_id, - Arc::new(metadata), - ); - } + flushed_bytes += sst_info.file_size; let file_meta = FileMeta { region_id: self.region_id, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 20259672e3bb..acee4f0232cc 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -22,6 +22,8 @@ pub mod row_group; mod stats; pub mod writer; +use std::sync::Arc; + use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; @@ -62,7 +64,7 @@ pub struct SstInfo { /// Number of rows. pub num_rows: usize, /// File Meta Data - pub file_metadata: Option, + pub file_metadata: Option>, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 5d8392b6d58d..2ed226791dca 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,6 +14,8 @@ //! Parquet writer. +use std::sync::Arc; + use common_datasource::file_format::parquet::BufferedWriter; use common_telemetry::debug; use common_time::Timestamp; @@ -121,7 +123,7 @@ impl ParquetWriter { time_range, file_size, num_rows: stats.num_rows, - file_metadata: Some(parquet_metadata), + file_metadata: Some(Arc::new(parquet_metadata)), })) } From 57133c4c47812822c7ac3e8454d356e2587074e9 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Fri, 5 Jan 2024 11:25:36 +0800 Subject: [PATCH 4/4] chore: clone() --- src/mito2/src/compaction/twcs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 4a25b3bef03a..e97030ac383c 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -318,7 +318,7 @@ impl TwcsCompactionTask { file_id, metadata, source: Source::Reader(reader), - cache_manager: cache_manager.clone(), + cache_manager, storage, }, &write_opts,