Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lookup manifest file size #2590

Merged
merged 11 commits into from
Oct 23, 2023
122 changes: 115 additions & 7 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ impl RegionManifestManager {
let inner = self.inner.read().await;
inner.store.clone()
}

/// Returns total manifest size.
pub async fn manifest_size(&self) -> u64 {
let inner = self.inner.read().await;
inner.total_manifest_size()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -186,7 +192,7 @@ impl RegionManifestManagerInner {
/// Creates a new manifest.
async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
// construct storage
let store = ManifestObjectStore::new(
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
Expand Down Expand Up @@ -232,16 +238,17 @@ impl RegionManifestManagerInner {
/// Returns `Ok(None)` if no such manifest.
async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
// construct storage
let store = ManifestObjectStore::new(
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
);

// recover from storage
// construct manifest builder
// calculate the manifest size from the latest checkpoint
let mut version = MIN_VERSION;
let checkpoint = Self::last_checkpoint(&store).await?;
let checkpoint = Self::last_checkpoint(&mut store).await?;
let last_checkpoint_version = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.last_version)
Expand All @@ -265,6 +272,8 @@ impl RegionManifestManagerInner {
let mut action_iter = store.scan(version, MAX_VERSION).await?;
while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
// set manifest size after last checkpoint
store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
Expand Down Expand Up @@ -312,6 +321,7 @@ impl RegionManifestManagerInner {
Ok(())
}

/// Update the manifest. Return the current manifest version number.
async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
Expand Down Expand Up @@ -343,6 +353,11 @@ impl RegionManifestManagerInner {

Ok(version)
}

/// Returns total manifest size.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.store.total_manifest_size()
}
}

impl RegionManifestManagerInner {
Expand All @@ -369,8 +384,8 @@ impl RegionManifestManagerInner {
}

/// Make a new checkpoint. Return the fresh one if there are some actions to compact.
async fn do_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&self.store).await?;
async fn do_checkpoint(&mut self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&mut self.store).await?;
let current_version = self.last_version;

let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint {
Expand Down Expand Up @@ -441,7 +456,7 @@ impl RegionManifestManagerInner {

/// Fetch the last [RegionCheckpoint] from storage.
pub(crate) async fn last_checkpoint(
store: &ManifestObjectStore,
store: &mut ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = store.load_last_checkpoint().await?;

Expand All @@ -456,14 +471,16 @@ impl RegionManifestManagerInner {

#[cfg(test)]
mod test {

use api::v1::SemanticType;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

use super::*;
use crate::manifest::action::RegionChange;
use crate::manifest::action::{RegionChange, RegionEdit};
use crate::manifest::tests::utils::basic_region_metadata;
use crate::test_util::TestEnv;

Expand Down Expand Up @@ -546,4 +563,95 @@ mod test {
.unwrap();
manager.validate_manifest(&new_metadata, 1).await;
}

/// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
async fn manifest_dir_usage(path: &str) -> u64 {
let mut size = 0;
let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
while let Ok(dir_entry) = read_dir.next_entry().await {
let Some(entry) = dir_entry else {
break;
};
if entry.file_type().await.unwrap().is_file() {
let file_name = entry.file_name().into_string().unwrap();
if file_name.contains(".checkpoint") || file_name.contains(".json") {
let file_size = entry.metadata().await.unwrap().len() as usize;
debug!("File: {file_name:?}, size: {file_size}");
size += file_size;
}
}
}
size as u64
}

#[tokio::test]
async fn test_manifest_size() {
let metadata = Arc::new(basic_region_metadata());
let data_home = create_temp_dir("");
let data_home_path = data_home.path().to_str().unwrap().to_string();
let env = TestEnv::with_data_home(data_home);

let manifest_dir = format!("{}/manifest", data_home_path);

let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
.unwrap();

let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
new_metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = Arc::new(new_metadata_builder.build().unwrap());

let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
}));

let current_version = manager.update(action_list).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1).await;

// get manifest size
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

// update 10 times nop_action to trigger checkpoint
for _ in 0..10 {
manager
.update(RegionMetaActionList::new(vec![RegionMetaAction::Edit(
RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
)]))
.await
.unwrap();
}

// check manifest size again
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);

// Reopen the manager,
// we just calculate the size from the latest checkpoint file
manager.stop().await.unwrap();
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
.unwrap()
.unwrap();
manager.validate_manifest(&new_metadata, 11).await;

// get manifest size again
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, 1312);
}
}
Loading
Loading