feat: write and upload sst #3106

Merged on Jan 11, 2024 (9 commits).

Changes from 8 commits
8 changes: 7 additions & 1 deletion src/mito2/src/cache/file_cache.rs
@@ -196,6 +196,12 @@ impl FileCache {
Ok(None)
}
}

/// Checks if the key is in the file cache.
#[cfg(test)]
pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
self.memory_index.contains_key(key)
}
}

/// Key of file cache index.
@@ -207,7 +213,7 @@ pub(crate) type IndexKey = (RegionId, FileId);
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
/// Size of the file in bytes.
file_size: u32,
pub(crate) file_size: u32,
}

/// Generates the path to the cached file.
8 changes: 8 additions & 0 deletions src/mito2/src/cache/test_util.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::services::Fs;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::ParquetMetaData;
@@ -42,3 +44,9 @@ fn parquet_file_data() -> Vec<u8> {

buffer
}

pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
let mut builder = Fs::default();
builder.root(path);
ObjectStore::new(builder).unwrap().finish()
}
170 changes: 162 additions & 8 deletions src/mito2/src/cache/write_cache.rs
@@ -16,19 +16,23 @@

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_telemetry::{debug, info};
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::access_layer::new_fs_object_store;
use crate::cache::file_cache::{FileCache, FileCacheRef};
use crate::error::Result;
use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// A cache for uploading files to remote object stores.
///
@@ -77,11 +81,76 @@ impl WriteCache {
request: SstUploadRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
// TODO(yingwen): Write to the local store and then upload.
// Now we write to the remote and ignore local cache.
let mut writer =
ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
writer.write_all(request.source, write_opts).await
let timer = FLUSH_ELAPSED
.with_label_values(&["write_sst"])
.start_timer();

let region_id = request.metadata.region_id;
let file_id = request.file_id;

let cache_path = self.file_cache.cache_file_path((region_id, file_id));
// Write to FileCache.
let mut writer = ParquetWriter::new(
cache_path.clone(),
request.metadata,
self.file_cache.local_store(),
);

let sst_info = writer.write_all(request.source, write_opts).await?;

timer.stop_and_record();

// Upload sst file to remote object store.
if sst_info.is_none() {
// No data to upload.
return Ok(None);
}

let timer = FLUSH_ELAPSED
.with_label_values(&["upload_sst"])
.start_timer();

let reader = self
.file_cache
.local_store()
.reader(&cache_path)
.await
.context(error::OpenDalSnafu)?;

let upload_path = request.upload_path;
let mut writer = request
.remote_store
.writer_with(&upload_path)
.buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?;

let bytes_written = futures::io::copy(reader, &mut writer)
.await
.context(error::UploadSstSnafu { region_id, file_id })?;

// Must close to upload all data.
writer.close().await.context(error::OpenDalSnafu)?;

UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

debug!(
"Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
region_id,
file_id,
upload_path,
timer.stop_and_record()
);

// Register to file cache
if let Some(sst_info) = &sst_info {
let file_size = sst_info.file_size as u32;
self.file_cache
.put((region_id, file_id), IndexValue { file_size })
.await;
}

Ok(sst_info)
}
}

@@ -96,3 +165,88 @@ pub struct SstUploadRequest {
/// Remote object store to upload.
pub remote_store: ObjectStore,
}

#[cfg(test)]
mod tests {
use api::v1::OpType;
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use object_store::manager::ObjectStoreManager;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::storage::RegionId;

use super::*;
use crate::cache::file_cache::{self, FileCache};
use crate::cache::test_util::new_fs_store;
use crate::sst::file::FileId;
use crate::sst::location::sst_file_path;
use crate::test_util::sst_util::{
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{build_rows, new_batch_builder, CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_write_and_upload_sst() {
// TODO(QuenKar): maybe find a way to create a mock object store server for testing;
// for now we just use the local file system as a mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);

// Create WriteCache
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
let object_store_manager = env.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(
local_store.clone(),
object_store_manager,
ReadableSize::mb(10),
)
.await
.unwrap();

// Create Source
let metadata = Arc::new(sst_region_metadata());
let region_id = metadata.region_id;
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);

let request = SstUploadRequest {
file_id,
metadata,
source,
storage: None,
upload_path: upload_path.clone(),
remote_store: mock_store.clone(),
};

let write_opts = WriteOptions {
row_group_size: 512,
..Default::default()
};

// Write to cache and upload sst to mock remote store
let sst_info = write_cache
.write_and_upload_sst(request, &write_opts)
.await
.unwrap()
.unwrap();

// Check write cache contains the key
let key = (region_id, file_id);
assert!(write_cache.file_cache.contains_key(&key));

// Check file data
let remote_data = mock_store.read(&upload_path).await.unwrap();
let cache_data = local_store
.read(&write_cache.file_cache.cache_file_path(key))
.await
.unwrap();
assert_eq!(remote_data, cache_data);
}
}
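
For readers skimming the diff, here is a condensed, hypothetical caller sketch (not part of this PR) showing how a flush path might drive the new `WriteCache::write_and_upload_sst`. The `flush_to_remote` helper and the `"region_dir"` path are illustrative assumptions; the `SstUploadRequest` fields mirror the test above, and the imports are the same as in `write_cache.rs` plus `crate::sst::location::sst_file_path`.

```rust
// Hypothetical caller (illustration only): assumes a WriteCache handle, region
// metadata, a batch source, and the remote object store are already available.
async fn flush_to_remote(
    write_cache: &WriteCache,
    metadata: RegionMetadataRef,
    source: Source,
    remote_store: ObjectStore,
) -> Result<Option<SstInfo>> {
    let file_id = FileId::random();
    let request = SstUploadRequest {
        file_id,
        metadata,
        source,
        storage: None,
        // "region_dir" stands in for the region's SST directory.
        upload_path: sst_file_path("region_dir", file_id),
        remote_store,
    };
    // Writes the SST into the local file cache, uploads it to the remote store,
    // and registers the cached file in the cache index.
    write_cache
        .write_and_upload_sst(request, &WriteOptions::default())
        .await
}
```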
14 changes: 14 additions & 0 deletions src/mito2/src/error.rs
@@ -512,6 +512,19 @@ pub enum Error {
flushed_entry_id: u64,
unexpected_entry_id: u64,
},

#[snafu(display(
"Failed to upload sst file, region_id: {}, file_id: {}",
region_id,
file_id
))]
UploadSst {
region_id: RegionId,
file_id: FileId,
#[snafu(source)]
error: std::io::Error,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -608,6 +621,7 @@ impl ErrorExt for Error {
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
UploadSst { .. } => StatusCode::StorageUnavailable,
}
}

6 changes: 6 additions & 0 deletions src/mito2/src/metrics.rs
@@ -145,6 +145,12 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
/// Upload bytes counter.
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_upload_bytes_total",
"mito upload bytes total",
)
.unwrap();
// ------- End of cache metrics.

// Index metrics.
5 changes: 5 additions & 0 deletions src/mito2/src/sst.rs
@@ -14,9 +14,14 @@

//! Sorted strings tables.

use common_base::readable_size::ReadableSize;

pub mod file;
pub mod file_purger;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size. It should be greater than the default minimum upload part size of S3 (5 MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
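
A tentative note on the sizing: in the upload path above, this constant is passed to OpenDAL's buffered writer, which stages data in memory before sending it, so keeping the buffer above S3's 5 MB minimum part size avoids rejected multipart uploads. A minimal sketch of that pattern, with placeholder store handles and paths:

```rust
use object_store::ObjectStore;

use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Sketch only: stream a locally cached SST file to a remote store using the
/// default write buffer size. `local`, `remote`, and both paths are placeholders.
async fn copy_sst_to_remote(local: &ObjectStore, remote: &ObjectStore) {
    let reader = local.reader("cache/example.parquet").await.unwrap();
    let mut writer = remote
        .writer_with("data/example.parquet")
        .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await
        .unwrap();
    // Copy all bytes through the buffered writer.
    futures::io::copy(reader, &mut writer).await.unwrap();
    // Closing the writer flushes any remaining bytes and completes the upload.
    writer.close().await.unwrap();
}
```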
21 changes: 4 additions & 17 deletions src/mito2/src/sst/parquet.rs
@@ -27,11 +27,12 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use super::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;

/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
@@ -71,33 +72,19 @@ pub struct SstInfo {
mod tests {
use std::sync::Arc;

use api::v1::OpType;
use common_time::Timestamp;

use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::read::Batch;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::test_util::sst_util::{
new_primary_key, new_source, sst_file_handle, sst_region_metadata,
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, new_batch_builder, TestEnv};
use crate::test_util::{check_reader_result, TestEnv};

const FILE_DIR: &str = "/";

fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end > start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];
let op_types = vec![OpType::Put; end - start];
let field: Vec<_> = (start..end).map(|v| v as u64).collect();
new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
.build()
.unwrap()
}

#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
16 changes: 14 additions & 2 deletions src/mito2/src/test_util/sst_util.rs
@@ -14,7 +14,7 @@

//! Utilities for testing SSTs.

use api::v1::SemanticType;
use api::v1::{OpType, SemanticType};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use crate::read::{Batch, Source};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::test_util::{new_noop_file_purger, VecBatchReader};
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};

/// Test region id.
const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -110,3 +110,15 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
file_purger,
)
}

pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end > start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];
let op_types = vec![OpType::Put; end - start];
let field: Vec<_> = (start..end).map(|v| v as u64).collect();
new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
.build()
.unwrap()
}