Skip to content

Commit

Permalink
feat: write and upload sst (GreptimeTeam#3106)
Browse files Browse the repository at this point in the history
* feat: write and upload sst file

* refactor: unit test

* cr comment

* chore: typos

* chore: cr comment

* chore: conflict

* Apply suggestions from code review

Co-authored-by: dennis zhuang <[email protected]>

* chore: fmt

* chore: style

Co-authored-by: Yingwen <[email protected]>

---------

Co-authored-by: dennis zhuang <[email protected]>
Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2024
1 parent 51a3fbc commit 29a7f30
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 28 deletions.
8 changes: 7 additions & 1 deletion src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ impl FileCache {
Ok(None)
}
}

/// Checks if the key is in the file cache.
#[cfg(test)]
pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
self.memory_index.contains_key(key)
}
}

/// Key of file cache index.
Expand All @@ -207,7 +213,7 @@ pub(crate) type IndexKey = (RegionId, FileId);
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
/// Size of the file in bytes.
file_size: u32,
pub(crate) file_size: u32,
}

/// Generates the path to the cached file.
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/cache/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::services::Fs;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::ParquetMetaData;
Expand All @@ -42,3 +44,9 @@ fn parquet_file_data() -> Vec<u8> {

buffer
}

pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
let mut builder = Fs::default();
builder.root(path);
ObjectStore::new(builder).unwrap().finish()
}
168 changes: 160 additions & 8 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@
use std::sync::Arc;

use api::v1::region;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_telemetry::{debug, info};
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::access_layer::new_fs_object_store;
use crate::cache::file_cache::{FileCache, FileCacheRef};
use crate::error::Result;
use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// A cache for uploading files to remote object stores.
///
Expand Down Expand Up @@ -77,11 +81,74 @@ impl WriteCache {
request: SstUploadRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
// TODO(yingwen): Write to the local store and then upload.
// Now we write to the remote and ignore local cache.
let mut writer =
ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
writer.write_all(request.source, write_opts).await
let timer = FLUSH_ELAPSED
.with_label_values(&["write_sst"])
.start_timer();

let region_id = request.metadata.region_id;
let file_id = request.file_id;

let cache_path = self.file_cache.cache_file_path((region_id, file_id));
// Write to FileCache.
let mut writer = ParquetWriter::new(
cache_path.clone(),
request.metadata,
self.file_cache.local_store(),
);

let sst_info = writer.write_all(request.source, write_opts).await?;

timer.stop_and_record();

// Upload sst file to remote object store.
let Some(sst_info) = sst_info else {
// No data need to upload.
return Ok(None);
};

let timer = FLUSH_ELAPSED
.with_label_values(&["upload_sst"])
.start_timer();

let reader = self
.file_cache
.local_store()
.reader(&cache_path)
.await
.context(error::OpenDalSnafu)?;

let upload_path = request.upload_path;
let mut writer = request
.remote_store
.writer_with(&upload_path)
.buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?;

let bytes_written = futures::io::copy(reader, &mut writer)
.await
.context(error::UploadSstSnafu { region_id, file_id })?;

// Must close to upload all data.
writer.close().await.context(error::OpenDalSnafu)?;

UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

debug!(
"Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
region_id,
file_id,
upload_path,
timer.stop_and_record()
);

// Register to file cache
let file_size = sst_info.file_size as u32;
self.file_cache
.put((region_id, file_id), IndexValue { file_size })
.await;

Ok(Some(sst_info))
}
}

Expand All @@ -96,3 +163,88 @@ pub struct SstUploadRequest {
/// Remote object store to upload.
pub remote_store: ObjectStore,
}

#[cfg(test)]
mod tests {
use api::v1::OpType;
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use object_store::manager::ObjectStoreManager;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::storage::RegionId;

use super::*;
use crate::cache::file_cache::{self, FileCache};
use crate::cache::test_util::new_fs_store;
use crate::sst::file::FileId;
use crate::sst::location::sst_file_path;
use crate::test_util::sst_util::{
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{build_rows, new_batch_builder, CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_write_and_upload_sst() {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);

// Create WriteCache
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
let object_store_manager = env.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(
local_store.clone(),
object_store_manager,
ReadableSize::mb(10),
)
.await
.unwrap();

// Create Source
let metadata = Arc::new(sst_region_metadata());
let region_id = metadata.region_id;
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);

let request = SstUploadRequest {
file_id,
metadata,
source,
storage: None,
upload_path: upload_path.clone(),
remote_store: mock_store.clone(),
};

let write_opts = WriteOptions {
row_group_size: 512,
..Default::default()
};

// Write to cache and upload sst to mock remote store
let sst_info = write_cache
.write_and_upload_sst(request, &write_opts)
.await
.unwrap()
.unwrap();

// Check write cache contains the key
let key = (region_id, file_id);
assert!(write_cache.file_cache.contains_key(&key));

// Check file data
let remote_data = mock_store.read(&upload_path).await.unwrap();
let cache_data = local_store
.read(&write_cache.file_cache.cache_file_path(key))
.await
.unwrap();
assert_eq!(remote_data, cache_data);
}
}
14 changes: 14 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,19 @@ pub enum Error {
flushed_entry_id: u64,
unexpected_entry_id: u64,
},

#[snafu(display(
"Failed to upload sst file, region_id: {}, file_id: {}",
region_id,
file_id
))]
UploadSst {
region_id: RegionId,
file_id: FileId,
#[snafu(source)]
error: std::io::Error,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -608,6 +621,7 @@ impl ErrorExt for Error {
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
UploadSst { .. } => StatusCode::StorageUnavailable,
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
/// Upload bytes counter.
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_upload_bytes_total",
"mito upload bytes total",
)
.unwrap();
// ------- End of cache metrics.

// Index metrics.
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

//! Sorted strings tables.
use common_base::readable_size::ReadableSize;

pub mod file;
pub mod file_purger;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
21 changes: 4 additions & 17 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use super::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
Expand Down Expand Up @@ -71,33 +72,19 @@ pub struct SstInfo {
mod tests {
use std::sync::Arc;

use api::v1::OpType;
use common_time::Timestamp;

use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::read::Batch;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::test_util::sst_util::{
new_primary_key, new_source, sst_file_handle, sst_region_metadata,
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, new_batch_builder, TestEnv};
use crate::test_util::{check_reader_result, TestEnv};

const FILE_DIR: &str = "/";

fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end > start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];
let op_types = vec![OpType::Put; end - start];
let field: Vec<_> = (start..end).map(|v| v as u64).collect();
new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
.build()
.unwrap()
}

#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
Expand Down
16 changes: 14 additions & 2 deletions src/mito2/src/test_util/sst_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Utilities for testing SSTs.
use api::v1::SemanticType;
use api::v1::{OpType, SemanticType};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
Expand All @@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use crate::read::{Batch, Source};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::test_util::{new_noop_file_purger, VecBatchReader};
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};

/// Test region id.
const REGION_ID: RegionId = RegionId::new(0, 0);
Expand Down Expand Up @@ -110,3 +110,15 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
file_purger,
)
}

pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end > start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];
let op_types = vec![OpType::Put; end - start];
let field: Vec<_> = (start..end).map(|v| v as u64).collect();
new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
.build()
.unwrap()
}

0 comments on commit 29a7f30

Please sign in to comment.