Skip to content

Commit

Permalink
fix: approximately update table stats for vnode watermark reclaim (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 20, 2024
1 parent 407e073 commit b0c5ffd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
43 changes: 42 additions & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
// Apply stats changes.
let mut version_stats = HummockVersionStatsTransaction::new(
&mut versioning.version_stats,
self.env.notification_manager(),
);

if deterministic_mode {
version.disable_apply_to_txn();
Expand Down Expand Up @@ -806,6 +811,10 @@ impl HummockManager {
.sorted_output_ssts
.clone_from(&compact_task.input_ssts[0].table_infos);
}
update_table_stats_for_vnode_watermark_trivial_reclaim(
&mut version_stats.table_stats,
&compact_task,
);
self.metrics
.compact_frequency
.with_label_values(&[
Expand Down Expand Up @@ -876,7 +885,8 @@ impl HummockManager {
self.meta_store_ref(),
compaction_statuses,
compact_task_assignment,
version
version,
version_stats
)?;
self.metrics
.compact_task_batch_count
Expand Down Expand Up @@ -1699,3 +1709,34 @@ impl Compaction {
.collect()
}
}

/// Updates table stats caused by vnode watermark trivial reclaim compaction.
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
table_stats: &mut PbTableStatsMap,
task: &CompactTask,
) {
if task.task_type != TaskType::VnodeWatermark {
return;
}
let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
assert_eq!(s.table_ids.len(), 1);
let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
*e += s.total_key_count;
}
for (table_id, delete_count) in deleted_table_keys {
let Some(stats) = table_stats.get_mut(&table_id) else {
continue;
};
if stats.total_key_count == 0 {
continue;
}
let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
// total_key_count is updated accurately.
stats.total_key_count = new_total_key_count;
// others are updated approximately.
stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
}
}
6 changes: 0 additions & 6 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{HummockVersionId, LocalSstableInfo, SyncResult};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request};
use risingwave_pb::stream_service::streaming_control_stream_response::{
InitResponse, ShutdownResponse,
Expand Down Expand Up @@ -531,11 +530,6 @@ impl LocalBarrierWorker {
barrier: &Barrier,
request: InjectBarrierRequest,
) -> StreamResult<()> {
if barrier.kind == BarrierKind::Initial {
self.actor_manager
.watermark_epoch
.store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst);
}
debug!(
target: "events::stream::barrier::manager::send",
"send barrier {:?}, actor_ids_to_collect = {:?}",
Expand Down

0 comments on commit b0c5ffd

Please sign in to comment.