feat: write and upload sst file
QuenKar committed Jan 5, 2024
1 parent 96b6235 commit 8702308
Showing 5 changed files with 98 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/mito2/src/cache/file_cache.rs
@@ -197,6 +197,16 @@ pub(crate) struct IndexValue {
    file_size: u32,
}

impl IndexValue {
    pub fn new(file_size: u32) -> IndexValue {
        IndexValue { file_size }
    }

    pub fn file_size(&self) -> u32 {
        self.file_size
    }
}

/// Generates the path to the cached file.
///
/// The file name format is `{region_id}.{file_id}`
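The new constructor and getter above are what the write cache uses to record an uploaded SST's size in the local file cache. A minimal sketch of that registration step, assuming the FileCache::put signature used in write_cache.rs below; the helper name and the store_api::storage::RegionId import are illustrative, not part of this commit:

use store_api::storage::RegionId;

use crate::cache::file_cache::{FileCacheRef, IndexValue};
use crate::sst::file::FileId;

/// Hypothetical helper: record an uploaded SST file and its size in the cache.
async fn register_uploaded_sst(
    file_cache: &FileCacheRef,
    region_id: RegionId,
    file_id: FileId,
    file_size: u32,
) {
    // The key is the (region, file) pair; IndexValue carries the file size.
    file_cache
        .put((region_id, file_id), IndexValue::new(file_size))
        .await;
}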
69 changes: 61 additions & 8 deletions src/mito2/src/cache/write_cache.rs
@@ -16,17 +16,21 @@
use std::sync::Arc;

use api::v1::region;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::cache::file_cache::{FileCache, FileCacheRef};
use crate::error::Result;
use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{UPLOAD_BYTES_TOTAL, WRITE_AND_UPLOAD_ELAPSED_TOTAL};
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::parquet::{SstInfo, WriteOptions, DEFAULT_WRITE_BUFFER_SIZE};

/// A cache for uploading files to remote object stores.
///
@@ -65,11 +69,60 @@ impl WriteCache {
        request: SstUploadRequest,
        write_opts: &WriteOptions,
    ) -> Result<Option<SstInfo>> {
        // TODO(yingwen): Write to the local store and then upload.
        // Now we write to the remote and ignore local cache.
        let mut writer =
            ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
        writer.write_all(request.source, write_opts).await
        let _timer = WRITE_AND_UPLOAD_ELAPSED_TOTAL.start_timer();

        let region_id = request.metadata.region_id;
        let file_id = request.file_id;

        let cache_path = self.file_cache.cache_file_path((region_id, file_id));
        // Write to FileCache.
        let mut writer = ParquetWriter::new(
            cache_path.clone(),
            request.metadata,
            self.file_cache.local_store(),
        );

        let sst_info = writer.write_all(request.source, write_opts).await?;

        // Upload sst file to remote object store.
        let upload_path = request.upload_path.clone();
        let reader = self
            .file_cache
            .local_store()
            .reader(&cache_path)
            .await
            .context(error::OpenDalSnafu)?;

        let mut writer = request
            .remote_store
            .writer_with(&upload_path)
            .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?;

        let n = futures::io::copy(reader, &mut writer)
            .await
            .context(error::UploadSstSnafu { region_id, file_id })?;

        UPLOAD_BYTES_TOTAL.inc_by(n);

        // Must close to upload all data.
        writer.close().await.context(error::OpenDalSnafu)?;

        info!(
            "Upload file to remote, file: {}, upload_path: {}",
            file_id, upload_path
        );

        // Register to file cache
        if let Some(sst_info) = &sst_info {
            let file_size = sst_info.file_size as u32;
            self.file_cache
                .put((region_id, file_id), IndexValue::new(file_size))
                .await;
        }

        Ok(sst_info)
    }
}
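The new write path above writes the SST through ParquetWriter into the local file cache store, streams that file to the remote object store with futures::io::copy through a buffered writer, and only counts the upload as finished once the writer is closed. A condensed sketch of the copy step alone, with the snafu contexts replaced by std::io errors; upload_cached_file and its parameters are illustrative names, and the object-store calls mirror the ones used above:

use object_store::ObjectStore;

/// Hypothetical sketch: stream a locally cached SST file to the remote store
/// and return the number of bytes uploaded.
async fn upload_cached_file(
    local_store: &ObjectStore,
    remote_store: &ObjectStore,
    cache_path: &str,
    upload_path: &str,
    buffer_size: usize,
) -> std::io::Result<u64> {
    // Read the file that was just written to the local cache store.
    let reader = local_store
        .reader(cache_path)
        .await
        .map_err(std::io::Error::other)?;
    // Buffered writer on the remote store; data is not durable until close().
    let mut writer = remote_store
        .writer_with(upload_path)
        .buffer(buffer_size)
        .await
        .map_err(std::io::Error::other)?;
    let n = futures::io::copy(reader, &mut writer).await?;
    // Close to flush the remaining buffered bytes and finish the upload.
    writer.close().await.map_err(std::io::Error::other)?;
    Ok(n)
}

Buffering the writer at DEFAULT_WRITE_BUFFER_SIZE (8 MB, see parquet.rs below) batches the upload into reasonably sized parts instead of issuing a remote request per small write.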

14 changes: 14 additions & 0 deletions src/mito2/src/error.rs
@@ -467,6 +467,19 @@ pub enum Error {
        blob_type: String,
        location: Location,
    },

    #[snafu(display(
        "Failed to upload sst file, region_id: {}, file_id: {}",
        region_id,
        file_id
    ))]
    UploadSst {
        region_id: RegionId,
        file_id: FileId,
        #[snafu(source)]
        error: std::io::Error,
        location: Location,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -555,6 +568,7 @@ impl ErrorExt for Error {
            PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
                source.status_code()
            }
            UploadSst { .. } => StatusCode::StorageUnavailable,
        }
    }

12 changes: 12 additions & 0 deletions src/mito2/src/metrics.rs
@@ -145,6 +145,18 @@ lazy_static! {
        &[TYPE_LABEL]
    )
    .unwrap();
    /// Upload bytes counter.
    pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
        "mito_upload_bytes_total",
        "mito upload bytes total",
    )
    .unwrap();
    /// Timer of upload.
    pub static ref WRITE_AND_UPLOAD_ELAPSED_TOTAL: Histogram = register_histogram!(
        "mito_write_and_upload_elapsed_total",
        "mito write and upload elapsed total",
    )
    .unwrap();
    // ------- End of cache metrics.

    // Index metrics.
2 changes: 1 addition & 1 deletion src/mito2/src/sst/parquet.rs
@@ -29,7 +29,7 @@ use crate::sst::file::FileTimeRange;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
