Skip to content

Commit

Permalink
try block level refill with inheritant info
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Sep 15, 2023
1 parent 461923d commit d363d3a
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ pub mod default {
}

pub fn concurrency() -> usize {
10
100
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ reclaim_rate_limit_mb = 0
[storage.cache_refill]
data_refill_levels = []
timeout_ms = 6000
concurrency = 10
concurrency = 100

[system]
barrier_interval_ms = 1000
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,9 @@ fn gen_version_delta<'a>(
prev_id: old_version.id,
max_committed_epoch: old_version.max_committed_epoch,
trivial_move,
// NOTE: Whether to persist inheritance info is not decided yet.
//
// If not to persist, set inheritance info between val commit and memory commit.
inheritances: compact_task.inheritances.clone(),
..Default::default()
};
Expand Down
20 changes: 17 additions & 3 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use await_tree::InstrumentAwait;
use parking_lot::RwLock;
use prometheus::core::{AtomicU64, GenericGauge};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo};
use risingwave_hummock_sdk::{
info_in_release, HummockEpoch, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::version_update_payload::Payload;
use risingwave_pb::hummock::Inheritances;
use tokio::spawn;
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -407,6 +410,7 @@ impl HummockEventHandler {
.unwrap_or_else(|| self.pinned_version.load().clone());

let mut sst_delta_infos = vec![];
let mut inheritances: HashMap<HummockSstableObjectId, Inheritances> = HashMap::default();
let newly_pinned_version = match version_payload {
Payload::VersionDeltas(version_deltas) => {
let mut version_to_apply = pinned_version.version();
Expand All @@ -416,6 +420,12 @@ impl HummockEventHandler {
sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta);
}
version_to_apply.apply_version_delta(version_delta);
inheritances.extend(
version_delta
.inheritances
.iter()
.map(|(sst_obj_id, infos)| (*sst_obj_id, infos.clone())),
);
}

version_to_apply
Expand All @@ -427,8 +437,12 @@ impl HummockEventHandler {

let new_pinned_version = pinned_version.new_pin_version(newly_pinned_version);

self.refiller
.start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
self.refiller.start_cache_refill(
inheritances,
sst_delta_infos,
pinned_version,
new_pinned_version,
);
}

fn apply_version_update(
Expand Down
91 changes: 69 additions & 22 deletions src/storage/src/hummock/event_handler/refiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashSet, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::{Arc, LazyLock};
Expand All @@ -29,11 +29,13 @@ use prometheus::{
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::hummock::Inheritances;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::{HummockResult, SstableStoreRef, TableHolder};
use crate::hummock::{HummockResult, SstableBlockIndex, SstableStoreRef, TableHolder};
use crate::monitor::StoreLocalStatistic;

pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
Expand Down Expand Up @@ -149,11 +151,13 @@ impl CacheRefiller {

pub fn start_cache_refill(
&mut self,
inheritances: HashMap<HummockSstableObjectId, Inheritances>,
deltas: Vec<SstDeltaInfo>,
pinned_version: Arc<PinnedVersion>,
new_pinned_version: PinnedVersion,
) {
let task = CacheRefillTask {
inheritances,
deltas,
context: self.context.clone(),
};
Expand Down Expand Up @@ -209,6 +213,7 @@ struct CacheRefillContext {
}

pub struct CacheRefillTask {
inheritances: HashMap<HummockSstableObjectId, Inheritances>,
deltas: Vec<SstDeltaInfo>,
context: CacheRefillContext,
}
Expand All @@ -220,6 +225,24 @@ impl CacheRefillTask {
.iter()
.map(|delta| {
let context = self.context.clone();

let mut inheritances = HashMap::default();
for info in &delta.insert_sst_infos {
if let Some(data) = self.inheritances.get(&info.object_id) {
let data = data
.inheritances
.iter()
.map(|inheritance| {
(
inheritance.parent_sst_obj_id,
inheritance.parent_sst_blk_idx as usize,
)
})
.collect_vec();
inheritances.insert(info.object_id, data);
}
}

async move {
let holders = match Self::meta_cache_refill(&context, delta).await {
Ok(holders) => holders,
Expand All @@ -228,7 +251,7 @@ impl CacheRefillTask {
return;
}
};
Self::data_cache_refill(&context, delta, holders).await;
Self::data_cache_refill(&context, &inheritances, delta, holders).await;
}
})
.collect_vec();
Expand Down Expand Up @@ -263,6 +286,7 @@ impl CacheRefillTask {

async fn data_cache_refill(
context: &CacheRefillContext,
inheritances: &HashMap<HummockSstableObjectId, Vec<(HummockSstableObjectId, usize)>>,
delta: &SstDeltaInfo,
holders: Vec<TableHolder>,
) {
Expand All @@ -288,34 +312,57 @@ impl CacheRefillTask {
.iter()
.any(|id| filter.contains(id))
{
GLOBAL_CACHE_REFILL_METRICS.data_refill_filtered_total.inc();
let blocks = holders
.iter()
.map(|sst| sst.value().block_count() as u64)
.sum::<u64>();
GLOBAL_CACHE_REFILL_METRICS
.data_refill_filtered_total
.inc_by(blocks);
return;
}

let mut tasks = vec![];
for sst_info in &holders {
let task = async move {
GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();

let permit = context.concurrency.acquire().await.unwrap();

GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();

match context
for idx in 0..sst_info.value().block_count() {
let (parent_sst_obj_id, parent_sst_blk_idx) =
inheritances.get(sst_info.key()).unwrap()[idx];
if !context
.sstable_store
.fill_data_file_cache(sst_info.value())
.data_file_cache()
.exists(&SstableBlockIndex {
sst_id: parent_sst_obj_id,
block_idx: parent_sst_blk_idx as u64,
})
.await
.unwrap_or_default()
{
Ok(()) => GLOBAL_CACHE_REFILL_METRICS
.data_refill_success_duration
.observe(now.elapsed().as_secs_f64()),
Err(e) => {
tracing::warn!("data cache refill error: {:?}", e);
}
continue;
}
drop(permit);
};
tasks.push(task);

let task = async move {
GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();

let permit = context.concurrency.acquire().await.unwrap();

GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();

match context
.sstable_store
.fill_data_file_cache(sst_info.value(), idx)
.await
{
Ok(()) => GLOBAL_CACHE_REFILL_METRICS
.data_refill_success_duration
.observe(now.elapsed().as_secs_f64()),
Err(e) => {
tracing::warn!("data cache refill error: {:?}", e);
}
}
drop(permit);
};
tasks.push(task);
}
}

join_all(tasks).await;
Expand Down
42 changes: 13 additions & 29 deletions src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::time::Duration;
use await_tree::InstrumentAwait;
use bytes::Bytes;
use fail::fail_point;
use futures::future::try_join_all;
use futures::{future, StreamExt};
use itertools::Itertools;
use risingwave_common::cache::{CachePriority, LookupResponse, LruCacheEventListener};
Expand Down Expand Up @@ -548,41 +547,26 @@ impl SstableStore {
&self.data_file_cache
}

pub async fn fill_data_file_cache(&self, sst: &Sstable) -> HummockResult<()> {
pub async fn fill_data_file_cache(&self, sst: &Sstable, idx: usize) -> HummockResult<()> {
let object_id = sst.id;

if let Some(filter) = self.data_file_cache_refill_filter.as_ref() {
filter.insert(object_id);
}
let key = SstableBlockIndex {
sst_id: object_id,
block_idx: idx as u64,
};

let (range, uncompressed_capacity) = sst.calculate_block_info(idx);
let data = self
.store
.read(&self.get_sst_data_path(object_id), ..)
.read(&self.get_sst_data_path(object_id), range)
.await?;
let block = Block::decode(data, uncompressed_capacity)?;
let block = Box::new(block);

let mut tasks = vec![];
for block_index in 0..sst.block_count() {
let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
let bytes = data.slice(range);
let block = Block::decode(bytes, uncompressed_capacity)?;
let block = Box::new(block);

let key = SstableBlockIndex {
sst_id: object_id,
block_idx: block_index as u64,
};

let cache = self.data_file_cache.clone();
let task = async move {
cache
.insert_force(key, block)
.await
.map_err(HummockError::file_cache)
};
tasks.push(task);
}

try_join_all(tasks).await?;
self.data_file_cache
.insert_force(key, block)
.await
.map_err(HummockError::file_cache)?;

Ok(())
}
Expand Down

0 comments on commit d363d3a

Please sign in to comment.