diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 3fd3408edd89..a38ee33d97da 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -197,6 +197,16 @@ pub(crate) struct IndexValue { file_size: u32, } +impl IndexValue { + pub fn new(file_size: u32) -> IndexValue { + IndexValue { file_size } + } + + pub fn file_size(&self) -> u32 { + self.file_size + } +} + /// Generates the path to the cached file. /// /// The file name format is `{region_id}.{file_id}` diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index b640ba896666..dcc13ef2aa31 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -16,17 +16,21 @@ use std::sync::Arc; +use api::v1::region; use common_base::readable_size::ReadableSize; +use common_telemetry::info; use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use crate::cache::file_cache::{FileCache, FileCacheRef}; -use crate::error::Result; +use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue}; +use crate::error::{self, Result}; +use crate::metrics::{UPLOAD_BYTES_TOTAL, WRITE_AND_UPLOAD_ELAPSED_TOTAL}; use crate::read::Source; use crate::sst::file::FileId; use crate::sst::parquet::writer::ParquetWriter; -use crate::sst::parquet::{SstInfo, WriteOptions}; +use crate::sst::parquet::{SstInfo, WriteOptions, DEFAULT_WRITE_BUFFER_SIZE}; /// A cache for uploading files to remote object stores. /// @@ -65,11 +69,60 @@ impl WriteCache { request: SstUploadRequest, write_opts: &WriteOptions, ) -> Result> { - // TODO(yingwen): Write to the local store and then upload. - // Now we write to the remote and ignore local cache. - let mut writer = - ParquetWriter::new(request.upload_path, request.metadata, request.remote_store); - writer.write_all(request.source, write_opts).await + let _timer = WRITE_AND_UPLOAD_ELAPSED_TOTAL.start_timer(); + + let region_id = request.metadata.region_id; + let file_id = request.file_id; + + let cache_path = self.file_cache.cache_file_path((region_id, file_id)); + // Write to FileCache. + let mut writer = ParquetWriter::new( + cache_path.clone(), + request.metadata, + self.file_cache.local_store(), + ); + + let sst_info = writer.write_all(request.source, write_opts).await?; + + // Upload sst file to remote object store. + let upload_path = request.upload_path.clone(); + let reader = self + .file_cache + .local_store() + .reader(&cache_path) + .await + .context(error::OpenDalSnafu)?; + + let mut writer = request + .remote_store + .writer_with(&upload_path) + .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .await + .context(error::OpenDalSnafu)?; + + let n = futures::io::copy(reader, &mut writer) + .await + .context(error::UploadSstSnafu { region_id, file_id })?; + + UPLOAD_BYTES_TOTAL.inc_by(n); + + // Must close to upload all data. + writer.close().await.context(error::OpenDalSnafu)?; + + info!( + "Upload file to remote, file: {}, upload_path: {}", + file_id, upload_path + ); + + // Register to file cache + if let Some(sst_info) = &sst_info { + let file_size = sst_info.file_size as u32; + self.file_cache + .put((region_id, file_id), IndexValue::new(file_size)) + .await; + } + + Ok(sst_info) } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 044a4be5848d..a40ee7116ddd 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -467,6 +467,19 @@ pub enum Error { blob_type: String, location: Location, }, + + #[snafu(display( + "Failed to upload sst file, region_id: {}, file_id: {}", + region_id, + file_id + ))] + UploadSst { + region_id: RegionId, + file_id: FileId, + #[snafu(source)] + error: std::io::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -555,6 +568,7 @@ impl ErrorExt for Error { PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => { source.status_code() } + UploadSst { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index aa407bfdbced..99ac3ed1b99e 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -145,6 +145,18 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); + /// Upload bytes counter. + pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( + "mito_upload_bytes_total", + "mito upload bytes total", + ) + .unwrap(); + /// Timer of upload. + pub static ref WRITE_AND_UPLOAD_ELAPSED_TOTAL: Histogram = register_histogram!( + "mito_write_and_upload_elapsed_total", + "mito write and upload elapsed total", + ) + .unwrap(); // ------- End of cache metrics. // Index metrics. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 20259672e3bb..393299ae12ec 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -29,7 +29,7 @@ use crate::sst::file::FileTimeRange; /// Key of metadata in parquet SST. pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; -const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); +pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); /// Default batch size to read parquet files. pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; /// Default row group size for parquet files.