feat(mito): support write cache for index file #3144

Merged

Changes from all commits
9 changes: 6 additions & 3 deletions src/mito2/src/access_layer.rs
@@ -102,7 +102,8 @@ impl AccessLayer {
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
- let path = location::sst_file_path(&self.region_dir, request.file_id);
+ let file_path = location::sst_file_path(&self.region_dir, request.file_id);
+ let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
@@ -114,15 +115,17 @@
metadata: request.metadata,
source: request.source,
storage: request.storage,
- upload_path: path,
+ upload_path: file_path,
+ index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
},
write_opts,
)
.await?
} else {
// Write cache is disabled.
- let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
+ let mut writer =
+     ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await?
};

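The change above computes the index file path alongside the SST path and hands both to the write cache, so a cache-enabled write can stage and upload the puffin index file together with the parquet data file. A minimal sketch of that idea in isolation; the helper names and path layout below are hypothetical stand-ins, not the crate's real `location` helpers:

```rust
// Hypothetical stand-ins for location::sst_file_path / location::index_file_path.
// The exact layout is the crate's concern; the point is that two paths are produced.
fn sst_file_path(region_dir: &str, file_id: &str) -> String {
    format!("{region_dir}/{file_id}.parquet")
}

fn index_file_path(region_dir: &str, file_id: &str) -> String {
    format!("{region_dir}/index/{file_id}.puffin")
}

fn main() {
    let region_dir = "data/region_1";
    let file_id = "3368731b-a556-42b8-a5df-9c31ce155095";
    // Both paths travel with the same write request, so the data file and its
    // index file can be uploaded (and cached) together.
    println!("upload_path       = {}", sst_file_path(region_dir, file_id));
    println!("index_upload_path = {}", index_file_path(region_dir, file_id));
}
```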
128 changes: 100 additions & 28 deletions src/mito2/src/cache/file_cache.rs
@@ -71,7 +71,7 @@ impl FileCache {
// The cache is replaced by another file. This is unexpected, we don't remove the same
// file but updates the metrics as the file is already replaced by users.
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0);
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
return;
}

@@ -80,7 +80,7 @@
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
}
Err(e) => {
- warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0);
+ warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
}
}
}
@@ -241,7 +241,51 @@ impl FileCache {
}

/// Key of file cache index.
- pub(crate) type IndexKey = (RegionId, FileId);
+ #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+ pub(crate) struct IndexKey {
+     pub region_id: RegionId,
+     pub file_id: FileId,
+     pub file_type: FileType,
+ }
+
+ impl IndexKey {
+     /// Creates a new index key.
+     pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
+         IndexKey {
+             region_id,
+             file_id,
+             file_type,
+         }
+     }
+ }
+
+ /// Type of the file.
+ #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+ pub enum FileType {
+     /// Parquet file.
+     Parquet,
+     /// Puffin file.
+     Puffin,
+ }
+
+ impl FileType {
+     /// Parses the file type from string.
+     fn parse(s: &str) -> Option<FileType> {
+         match s {
+             "parquet" => Some(FileType::Parquet),
+             "puffin" => Some(FileType::Puffin),
+             _ => None,
+         }
+     }
+
+     /// Converts the file type to string.
+     fn as_str(&self) -> &'static str {
+         match self {
+             FileType::Parquet => "parquet",
+             FileType::Puffin => "puffin",
+         }
+     }
+ }

/// An entity that describes the file in the file cache.
///
@@ -254,21 +298,30 @@ pub(crate) struct IndexValue {

/// Generates the path to the cached file.
///
- /// The file name format is `{region_id}.{file_id}`
+ /// The file name format is `{region_id}.{file_id}.{file_type}`
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
- join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1))
+ join_path(
+     cache_file_dir,
+     &format!(
+         "{}.{}.{}",
+         key.region_id.as_u64(),
+         key.file_id,
+         key.file_type.as_str()
+     ),
+ )
}

/// Parse index key from the file name.
fn parse_index_key(name: &str) -> Option<IndexKey> {
- let mut splited = name.splitn(2, '.');
- let region_id = splited.next().and_then(|s| {
+ let mut split = name.splitn(3, '.');
+ let region_id = split.next().and_then(|s| {
let id = s.parse::<u64>().ok()?;
Some(RegionId::from_u64(id))
})?;
- let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?;
+ let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
+ let file_type = split.next().and_then(FileType::parse)?;

- Some((region_id, file_id))
+ Some(IndexKey::new(region_id, file_id, file_type))
}

#[cfg(test)]
@@ -293,7 +346,7 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
- let key = (region_id, file_id);
+ let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);

// Get an empty file.
@@ -306,7 +359,10 @@
.unwrap();
// Add to the cache.
cache
- .put((region_id, file_id), IndexValue { file_size: 5 })
+ .put(
+     IndexKey::new(region_id, file_id, FileType::Parquet),
+     IndexValue { file_size: 5 },
+ )
.await;

// Read file content.
@@ -339,7 +395,7 @@
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
- let key = (region_id, file_id);
+ let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);

// Write a file.
@@ -349,7 +405,10 @@
.unwrap();
// Add to the cache.
cache
- .put((region_id, file_id), IndexValue { file_size: 5 })
+ .put(
+     IndexKey::new(region_id, file_id, FileType::Parquet),
+     IndexValue { file_size: 5 },
+ )
.await;

// Remove the file but keep the index.
@@ -368,19 +427,20 @@
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));

let region_id = RegionId::new(2000, 0);
+ let file_type = FileType::Parquet;
// Write N files.
let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
let mut total_size = 0;
for (i, file_id) in file_ids.iter().enumerate() {
- let key = (region_id, *file_id);
+ let key = IndexKey::new(region_id, *file_id, file_type);
let file_path = cache.cache_file_path(key);
let bytes = i.to_string().into_bytes();
local_store.write(&file_path, bytes.clone()).await.unwrap();

// Add to the cache.
cache
.put(
- (region_id, *file_id),
+ IndexKey::new(region_id, *file_id, file_type),
IndexValue {
file_size: bytes.len() as u32,
},
@@ -392,15 +452,18 @@
// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
// No entry before recovery.
- assert!(cache.reader((region_id, file_ids[0])).await.is_none());
+ assert!(cache
+     .reader(IndexKey::new(region_id, file_ids[0], file_type))
+     .await
+     .is_none());
cache.recover().await.unwrap();

// Check size.
cache.memory_index.run_pending_tasks().await;
assert_eq!(total_size, cache.memory_index.weighted_size() as usize);

for (i, file_id) in file_ids.iter().enumerate() {
- let key = (region_id, *file_id);
+ let key = IndexKey::new(region_id, *file_id, file_type);
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
@@ -415,7 +478,7 @@
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
- let key = (region_id, file_id);
+ let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = file_cache.cache_file_path(key);
// Write a file.
let data = b"hello greptime database";
@@ -424,9 +487,7 @@
.await
.unwrap();
// Add to the cache.
- file_cache
-     .put((region_id, file_id), IndexValue { file_size: 5 })
-     .await;
+ file_cache.put(key, IndexValue { file_size: 5 }).await;
// Ranges
let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
@@ -442,12 +503,18 @@
fn test_cache_file_path() {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
assert_eq!(
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
cache_file_path("test_dir", (RegionId::new(1234, 5), file_id))
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
cache_file_path(
"test_dir",
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
)
);
assert_eq!(
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id))
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
cache_file_path(
"test_dir/",
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
)
);
}

@@ -456,8 +523,8 @@
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
let region_id = RegionId::new(1234, 5);
assert_eq!(
- (region_id, file_id),
- parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap()
+ IndexKey::new(region_id, file_id, FileType::Parquet),
+ parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
);
assert!(parse_index_key("").is_none());
assert!(parse_index_key(".").is_none());
@@ -466,8 +533,13 @@ mod tests {
assert!(parse_index_key(".5299989643269").is_none());
assert!(parse_index_key("5299989643269.").is_none());
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
assert!(
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none()
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
);
+ assert!(parse_index_key(
+     "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin"
+ )
+ .is_none());
}
}
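For readers skimming the diff: the cached file name is now `{region_id}.{file_id}.{file_type}`, and `parse_index_key` splits a name into at most three segments so the type suffix can be validated. Below is a simplified, self-contained sketch of that round trip, using plain `u64`/`String` stand-ins for `RegionId` and `FileId` (not the crate's actual types):

```rust
// Simplified mirror of cache_file_path / parse_index_key above.
// splitn(3, '.') mirrors the real parser: a UUID-like file id contains no '.',
// so the third segment is exactly the file-type suffix.
fn cache_file_name(region_id: u64, file_id: &str, file_type: &str) -> String {
    format!("{region_id}.{file_id}.{file_type}")
}

fn parse_cache_file_name(name: &str) -> Option<(u64, String, String)> {
    let mut parts = name.splitn(3, '.');
    let region_id = parts.next()?.parse::<u64>().ok()?;
    let file_id = parts.next()?.to_string();
    let file_type = parts.next()?;
    if file_type != "parquet" && file_type != "puffin" {
        return None;
    }
    Some((region_id, file_id, file_type.to_string()))
}

fn main() {
    let name = cache_file_name(5299989643269, "3368731b-a556-42b8-a5df-9c31ce155095", "parquet");
    assert_eq!(
        Some((
            5299989643269,
            "3368731b-a556-42b8-a5df-9c31ce155095".to_string(),
            "parquet".to_string(),
        )),
        parse_cache_file_name(&name)
    );
    // A doubled suffix such as ".parquet.puffin" is rejected, matching the test above:
    // the third segment becomes "parquet.puffin", which is not a known file type.
    assert!(parse_cache_file_name("1.3368731b.parquet.puffin").is_none());
    println!("round-trip ok: {name}");
}
```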