diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 1f4fd731efa7..2e22da087c6b 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -18,18 +18,22 @@ use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; +use crate::cache::write_cache::SstUploadRequest; +use crate::cache::CacheManagerRef; use crate::error::{DeleteSstSnafu, Result}; use crate::read::Source; use crate::sst::file::{FileHandle, FileId}; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; +use crate::sst::parquet::{SstInfo, WriteOptions}; pub type AccessLayerRef = Arc; /// A layer to access SST files under the same directory. pub struct AccessLayer { region_dir: String, + /// Target object store. object_store: ObjectStore, } @@ -74,15 +78,44 @@ impl AccessLayer { ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone()) } - /// Returns a new parquet writer to write the SST for specific `file_id`. - // TODO(hl): maybe rename to [sst_writer]. - pub(crate) fn write_sst( + /// Writes a SST with specific `file_id` and `metadata` to the layer. + /// + /// Returns the info of the SST. If no data written, returns None. + pub(crate) async fn write_sst( &self, - file_id: FileId, - metadata: RegionMetadataRef, - source: Source, - ) -> ParquetWriter { - let path = location::sst_file_path(&self.region_dir, file_id); - ParquetWriter::new(path, metadata, source, self.object_store.clone()) + request: SstWriteRequest, + write_opts: &WriteOptions, + ) -> Result> { + let path = location::sst_file_path(&self.region_dir, request.file_id); + + if let Some(write_cache) = request.cache_manager.write_cache() { + // Write to the write cache. + return write_cache + .write_and_upload_sst( + SstUploadRequest { + file_id: request.file_id, + metadata: request.metadata, + source: request.source, + storage: request.storage, + upload_path: path, + remote_store: self.object_store.clone(), + }, + write_opts, + ) + .await; + } + + // Write cache is disabled. + let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + writer.write_all(request.source, write_opts).await } } + +/// Contents to build a SST. +pub(crate) struct SstWriteRequest { + pub(crate) file_id: FileId, + pub(crate) metadata: RegionMetadataRef, + pub(crate) source: Source, + pub(crate) cache_manager: CacheManagerRef, + pub(crate) storage: Option, +} diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index cc02a2d037ce..62c91b2b4155 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -20,6 +20,8 @@ mod cache_size; pub(crate) mod file_cache; #[cfg(test)] pub(crate) mod test_util; +#[allow(unused)] +pub(crate) mod write_cache; use std::mem; use std::sync::Arc; @@ -32,6 +34,7 @@ use parquet::file::metadata::ParquetMetaData; use store_api::storage::RegionId; use crate::cache::cache_size::parquet_meta_size; +use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; @@ -44,6 +47,8 @@ const PAGE_TYPE: &str = "page"; // Metrics type key for files on the local store. const FILE_TYPE: &str = "file"; +// TODO(yingwen): Builder for cache manager. + /// Manages cached data for the engine. pub struct CacheManager { /// Cache for SST metadata. @@ -52,6 +57,10 @@ pub struct CacheManager { vector_cache: Option, /// Cache for SST pages. page_cache: Option, + /// A Cache for writing files to object stores. + // TODO(yingwen): Remove this once the cache is ready. + #[allow(unused)] + write_cache: Option, } pub type CacheManagerRef = Arc; @@ -111,6 +120,7 @@ impl CacheManager { sst_meta_cache, vector_cache, page_cache, + write_cache: None, } } @@ -184,6 +194,11 @@ impl CacheManager { cache.insert(page_key, pages); } } + + /// Gets the the write cache. + pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> { + self.write_cache.as_ref() + } } fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 25fd5d6d62bf..3fd3408edd89 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -14,6 +14,7 @@ //! A cache for files. +use std::sync::Arc; use std::time::Instant; use common_base::readable_size::ReadableSize; @@ -21,7 +22,7 @@ use common_telemetry::{info, warn}; use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::RemovalCause; -use object_store::util::{join_dir, join_path}; +use object_store::util::join_path; use object_store::{ErrorKind, Metakey, ObjectStore, Reader}; use snafu::ResultExt; use store_api::storage::RegionId; @@ -32,7 +33,7 @@ use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; /// Subdirectory of cached files. -const FILE_DIR: &str = "files"; +const FILE_DIR: &str = "files/"; /// A file cache manages files on local store and evict files based /// on size. @@ -40,25 +41,18 @@ const FILE_DIR: &str = "files"; pub(crate) struct FileCache { /// Local store to cache files. local_store: ObjectStore, - /// Cached file directory under cache home. - file_dir: String, /// Index to track cached files. /// /// File id is enough to identity a file uniquely. memory_index: Cache, } +pub(crate) type FileCacheRef = Arc; + impl FileCache { /// Creates a new file cache. - pub(crate) fn new( - local_store: ObjectStore, - cache_home: String, - capacity: ReadableSize, - ) -> FileCache { - // Stores files under `cache_home/{FILE_DIR}`. - let file_dir = cache_file_dir(&cache_home); + pub(crate) fn new(local_store: ObjectStore, capacity: ReadableSize) -> FileCache { let cache_store = local_store.clone(); - let cache_file_dir = file_dir.clone(); let memory_index = Cache::builder() .weigher(|_key, value: &IndexValue| -> u32 { // We only measure space on local store. @@ -67,7 +61,8 @@ impl FileCache { .max_capacity(capacity.as_bytes()) .async_eviction_listener(move |key, value, cause| { let store = cache_store.clone(); - let file_path = cache_file_path(&cache_file_dir, *key); + // Stores files under FILE_DIR. + let file_path = cache_file_path(FILE_DIR, *key); async move { if let RemovalCause::Replaced = cause { // The cache is replaced by another file. This is unexpected, we don't remove the same @@ -91,7 +86,6 @@ impl FileCache { .build(); FileCache { local_store, - file_dir, memory_index, } } @@ -145,7 +139,7 @@ impl FileCache { let mut lister = self .local_store - .lister_with(&self.file_dir) + .lister_with(FILE_DIR) .metakey(Metakey::ContentLength) .await .context(OpenDalSnafu)?; @@ -182,7 +176,7 @@ impl FileCache { /// Returns the cache file path for the key. pub(crate) fn cache_file_path(&self, key: IndexKey) -> String { - cache_file_path(&self.file_dir, key) + cache_file_path(FILE_DIR, key) } /// Returns the local store of the file cache. @@ -203,11 +197,6 @@ pub(crate) struct IndexValue { file_size: u32, } -/// Returns the directory to store files. -fn cache_file_dir(cache_home: &str) -> String { - join_dir(cache_home, FILE_DIR) -} - /// Generates the path to the cached file. /// /// The file name format is `{region_id}.{file_id}` @@ -245,13 +234,8 @@ mod tests { async fn test_file_cache_basic() { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let cache_home = "cache".to_string(); - let cache = FileCache::new( - local_store.clone(), - cache_home.clone(), - ReadableSize::mb(10), - ); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = (region_id, file_id); @@ -291,13 +275,8 @@ mod tests { async fn test_file_cache_file_removed() { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let cache_home = "cache".to_string(); - let cache = FileCache::new( - local_store.clone(), - cache_home.clone(), - ReadableSize::mb(10), - ); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = (region_id, file_id); @@ -326,12 +305,7 @@ mod tests { async fn test_file_cache_recover() { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let cache_home = "cache".to_string(); - let cache = FileCache::new( - local_store.clone(), - cache_home.clone(), - ReadableSize::mb(10), - ); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); // Write N files. @@ -354,11 +328,7 @@ mod tests { } // Recover the cache. - let cache = FileCache::new( - local_store.clone(), - cache_home.clone(), - ReadableSize::mb(10), - ); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); // No entry before recovery. assert!(cache.reader((region_id, file_ids[0])).await.is_none()); cache.recover().await.unwrap(); diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs new file mode 100644 index 000000000000..b640ba896666 --- /dev/null +++ b/src/mito2/src/cache/write_cache.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A write-through cache for remote object stores. + +use std::sync::Arc; + +use common_base::readable_size::ReadableSize; +use object_store::manager::ObjectStoreManagerRef; +use object_store::ObjectStore; +use store_api::metadata::RegionMetadataRef; + +use crate::cache::file_cache::{FileCache, FileCacheRef}; +use crate::error::Result; +use crate::read::Source; +use crate::sst::file::FileId; +use crate::sst::parquet::writer::ParquetWriter; +use crate::sst::parquet::{SstInfo, WriteOptions}; + +/// A cache for uploading files to remote object stores. +/// +/// It keeps files in local disk and then sends files to object stores. +pub struct WriteCache { + /// Local file cache. + file_cache: FileCacheRef, + /// Object store manager. + object_store_manager: ObjectStoreManagerRef, +} + +pub type WriteCacheRef = Arc; + +impl WriteCache { + /// Create the cache with a `local_store` to cache files and a + /// `object_store_manager` for all object stores. + pub fn new( + local_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, + cache_capacity: ReadableSize, + ) -> Self { + Self { + file_cache: Arc::new(FileCache::new(local_store, cache_capacity)), + object_store_manager, + } + } + + /// Recovers the write cache from local store. + pub async fn recover(&self) -> Result<()> { + self.file_cache.recover().await + } + + /// Writes SST to the cache and then uploads it to the remote object store. + pub async fn write_and_upload_sst( + &self, + request: SstUploadRequest, + write_opts: &WriteOptions, + ) -> Result> { + // TODO(yingwen): Write to the local store and then upload. + // Now we write to the remote and ignore local cache. + let mut writer = + ParquetWriter::new(request.upload_path, request.metadata, request.remote_store); + writer.write_all(request.source, write_opts).await + } +} + +/// Request to write and upload a SST. +pub struct SstUploadRequest { + pub file_id: FileId, + pub metadata: RegionMetadataRef, + pub source: Source, + pub storage: Option, + /// Path to upload the file. + pub upload_path: String, + /// Remote object store to upload. + pub remote_store: ObjectStore, +} diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 94f04b17aa66..0ddcec61d0f2 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod output; mod picker; #[cfg(test)] mod test_util; @@ -30,6 +29,7 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::compaction::twcs::TwcsPicker; use crate::config::MitoConfig; use crate::error::{ @@ -55,6 +55,7 @@ pub struct CompactionRequest { pub(crate) start_time: Instant, /// Buffering threshold while writing SST files. pub(crate) sst_write_buffer_size: ReadableSize, + pub(crate) cache_manager: CacheManagerRef, } impl CompactionRequest { @@ -88,14 +89,20 @@ pub(crate) struct CompactionScheduler { region_status: HashMap, /// Request sender of the worker that this scheduler belongs to. request_sender: Sender, + cache_manager: CacheManagerRef, } impl CompactionScheduler { - pub(crate) fn new(scheduler: SchedulerRef, request_sender: Sender) -> Self { + pub(crate) fn new( + scheduler: SchedulerRef, + request_sender: Sender, + cache_manager: CacheManagerRef, + ) -> Self { Self { scheduler, region_status: HashMap::new(), request_sender, + cache_manager, } } @@ -122,8 +129,12 @@ impl CompactionScheduler { access_layer.clone(), file_purger.clone(), ); - let request = - status.new_compaction_request(self.request_sender.clone(), waiter, engine_config); + let request = status.new_compaction_request( + self.request_sender.clone(), + waiter, + engine_config, + self.cache_manager.clone(), + ); self.region_status.insert(region_id, status); self.schedule_compaction_request(request) } @@ -142,6 +153,7 @@ impl CompactionScheduler { self.request_sender.clone(), OptionOutputTx::none(), engine_config, + self.cache_manager.clone(), ); // Try to schedule next compaction task for this region. if let Err(e) = self.schedule_compaction_request(request) { @@ -314,6 +326,7 @@ impl CompactionStatus { request_sender: Sender, waiter: OptionOutputTx, engine_config: Arc, + cache_manager: CacheManagerRef, ) -> CompactionRequest { let current_version = self.version_control.current().version; let start_time = Instant::now(); @@ -325,6 +338,7 @@ impl CompactionStatus { file_purger: self.file_purger.clone(), start_time, sst_write_buffer_size: engine_config.sst_write_buffer_size, + cache_manager, }; if let Some(pending) = self.pending_compaction.take() { diff --git a/src/mito2/src/compaction/output.rs b/src/mito2/src/compaction/output.rs deleted file mode 100644 index 6111e95c40e7..000000000000 --- a/src/mito2/src/compaction/output.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_base::readable_size::ReadableSize; -use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; - -use crate::access_layer::AccessLayerRef; -use crate::error; -use crate::read::projection::ProjectionMapper; -use crate::read::seq_scan::SeqScan; -use crate::read::{BoxedBatchReader, Source}; -use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; -use crate::sst::parquet::{SstInfo, WriteOptions}; - -#[derive(Debug)] -pub(crate) struct CompactionOutput { - pub output_file_id: FileId, - /// Compaction output file level. - pub output_level: Level, - /// Compaction input files. - pub inputs: Vec, -} - -impl CompactionOutput { - pub(crate) async fn build( - &self, - region_id: RegionId, - schema: RegionMetadataRef, - sst_layer: AccessLayerRef, - sst_write_buffer_size: ReadableSize, - ) -> error::Result> { - let reader = build_sst_reader(schema.clone(), sst_layer.clone(), &self.inputs).await?; - - let opts = WriteOptions { - write_buffer_size: sst_write_buffer_size, - ..Default::default() - }; - - // TODO(hl): measure merge elapsed time. - - let mut writer = sst_layer.write_sst(self.output_file_id, schema, Source::Reader(reader)); - let meta = writer.write_all(&opts).await?.map( - |SstInfo { - time_range, - file_size, - .. - }| { - FileMeta { - region_id, - file_id: self.output_file_id, - time_range, - level: self.output_level, - file_size, - } - }, - ); - - Ok(meta) - } -} - -/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. -async fn build_sst_reader( - schema: RegionMetadataRef, - sst_layer: AccessLayerRef, - inputs: &[FileHandle], -) -> error::Result { - SeqScan::new(sst_layer, ProjectionMapper::all(&schema)?) - .with_files(inputs.to_vec()) - // We ignore file not found error during compaction. - .with_ignore_file_not_found(true) - .build_reader() - .await -} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 6b853cc98313..9cf45cdf9089 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -27,18 +27,21 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use tokio::sync::mpsc; -use crate::access_layer::AccessLayerRef; -use crate::compaction::output::CompactionOutput; +use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::cache::CacheManagerRef; use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::CompactionRequest; -use crate::error; -use crate::error::CompactRegionSnafu; +use crate::error::{self, CompactRegionSnafu}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; +use crate::read::projection::ProjectionMapper; +use crate::read::seq_scan::SeqScan; +use crate::read::{BoxedBatchReader, Source}; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; -use crate::sst::file::{FileHandle, FileId, FileMeta}; +use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; use crate::sst::file_purger::FilePurgerRef; +use crate::sst::parquet::WriteOptions; use crate::sst::version::LevelMeta; const MAX_PARALLEL_COMPACTION: usize = 8; @@ -126,6 +129,7 @@ impl Picker for TwcsPicker { file_purger, start_time, sst_write_buffer_size, + cache_manager, } = req; let region_metadata = current_version.metadata.clone(); @@ -169,7 +173,7 @@ impl Picker for TwcsPicker { } let task = TwcsCompactionTask { region_id, - schema: region_metadata, + metadata: region_metadata, sst_layer: access_layer, outputs, expired_ssts, @@ -179,6 +183,8 @@ impl Picker for TwcsPicker { waiters, file_purger, start_time, + cache_manager, + storage: current_version.options.storage.clone(), }; Some(Box::new(task)) } @@ -228,7 +234,7 @@ fn find_latest_window_in_seconds<'a>( pub(crate) struct TwcsCompactionTask { pub region_id: RegionId, - pub schema: RegionMetadataRef, + pub metadata: RegionMetadataRef, pub sst_layer: AccessLayerRef, pub outputs: Vec, pub expired_ssts: Vec, @@ -241,6 +247,9 @@ pub(crate) struct TwcsCompactionTask { pub waiters: Vec, /// Start time of compaction task pub start_time: Instant, + pub(crate) cache_manager: CacheManagerRef, + /// Target storage of the region. + pub(crate) storage: Option, } impl Debug for TwcsCompactionTask { @@ -274,11 +283,8 @@ impl TwcsCompactionTask { let mut futs = Vec::with_capacity(self.outputs.len()); let mut compacted_inputs = Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); - let region_id = self.region_id; + for output in self.outputs.drain(..) { - let schema = self.schema.clone(); - let sst_layer = self.sst_layer.clone(); - let sst_write_buffer_size = self.sst_write_buffer_size; compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); info!( @@ -293,15 +299,42 @@ impl TwcsCompactionTask { output.output_file_id ); - // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism. + let write_opts = WriteOptions { + write_buffer_size: self.sst_write_buffer_size, + ..Default::default() + }; + let metadata = self.metadata.clone(); + let sst_layer = self.sst_layer.clone(); + let region_id = self.region_id; + let cache_manager = self.cache_manager.clone(); + let storage = self.storage.clone(); futs.push(async move { - output - .build(region_id, schema, sst_layer, sst_write_buffer_size) - .await + let reader = + build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?; + let file_meta_opt = sst_layer + .write_sst( + SstWriteRequest { + file_id: output.output_file_id, + metadata, + source: Source::Reader(reader), + cache_manager, + storage, + }, + &write_opts, + ) + .await? + .map(|sst_info| FileMeta { + region_id, + file_id: output.output_file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + }); + Ok(file_meta_opt) }); } - let mut outputs = Vec::with_capacity(futs.len()); + let mut output_files = Vec::with_capacity(futs.len()); while !futs.is_empty() { let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); for _ in 0..MAX_PARALLEL_COMPACTION { @@ -314,11 +347,11 @@ impl TwcsCompactionTask { .context(error::JoinSnafu)? .into_iter() .collect::>>()?; - outputs.extend(metas.into_iter().flatten()); + output_files.extend(metas.into_iter().flatten()); } let inputs = compacted_inputs.into_iter().collect(); - Ok((outputs, inputs)) + Ok((output_files, inputs)) } async fn handle_compaction(&mut self) -> error::Result<(Vec, Vec)> { @@ -485,6 +518,29 @@ fn get_expired_ssts( .collect() } +#[derive(Debug)] +pub(crate) struct CompactionOutput { + pub output_file_id: FileId, + /// Compaction output file level. + pub output_level: Level, + /// Compaction input files. + pub inputs: Vec, +} + +/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. +async fn build_sst_reader( + metadata: RegionMetadataRef, + sst_layer: AccessLayerRef, + inputs: &[FileHandle], +) -> error::Result { + SeqScan::new(sst_layer, ProjectionMapper::all(&metadata)?) + .with_files(inputs.to_vec()) + // We ignore file not found error during compaction. + .with_ignore_file_not_found(true) + .build_reader() + .await +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 99b880de7cf2..49c68e489fa3 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -24,7 +24,8 @@ use store_api::storage::RegionId; use strum::IntoStaticStr; use tokio::sync::mpsc; -use crate::access_layer::AccessLayerRef; +use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{ Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, @@ -200,6 +201,7 @@ pub(crate) struct RegionFlushTask { pub(crate) listener: WorkerListener, pub(crate) engine_config: Arc, pub(crate) row_group_size: Option, + pub(crate) cache_manager: CacheManagerRef, } impl RegionFlushTask { @@ -243,6 +245,7 @@ impl RegionFlushTask { async fn do_flush(&mut self, version_data: VersionControlData) { let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer(); self.listener.on_flush_begin(self.region_id).await; + let worker_request = match self.flush_memtables(&version_data.version).await { Ok(file_metas) => { let memtables_to_remove = version_data @@ -252,6 +255,7 @@ impl RegionFlushTask { .iter() .map(|m| m.id()) .collect(); + let flush_finished = FlushFinished { region_id: self.region_id, file_metas, @@ -297,10 +301,10 @@ impl RegionFlushTask { if let Some(row_group_size) = self.row_group_size { write_opts.row_group_size = row_group_size; } + let memtables = version.memtables.immutables(); let mut file_metas = Vec::with_capacity(memtables.len()); let mut flushed_bytes = 0; - for mem in memtables { if mem.is_empty() { // Skip empty memtables. @@ -310,22 +314,32 @@ impl RegionFlushTask { let file_id = FileId::random(); let iter = mem.iter(None, None); let source = Source::Iter(iter); - let mut writer = self + + // Flush to level 0. + let write_request = SstWriteRequest { + file_id, + metadata: version.metadata.clone(), + source, + cache_manager: self.cache_manager.clone(), + storage: version.options.storage.clone(), + }; + let Some(sst_info) = self .access_layer - .write_sst(file_id, version.metadata.clone(), source); - let Some(sst_info) = writer.write_all(&write_opts).await? else { + .write_sst(write_request, &write_opts) + .await? + else { // No data written. continue; }; - flushed_bytes += sst_info.file_size; - file_metas.push(FileMeta { - region_id: version.metadata.region_id, + let file_meta = FileMeta { + region_id: self.region_id, file_id, time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, - }); + }; + file_metas.push(file_meta); } if !file_metas.is_empty() { @@ -334,8 +348,8 @@ impl RegionFlushTask { let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect(); info!( - "Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}", - version.metadata.region_id, + "Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}s", + self.region_id, self.reason.as_str(), file_ids, timer.stop_and_record(), @@ -652,6 +666,7 @@ mod tests { use tokio::sync::oneshot; use super::*; + use crate::cache::CacheManager; use crate::test_util::scheduler_util::SchedulerEnv; use crate::test_util::version_util::VersionControlBuilder; @@ -728,6 +743,7 @@ mod tests { listener: WorkerListener::default(), engine_config: Arc::new(MitoConfig::default()), row_group_size: None, + cache_manager: Arc::new(CacheManager::new(0, 0, 0)), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index c4415887f386..8fb640399ea7 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -37,7 +37,7 @@ pub struct RegionOptions { pub ttl: Option, /// Compaction options. pub compaction: CompactionOptions, - /// Custom storage. + /// Custom storage. Uses default storage if it is `None`. pub storage: Option, /// Wal options. pub wal_options: WalOptions, diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 7d27e49eaf73..6cf8043174a1 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -27,8 +27,6 @@ use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::request::OptionOutputTx; use crate::wal::{EntryId, WalWriter}; -/// Context to keep region metadata and buffer write requests. - /// Notifier to notify write result on drop. struct WriteNotify { /// Error to send to the waiter. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 584faf1ab964..20259672e3bb 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -114,8 +114,12 @@ mod tests { ..Default::default() }; - let mut writer = ParquetWriter::new(file_path, metadata, source, object_store.clone()); - let info = writer.write_all(&write_opts).await.unwrap().unwrap(); + let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone()); + let info = writer + .write_all(source, &write_opts) + .await + .unwrap() + .unwrap(); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); assert_eq!( @@ -159,9 +163,12 @@ mod tests { ..Default::default() }; // Prepare data. - let mut writer = - ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); - writer.write_all(&write_opts).await.unwrap().unwrap(); + let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + writer + .write_all(source, &write_opts) + .await + .unwrap() + .unwrap(); let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024))); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) @@ -220,10 +227,9 @@ mod tests { // write the sst file and get sst info // sst info contains the parquet metadata, which is converted from FileMetaData - let mut writer = - ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); let sst_info = writer - .write_all(&write_opts) + .write_all(source, &write_opts) .await .unwrap() .expect("write_all should return sst info"); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index febec27c0d36..5d8392b6d58d 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -36,8 +36,6 @@ use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; pub struct ParquetWriter { /// SST output file path. file_path: String, - /// Input data source. - source: Source, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, object_store: ObjectStore, @@ -48,12 +46,10 @@ impl ParquetWriter { pub fn new( file_path: String, metadata: RegionMetadataRef, - source: Source, object_store: ObjectStore, ) -> ParquetWriter { ParquetWriter { file_path, - source, metadata, object_store, } @@ -62,7 +58,11 @@ impl ParquetWriter { /// Iterates source and writes all rows to Parquet file. /// /// Returns the [SstInfo] if the SST is written. - pub async fn write_all(&mut self, opts: &WriteOptions) -> Result> { + pub async fn write_all( + &mut self, + mut source: Source, + opts: &WriteOptions, + ) -> Result> { let json = self.metadata.to_json().context(InvalidMetadataSnafu)?; let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); @@ -88,7 +88,7 @@ impl ParquetWriter { .context(WriteBufferSnafu)?; let mut stats = SourceStats::default(); - while let Some(batch) = self.source.next_batch().await? { + while let Some(batch) = source.next_batch().await? { stats.update(&batch); let arrow_batch = write_format.convert_batch(&batch)?; diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 611371a7ec0b..3cf69c8456ca 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -22,6 +22,7 @@ use object_store::ObjectStore; use tokio::sync::mpsc::Sender; use crate::access_layer::{AccessLayer, AccessLayerRef}; +use crate::cache::CacheManager; use crate::compaction::CompactionScheduler; use crate::flush::FlushScheduler; use crate::request::WorkerRequest; @@ -65,7 +66,11 @@ impl SchedulerEnv { ) -> CompactionScheduler { let scheduler = self.get_scheduler(); - CompactionScheduler::new(scheduler, request_sender) + CompactionScheduler::new( + scheduler, + request_sender, + Arc::new(CacheManager::new(0, 0, 0)), + ) } /// Creates a new flush scheduler. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index c969fdd008c1..40bb14a401fd 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -286,7 +286,11 @@ impl WorkerStarter { scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()), - compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()), + compaction_scheduler: CompactionScheduler::new( + self.scheduler, + sender.clone(), + self.cache_manager.clone(), + ), stalled_requests: StalledRequests::default(), listener: self.listener, cache_manager: self.cache_manager, diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 043cb60cc9d3..c913a7b67612 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -137,7 +137,6 @@ impl RegionWorkerLoop { row_group_size: Option, engine_config: Arc, ) -> RegionFlushTask { - // TODO(yingwen): metrics for flush requested. RegionFlushTask { region_id: region.region_id, reason, @@ -149,6 +148,7 @@ impl RegionWorkerLoop { listener: self.listener.clone(), engine_config, row_group_size, + cache_manager: self.cache_manager.clone(), } } }