Skip to content

Commit

Permalink
feat(mito): support write cache for index file
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Jan 11, 2024
1 parent fd8fb64 commit 782724b
Show file tree
Hide file tree
Showing 17 changed files with 302 additions and 92 deletions.
13 changes: 10 additions & 3 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ impl AccessLayer {
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = location::sst_file_path(&self.region_dir, request.file_id);
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
Expand All @@ -114,15 +115,21 @@ impl AccessLayer {
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: path,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
},
write_opts,
)
.await?
} else {
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
let mut writer = ParquetWriter::new(
file_path,
index_file_path,
request.metadata,
self.object_store.clone(),
);
writer.write_all(request.source, write_opts).await?
};

Expand Down
117 changes: 92 additions & 25 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl FileCache {
// The cache is replaced by another file. This is unexpected, we don't remove the same
// file but updates the metrics as the file is already replaced by users.
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0);
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
return;
}

Expand All @@ -77,7 +77,7 @@ impl FileCache {
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
}
Err(e) => {
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0);
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
}
}
}
Expand Down Expand Up @@ -205,7 +205,51 @@ impl FileCache {
}

/// Key of file cache index.
pub(crate) type IndexKey = (RegionId, FileId);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct IndexKey {
pub region_id: RegionId,
pub file_id: FileId,
pub file_type: FileType,
}

impl IndexKey {
/// Creates a new index key.
pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
IndexKey {
region_id,
file_id,
file_type,
}
}
}

/// Type of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum FileType {
/// Parquet file.
Parquet,
/// Puffin file.
Puffin,
}

impl FileType {
/// Parses the file type from string.
fn parse(s: &str) -> Option<FileType> {
match s {
"parquet" => Some(FileType::Parquet),
"puffin" => Some(FileType::Puffin),
_ => None,
}
}

/// Converts the file type to string.
fn as_str(&self) -> &'static str {
match self {
FileType::Parquet => "parquet",
FileType::Puffin => "puffin",
}
}
}

/// An entity that describes the file in the file cache.
///
Expand All @@ -220,19 +264,28 @@ pub(crate) struct IndexValue {
///
/// The file name format is `{region_id}.{file_id}`
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1))
join_path(
cache_file_dir,
&format!(
"{}.{}.{}",
key.region_id.as_u64(),
key.file_id,
key.file_type.as_str()
),
)
}

/// Parse index key from the file name.
fn parse_index_key(name: &str) -> Option<IndexKey> {
let mut splited = name.splitn(2, '.');
let region_id = splited.next().and_then(|s| {
let mut split = name.splitn(3, '.');
let region_id = split.next().and_then(|s| {
let id = s.parse::<u64>().ok()?;
Some(RegionId::from_u64(id))
})?;
let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?;
let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
let file_type = split.next().and_then(FileType::parse)?;

Some((region_id, file_id))
Some(IndexKey::new(region_id, file_id, file_type))
}

#[cfg(test)]
Expand All @@ -257,7 +310,7 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);

// Get an empty file.
Expand All @@ -270,7 +323,10 @@ mod tests {
.unwrap();
// Add to the cache.
cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.put(
IndexKey::new(region_id, file_id, FileType::Parquet),
IndexValue { file_size: 5 },
)
.await;

// Read file content.
Expand Down Expand Up @@ -303,7 +359,7 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);

// Write a file.
Expand All @@ -313,7 +369,10 @@ mod tests {
.unwrap();
// Add to the cache.
cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.put(
IndexKey::new(region_id, file_id, FileType::Parquet),
IndexValue { file_size: 5 },
)
.await;

// Remove the file but keep the index.
Expand All @@ -332,19 +391,20 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));

let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
// Write N files.
let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
let mut total_size = 0;
for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let key = IndexKey::new(region_id, *file_id, file_type);
let file_path = cache.cache_file_path(key);
let bytes = i.to_string().into_bytes();
local_store.write(&file_path, bytes.clone()).await.unwrap();

// Add to the cache.
cache
.put(
(region_id, *file_id),
IndexKey::new(region_id, *file_id, file_type),
IndexValue {
file_size: bytes.len() as u32,
},
Expand All @@ -356,15 +416,18 @@ mod tests {
// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
// No entry before recovery.
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
.await
.is_none());
cache.recover().await.unwrap();

// Check size.
cache.memory_index.run_pending_tasks().await;
assert_eq!(total_size, cache.memory_index.weighted_size() as usize);

for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let key = IndexKey::new(region_id, *file_id, file_type);
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
Expand All @@ -376,12 +439,18 @@ mod tests {
fn test_cache_file_path() {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
assert_eq!(
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
cache_file_path("test_dir", (RegionId::new(1234, 5), file_id))
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
cache_file_path(
"test_dir",
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
)
);
assert_eq!(
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id))
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
cache_file_path(
"test_dir/",
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
)
);
}

Expand All @@ -390,8 +459,8 @@ mod tests {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
let region_id = RegionId::new(1234, 5);
assert_eq!(
(region_id, file_id),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap()
IndexKey::new(region_id, file_id, FileType::Parquet),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
);
assert!(parse_index_key("").is_none());
assert!(parse_index_key(".").is_none());
Expand All @@ -400,8 +469,6 @@ mod tests {
assert!(parse_index_key(".5299989643269").is_none());
assert!(parse_index_key("5299989643269.").is_none());
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
assert!(
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none()
);
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
}
}
Loading

0 comments on commit 782724b

Please sign in to comment.