diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index fb0c3ec10944..94a8518dd1c0 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -196,6 +196,12 @@ impl FileCache { Ok(None) } } + + /// Checks if the key is in the file cache. + #[cfg(test)] + pub(crate) fn contains_key(&self, key: &IndexKey) -> bool { + self.memory_index.contains_key(key) + } } /// Key of file cache index. @@ -207,7 +213,7 @@ pub(crate) type IndexKey = (RegionId, FileId); #[derive(Debug, Clone)] pub(crate) struct IndexValue { /// Size of the file in bytes. - file_size: u32, + pub(crate) file_size: u32, } /// Generates the path to the cached file. diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs index deb4ba23cd7b..306bb50467e8 100644 --- a/src/mito2/src/cache/test_util.rs +++ b/src/mito2/src/cache/test_util.rs @@ -19,6 +19,8 @@ use std::sync::Arc; use bytes::Bytes; use datatypes::arrow::array::{ArrayRef, Int64Array}; use datatypes::arrow::record_batch::RecordBatch; +use object_store::services::Fs; +use object_store::ObjectStore; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ArrowWriter; use parquet::file::metadata::ParquetMetaData; @@ -42,3 +44,9 @@ fn parquet_file_data() -> Vec { buffer } + +pub(crate) fn new_fs_store(path: &str) -> ObjectStore { + let mut builder = Fs::default(); + builder.root(path); + ObjectStore::new(builder).unwrap().finish() +} diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 9775f3d79160..f0cf58de9140 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -16,19 +16,23 @@ use std::sync::Arc; +use api::v1::region; use common_base::readable_size::ReadableSize; -use common_telemetry::info; +use common_telemetry::{debug, info}; use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use crate::access_layer::new_fs_object_store; -use crate::cache::file_cache::{FileCache, FileCacheRef}; -use crate::error::Result; +use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue}; +use crate::error::{self, Result}; +use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; use crate::read::Source; use crate::sst::file::FileId; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; +use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; /// A cache for uploading files to remote object stores. /// @@ -77,11 +81,74 @@ impl WriteCache { request: SstUploadRequest, write_opts: &WriteOptions, ) -> Result> { - // TODO(yingwen): Write to the local store and then upload. - // Now we write to the remote and ignore local cache. - let mut writer = - ParquetWriter::new(request.upload_path, request.metadata, request.remote_store); - writer.write_all(request.source, write_opts).await + let timer = FLUSH_ELAPSED + .with_label_values(&["write_sst"]) + .start_timer(); + + let region_id = request.metadata.region_id; + let file_id = request.file_id; + + let cache_path = self.file_cache.cache_file_path((region_id, file_id)); + // Write to FileCache. + let mut writer = ParquetWriter::new( + cache_path.clone(), + request.metadata, + self.file_cache.local_store(), + ); + + let sst_info = writer.write_all(request.source, write_opts).await?; + + timer.stop_and_record(); + + // Upload sst file to remote object store. + let Some(sst_info) = sst_info else { + // No data need to upload. + return Ok(None); + }; + + let timer = FLUSH_ELAPSED + .with_label_values(&["upload_sst"]) + .start_timer(); + + let reader = self + .file_cache + .local_store() + .reader(&cache_path) + .await + .context(error::OpenDalSnafu)?; + + let upload_path = request.upload_path; + let mut writer = request + .remote_store + .writer_with(&upload_path) + .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .await + .context(error::OpenDalSnafu)?; + + let bytes_written = futures::io::copy(reader, &mut writer) + .await + .context(error::UploadSstSnafu { region_id, file_id })?; + + // Must close to upload all data. + writer.close().await.context(error::OpenDalSnafu)?; + + UPLOAD_BYTES_TOTAL.inc_by(bytes_written); + + debug!( + "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s", + region_id, + file_id, + upload_path, + timer.stop_and_record() + ); + + // Register to file cache + let file_size = sst_info.file_size as u32; + self.file_cache + .put((region_id, file_id), IndexValue { file_size }) + .await; + + Ok(Some(sst_info)) } } @@ -96,3 +163,88 @@ pub struct SstUploadRequest { /// Remote object store to upload. pub remote_store: ObjectStore, } + +#[cfg(test)] +mod tests { + use api::v1::OpType; + use common_base::readable_size::ReadableSize; + use common_test_util::temp_dir::create_temp_dir; + use object_store::manager::ObjectStoreManager; + use object_store::services::Fs; + use object_store::ObjectStore; + use store_api::storage::RegionId; + + use super::*; + use crate::cache::file_cache::{self, FileCache}; + use crate::cache::test_util::new_fs_store; + use crate::sst::file::FileId; + use crate::sst::location::sst_file_path; + use crate::test_util::sst_util::{ + new_batch_by_range, new_source, sst_file_handle, sst_region_metadata, + }; + use crate::test_util::{build_rows, new_batch_builder, CreateRequestBuilder, TestEnv}; + + #[tokio::test] + async fn test_write_and_upload_sst() { + // TODO(QuenKar): maybe find a way to create some object server for testing, + // and now just use local file system to mock. + let mut env = TestEnv::new(); + let mock_store = env.init_object_store_manager(); + let file_id = FileId::random(); + let upload_path = sst_file_path("test", file_id); + + // Create WriteCache + let local_dir = create_temp_dir(""); + let local_store = new_fs_store(local_dir.path().to_str().unwrap()); + let object_store_manager = env.get_object_store_manager().unwrap(); + let write_cache = WriteCache::new( + local_store.clone(), + object_store_manager, + ReadableSize::mb(10), + ) + .await + .unwrap(); + + // Create Source + let metadata = Arc::new(sst_region_metadata()); + let region_id = metadata.region_id; + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + + let request = SstUploadRequest { + file_id, + metadata, + source, + storage: None, + upload_path: upload_path.clone(), + remote_store: mock_store.clone(), + }; + + let write_opts = WriteOptions { + row_group_size: 512, + ..Default::default() + }; + + // Write to cache and upload sst to mock remote store + let sst_info = write_cache + .write_and_upload_sst(request, &write_opts) + .await + .unwrap() + .unwrap(); + + // Check write cache contains the key + let key = (region_id, file_id); + assert!(write_cache.file_cache.contains_key(&key)); + + // Check file data + let remote_data = mock_store.read(&upload_path).await.unwrap(); + let cache_data = local_store + .read(&write_cache.file_cache.cache_file_path(key)) + .await + .unwrap(); + assert_eq!(remote_data, cache_data); + } +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 98b5883310b3..2fede9ee7aff 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -512,6 +512,19 @@ pub enum Error { flushed_entry_id: u64, unexpected_entry_id: u64, }, + + #[snafu(display( + "Failed to upload sst file, region_id: {}, file_id: {}", + region_id, + file_id + ))] + UploadSst { + region_id: RegionId, + file_id: FileId, + #[snafu(source)] + error: std::io::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -608,6 +621,7 @@ impl ErrorExt for Error { CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, + UploadSst { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index eb84d6d59599..47cf99910550 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -145,6 +145,12 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); + /// Upload bytes counter. + pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( + "mito_upload_bytes_total", + "mito upload bytes total", + ) + .unwrap(); // ------- End of cache metrics. // Index metrics. diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 94e0cb205bc2..bdae9d11aac1 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -14,9 +14,14 @@ //! Sorted strings tables. +use common_base::readable_size::ReadableSize; + pub mod file; pub mod file_purger; pub mod index; pub mod location; pub mod parquet; pub(crate) mod version; + +/// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb). +pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index fe328f16b121..e52070e1eb9e 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -27,11 +27,12 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; +use super::DEFAULT_WRITE_BUFFER_SIZE; use crate::sst::file::FileTimeRange; /// Key of metadata in parquet SST. pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; -const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); + /// Default batch size to read parquet files. pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; /// Default row group size for parquet files. @@ -71,33 +72,19 @@ pub struct SstInfo { mod tests { use std::sync::Arc; - use api::v1::OpType; use common_time::Timestamp; use super::*; use crate::cache::{CacheManager, PageKey}; - use crate::read::Batch; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::test_util::sst_util::{ - new_primary_key, new_source, sst_file_handle, sst_region_metadata, + new_batch_by_range, new_source, sst_file_handle, sst_region_metadata, }; - use crate::test_util::{check_reader_result, new_batch_builder, TestEnv}; + use crate::test_util::{check_reader_result, TestEnv}; const FILE_DIR: &str = "/"; - fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { - assert!(end > start); - let pk = new_primary_key(tags); - let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect(); - let sequences = vec![1000; end - start]; - let op_types = vec![OpType::Put; end - start]; - let field: Vec<_> = (start..end).map(|v| v as u64).collect(); - new_batch_builder(&pk, ×tamps, &sequences, &op_types, 2, &field) - .build() - .unwrap() - } - #[tokio::test] async fn test_write_read() { let mut env = TestEnv::new(); diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 3638d119faa1..1cc158ed91b5 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,7 +14,7 @@ //! Utilities for testing SSTs. -use api::v1::SemanticType; +use api::v1::{OpType, SemanticType}; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use crate::read::{Batch, Source}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::file::{FileHandle, FileId, FileMeta}; -use crate::test_util::{new_noop_file_purger, VecBatchReader}; +use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader}; /// Test region id. const REGION_ID: RegionId = RegionId::new(0, 0); @@ -110,3 +110,15 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { file_purger, ) } + +pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { + assert!(end > start); + let pk = new_primary_key(tags); + let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect(); + let sequences = vec![1000; end - start]; + let op_types = vec![OpType::Put; end - start]; + let field: Vec<_> = (start..end).map(|v| v as u64).collect(); + new_batch_builder(&pk, ×tamps, &sequences, &op_types, 2, &field) + .build() + .unwrap() +}