Skip to content

Commit

Permalink
feat: fetch manifests in concurrent (#2951)
Browse files Browse the repository at this point in the history
* feat: fetch manifests in concurrent

* chore: set fetching manifest concurrency limit to 16
  • Loading branch information
WenyXu authored Dec 19, 2023
1 parent d180e41 commit 29fc2ea
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 54 deletions.
13 changes: 9 additions & 4 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ impl RegionManifestManagerInner {
};

// apply actions from storage
let mut action_iter = store.scan(version, MAX_VERSION).await?;
while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? {
let manifests = store.scan(version, MAX_VERSION).await?;
let manifests = store.fetch_manifests(&manifests).await?;

for (manifest_version, raw_action_list) in manifests {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
// set manifest size after last checkpoint
store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
Expand Down Expand Up @@ -402,10 +404,13 @@ impl RegionManifestManagerInner {
return Ok(None);
}

let mut iter = self.store.scan(start_version, end_version).await?;
let manifests = self.store.scan(start_version, end_version).await?;
let mut last_version = start_version;
let mut compacted_actions = 0;
while let Some((version, raw_action_list)) = iter.next_log().await? {

let manifests = self.store.fetch_manifests(&manifests).await?;

for (version, raw_action_list) in manifests {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
match action {
Expand Down
117 changes: 67 additions & 50 deletions src/mito2/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ use std::str::FromStr;

use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use futures::future::try_join_all;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{util, Entry, ErrorKind, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::manifest::ManifestVersion;
use tokio::sync::Semaphore;

use crate::error::{
CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
Expand All @@ -41,6 +43,7 @@ const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip
/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
const FETCH_MANIFEST_PARALLELISM: usize = 16;

/// Returns the [CompressionType] according to whether to compress manifest files.
#[inline]
Expand Down Expand Up @@ -101,35 +104,6 @@ pub fn is_checkpoint_file(file_name: &str) -> bool {
CHECKPOINT_RE.is_match(file_name)
}

pub struct ObjectStoreLogIterator {
object_store: ObjectStore,
iter: Box<dyn Iterator<Item = (ManifestVersion, Entry)> + Send + Sync>,
}

impl ObjectStoreLogIterator {
pub async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
match self.iter.next() {
Some((v, entry)) => {
let compress_type = file_compress_type(entry.name());
let bytes = self
.object_store
.read(entry.path())
.await
.context(OpenDalSnafu)?;
let data = compress_type
.decode(bytes)
.await
.context(DecompressObjectSnafu {
compress_type,
path: entry.path(),
})?;
Ok(Some((v, data)))
}
None => Ok(None),
}
}
}

/// Key to identify a manifest file.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
Expand Down Expand Up @@ -197,12 +171,12 @@ impl ManifestObjectStore {
.context(OpenDalSnafu)
}

/// Scan the manifest files in the range of [start, end) and return the iterator.
/// Scan the manifest files in the range of [start, end) and return all manifest entries.
pub async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<ObjectStoreLogIterator> {
) -> Result<Vec<(ManifestVersion, Entry)>> {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });

let mut entries: Vec<(ManifestVersion, Entry)> = self
Expand All @@ -220,10 +194,38 @@ impl ManifestObjectStore {

entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));

Ok(ObjectStoreLogIterator {
object_store: self.object_store.clone(),
iter: Box::new(entries.into_iter()),
})
Ok(entries)
}

/// Fetch all manifests in concurrent.
pub async fn fetch_manifests(
&self,
manifests: &[(u64, Entry)],
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
// TODO(weny): Make it configurable.
let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

let tasks = manifests.iter().map(|(v, entry)| async {
// Safety: semaphore must exist.
let _permit = semaphore.acquire().await.unwrap();

let compress_type = file_compress_type(entry.name());
let bytes = self
.object_store
.read(entry.path())
.await
.context(OpenDalSnafu)?;
let data = compress_type
.decode(bytes)
.await
.context(DecompressObjectSnafu {
compress_type,
path: entry.path(),
})?;
Ok((*v, data))
});

try_join_all(tasks).await
}

/// Delete manifest files that version < end.
Expand Down Expand Up @@ -556,21 +558,25 @@ mod tests {
.unwrap();
}

let mut it = log_store.scan(1, 4).await.unwrap();
let manifests = log_store.scan(1, 4).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let mut it = manifests.into_iter();
for v in 1..4 {
let (version, bytes) = it.next_log().await.unwrap().unwrap();
let (version, bytes) = it.next().unwrap();
assert_eq!(v, version);
assert_eq!(format!("hello, {v}").as_bytes(), bytes);
}
assert!(it.next_log().await.unwrap().is_none());
assert!(it.next().is_none());

let mut it = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let mut it = manifests.into_iter();
for v in 0..5 {
let (version, bytes) = it.next_log().await.unwrap().unwrap();
let (version, bytes) = it.next().unwrap();
assert_eq!(v, version);
assert_eq!(format!("hello, {v}").as_bytes(), bytes);
}
assert!(it.next_log().await.unwrap().is_none());
assert!(it.next().is_none());

// test checkpoint
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
Expand All @@ -587,18 +593,24 @@ mod tests {
let _ = log_store.delete_until(4, true).await.unwrap();
let _ = log_store.load_checkpoint(3).await.unwrap().unwrap();
let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
let mut it = log_store.scan(0, 11).await.unwrap();
let (version, bytes) = it.next_log().await.unwrap().unwrap();
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let mut it = manifests.into_iter();

let (version, bytes) = it.next().unwrap();
assert_eq!(4, version);
assert_eq!("hello, 4".as_bytes(), bytes);
assert!(it.next_log().await.unwrap().is_none());
assert!(it.next().is_none());

// delete all logs and checkpoints
let _ = log_store.delete_until(11, false).await.unwrap();
assert!(log_store.load_checkpoint(3).await.unwrap().is_none());
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
let mut it = log_store.scan(0, 11).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let mut it = manifests.into_iter();

assert!(it.next().is_none());
}

#[tokio::test]
Expand Down Expand Up @@ -640,9 +652,12 @@ mod tests {
.unwrap();

// test data reading
let mut it = log_store.scan(0, 10).await.unwrap();
let manifests = log_store.scan(0, 10).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let mut it = manifests.into_iter();

for v in 0..10 {
let (version, bytes) = it.next_log().await.unwrap().unwrap();
let (version, bytes) = it.next().unwrap();
assert_eq!(v, version);
assert_eq!(format!("hello, {v}").as_bytes(), bytes);
}
Expand All @@ -656,8 +671,10 @@ mod tests {
// Delete util 10, contain uncompressed/compressed data
// log 0, 1, 2, 7, 8, 9 will be delete
assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
let mut it = log_store.scan(0, 10).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
let manifests = log_store.scan(0, 10).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let mut it = manifests.into_iter();
assert!(it.next().is_none());
}

#[tokio::test]
Expand Down

0 comments on commit 29fc2ea

Please sign in to comment.