From 826da0be747305e96df7ab6b4c429fd8b62ea582 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 17 Sep 2024 20:34:51 +0800 Subject: [PATCH 1/2] feat(storage): notify frontend with more hummock version info --- proto/meta.proto | 2 - .../common_service/src/observer_manager.rs | 1 - src/frontend/src/observer/observer_manager.rs | 32 +-- src/frontend/src/scheduler/snapshot.rs | 127 +++++---- src/meta/service/src/notification_service.rs | 10 +- src/meta/src/barrier/mod.rs | 20 +- src/meta/src/hummock/manager/commit_epoch.rs | 16 +- src/meta/src/hummock/manager/transaction.rs | 19 +- src/storage/hummock_sdk/src/change_log.rs | 4 +- .../compaction_group/hummock_version_ext.rs | 37 ++- .../hummock_sdk/src/frontend_version.rs | 257 ++++++++++++++++++ src/storage/hummock_sdk/src/lib.rs | 2 + 12 files changed, 402 insertions(+), 125 deletions(-) create mode 100644 src/storage/hummock_sdk/src/frontend_version.rs diff --git a/proto/meta.proto b/proto/meta.proto index 98a7f267c0124..8a979a17aa45c 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -433,7 +433,6 @@ message MetaSnapshot { GetSessionParamsResponse session_params = 20; repeated catalog.Secret secrets = 23; repeated common.WorkerNode nodes = 10; - hummock.HummockSnapshot hummock_snapshot = 11; hummock.HummockVersion hummock_version = 12; backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; hummock.WriteLimits hummock_write_limits = 16; @@ -485,7 +484,6 @@ message SubscribeResponse { user.UserInfo user = 11; SetSessionParamRequest session_param = 26; common.WorkerNode node = 13; - hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas = 15; MetaSnapshot snapshot = 16; backup_service.MetaBackupManifestId meta_backup_manifest_id = 17; diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index da61118f1ad49..8648644e3a3e2 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -117,7 +117,6 @@ where Info::HummockVersionDeltas(version_delta) => { version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id } - Info::HummockSnapshot(_) => true, Info::MetaBackupManifestId(_) => true, Info::SystemParams(_) | Info::SessionParam(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index b49abcb023429..b1ecf4182d1d0 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -24,8 +24,9 @@ use risingwave_common::secret::LocalSecretManager; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::ObserverState; +use risingwave_hummock_sdk::FrontendHummockVersion; use risingwave_pb::common::WorkerNode; -use risingwave_pb::hummock::HummockVersionStats; +use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats}; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse}; @@ -85,11 +86,8 @@ impl ObserverState for FrontendObserverNode { resp ) } - Info::HummockSnapshot(_) => { - self.handle_hummock_snapshot_notification(resp); - } - Info::HummockVersionDeltas(_) => { - panic!("frontend node should not receive HummockVersionDeltas"); + Info::HummockVersionDeltas(deltas) => { + self.handle_hummock_snapshot_notification(deltas); } Info::MetaBackupManifestId(_) => { panic!("frontend node should not receive MetaBackupManifestId"); @@ -141,8 +139,7 @@ impl ObserverState for FrontendObserverNode { connections, users, nodes, - hummock_snapshot, - hummock_version: _, + hummock_version, meta_backup_manifest_id: _, hummock_write_limits: _, streaming_worker_slot_mappings, @@ -195,7 +192,9 @@ impl ObserverState for FrontendObserverNode { convert_worker_slot_mapping(&serving_worker_slot_mappings), ); self.hummock_snapshot_manager - .update(hummock_snapshot.unwrap()); + .init(FrontendHummockVersion::from_protobuf( + hummock_version.unwrap(), + )); let snapshot_version = version.unwrap(); catalog_guard.set_version(snapshot_version.catalog_version); @@ -465,19 +464,8 @@ impl FrontendObserverNode { } /// Update max committed epoch in `HummockSnapshotManager`. - fn handle_hummock_snapshot_notification(&self, resp: SubscribeResponse) { - let Some(info) = resp.info.as_ref() else { - return; - }; - match info { - Info::HummockSnapshot(hummock_snapshot) => match resp.operation() { - Operation::Update => { - self.hummock_snapshot_manager.update(*hummock_snapshot); - } - _ => panic!("receive an unsupported notify {:?}", resp), - }, - _ => unreachable!(), - } + fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) { + self.hummock_snapshot_manager.update(deltas); } fn handle_secret_notification(&mut self, resp: SubscribeResponse) { diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 4eb23e97dd5f7..c4df076826ab2 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -14,16 +14,22 @@ use std::assert_matches::assert_matches; use std::collections::btree_map::Entry; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_common::must_match; +use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; +use risingwave_hummock_sdk::{ + FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID, +}; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; -use risingwave_pb::hummock::PbHummockSnapshot; +use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; +use tracing::warn; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; @@ -85,7 +91,7 @@ impl ReadSnapshot { // DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a // singleton for each snapshot. Use `PinnedSnapshotRef` instead. pub struct PinnedSnapshot { - value: PbHummockSnapshot, + value: FrontendHummockVersion, unpin_sender: UnboundedSender, } @@ -103,26 +109,29 @@ impl PinnedSnapshot { let epoch = if is_barrier_read { batch_query_epoch::Epoch::Current(u64::MAX) } else { - batch_query_epoch::Epoch::Committed(self.value.committed_epoch) + batch_query_epoch::Epoch::Committed(self.value.max_committed_epoch) }; BatchQueryEpoch { epoch: Some(epoch) } } pub fn committed_epoch(&self) -> u64 { - self.value.committed_epoch + self.value.max_committed_epoch } } impl Drop for PinnedSnapshot { fn drop(&mut self) { - let _ = self.unpin_sender.send(Operation::Unpin(self.value)); + let _ = self.unpin_sender.send(Operation::Unpin(self.value.id)); } } /// Returns an invalid snapshot, used for initial values. -fn invalid_snapshot() -> PbHummockSnapshot { - PbHummockSnapshot { - committed_epoch: INVALID_EPOCH, +fn invalid_snapshot() -> FrontendHummockVersion { + FrontendHummockVersion { + id: INVALID_VERSION_ID, + max_committed_epoch: 0, + state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()), + table_change_log: Default::default(), } } @@ -170,43 +179,54 @@ impl HummockSnapshotManager { self.latest_snapshot.borrow().clone() } + pub fn init(&self, version: FrontendHummockVersion) { + self.worker_sender + .send(Operation::Pin(version.id, version.max_committed_epoch)) + .unwrap(); + // Then set the latest snapshot. + let snapshot = Arc::new(PinnedSnapshot { + value: version, + unpin_sender: self.worker_sender.clone(), + }); + if self.latest_snapshot.send(snapshot).is_err() { + warn!("fail to set init version"); + } + } + /// Update the latest snapshot. /// /// Should only be called by the observer manager. - pub fn update(&self, snapshot: PbHummockSnapshot) { + pub fn update(&self, deltas: HummockVersionDeltas) { self.latest_snapshot.send_if_modified(move |old_snapshot| { - // Note(bugen): theoretically, the snapshots from the observer should always be - // monotonically increasing, so there's no need to `max` them or check whether they are - // the same. But we still do it here to be safe. - // TODO: turn this into an assertion. - let snapshot = PbHummockSnapshot { - committed_epoch: std::cmp::max( - old_snapshot.value.committed_epoch, - snapshot.committed_epoch, - ), + if deltas.version_deltas.is_empty() { + return false; + } + let snapshot = { + let mut snapshot = old_snapshot.value.clone(); + for delta in deltas.version_deltas { + snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta)); + } + snapshot }; - if old_snapshot.value == snapshot { - // Ignore the same snapshot - false - } else { - // First tell the worker that a new snapshot is going to be pinned. - self.worker_sender.send(Operation::Pin(snapshot)).unwrap(); - // Then set the latest snapshot. - *old_snapshot = Arc::new(PinnedSnapshot { - value: snapshot, - unpin_sender: self.worker_sender.clone(), - }); - - true - } + // First tell the worker that a new snapshot is going to be pinned. + self.worker_sender + .send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch)) + .unwrap(); + // Then set the latest snapshot. + *old_snapshot = Arc::new(PinnedSnapshot { + value: snapshot, + unpin_sender: self.worker_sender.clone(), + }); + + true }); } /// Wait until the latest snapshot is newer than the given one. pub async fn wait(&self, snapshot: PbHummockSnapshot) { let mut rx = self.latest_snapshot.subscribe(); - while rx.borrow_and_update().value.committed_epoch < snapshot.committed_epoch { + while rx.borrow_and_update().value.max_committed_epoch < snapshot.committed_epoch { rx.changed().await.unwrap(); } } @@ -216,7 +236,7 @@ impl HummockSnapshotManager { #[derive(Debug)] enum PinState { /// The snapshot is currently pinned by some sessions in this frontend. - Pinned, + Pinned(u64), /// The snapshot is no longer pinned by any session in this frontend, but it's still considered /// to be pinned by the meta service. It will be unpinned by the [`UnpinWorker`] in the next @@ -228,20 +248,18 @@ enum PinState { #[derive(Debug)] enum Operation { /// Mark the snapshot as pinned, sent when a new snapshot is pinned with `update`. - Pin(PbHummockSnapshot), + Pin(HummockVersionId, u64), /// Mark the snapshot as unpinned, sent when all references to a [`PinnedSnapshot`] is dropped. - Unpin(PbHummockSnapshot), + Unpin(HummockVersionId), } impl Operation { /// Returns whether the operation is for an invalid snapshot, which should be ignored. fn is_invalid(&self) -> bool { - match self { - Operation::Pin(s) | Operation::Unpin(s) => s, - } - .committed_epoch - == INVALID_EPOCH + *match self { + Operation::Pin(id, _) | Operation::Unpin(id) => id, + } == INVALID_VERSION_ID } } @@ -249,13 +267,13 @@ impl Operation { /// /// The snapshot will be first sorted by `committed_epoch`, then by `current_epoch`. #[derive(Debug, PartialEq, Clone)] -struct SnapshotKey(PbHummockSnapshot); +struct SnapshotKey(HummockVersionId); impl Eq for SnapshotKey {} impl Ord for SnapshotKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.0.committed_epoch.cmp(&other.0.committed_epoch) + self.0.to_u64().cmp(&other.0.to_u64()) } } @@ -319,15 +337,15 @@ impl UnpinWorker { } match operation { - Operation::Pin(snapshot) => { + Operation::Pin(version_id, committed_epoch) => { self.states - .try_insert(SnapshotKey(snapshot), PinState::Pinned) + .try_insert(SnapshotKey(version_id), PinState::Pinned(committed_epoch)) .unwrap(); } Operation::Unpin(snapshot) => match self.states.entry(SnapshotKey(snapshot)) { Entry::Vacant(_v) => unreachable!("unpin a snapshot that is not pinned"), Entry::Occupied(o) => { - assert_matches!(o.get(), PinState::Pinned); + assert_matches!(o.get(), PinState::Pinned(_)); *o.into_mut() = PinState::Unpinned; } }, @@ -338,18 +356,23 @@ impl UnpinWorker { /// and clean up their entries. async fn unpin_batch(&mut self) { // Find the minimum snapshot that is pinned. Unpin all snapshots before it. - if let Some(min_snapshot) = self + if let Some((min_snapshot, min_committed_epoch)) = self .states .iter() - .find(|(_, s)| matches!(s, PinState::Pinned)) - .map(|(k, _)| k.clone()) + .find(|(_, s)| matches!(s, PinState::Pinned(_))) + .map(|(k, s)| { + ( + k.clone(), + must_match!(s, PinState::Pinned(committed_epoch) => *committed_epoch), + ) + }) { if &min_snapshot == self.states.first_key_value().unwrap().0 { // Nothing to unpin. return; } - let min_epoch = min_snapshot.0.committed_epoch; + let min_epoch = min_committed_epoch; match self.meta_client.unpin_snapshot_before(min_epoch).await { Ok(()) => { diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index aada6c6876afe..6e993f2067f0a 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -15,6 +15,7 @@ use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::secret::{LocalSecretManager, SecretEncryption}; +use risingwave_hummock_sdk::FrontendHummockVersion; use risingwave_meta::manager::{MetadataManager, SessionParamsManagerImpl}; use risingwave_meta::MetaResult; use risingwave_pb::backup_service::MetaBackupManifestId; @@ -305,7 +306,12 @@ impl NotificationServiceImpl { let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?; - let hummock_snapshot = Some(self.hummock_manager.latest_snapshot()); + let hummock_version = self + .hummock_manager + .on_current_version(|version| { + FrontendHummockVersion::from_version(version).to_protobuf() + }) + .await; let session_params = match self.env.session_params_manager_impl_ref() { SessionParamsManagerImpl::Kv(manager) => manager.get_params().await, @@ -331,7 +337,7 @@ impl NotificationServiceImpl { secrets: decrypted_secrets, users, nodes, - hummock_snapshot, + hummock_version: Some(hummock_version), version: Some(SnapshotVersion { catalog_version, worker_node_version, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 0772bac6699e1..125a45d583ce3 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -41,7 +41,6 @@ use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo}; use risingwave_pb::catalog::table::TableType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; -use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{PausedReason, PbRecoveryStatus}; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_pb::stream_service::BarrierCompleteResponse; @@ -1281,13 +1280,6 @@ impl GlobalBarrierManagerContext { ) -> MetaResult> { { { - // We must ensure all epochs are committed in ascending order, - // because the storage engine will query from new to old in the order in which - // the L0 layer files are generated. - // See https://github.com/risingwave-labs/risingwave/issues/1251 - // hummock_manager commit epoch. - let mut new_snapshot = None; - match &command_ctx.kind { BarrierKind::Initial => {} BarrierKind::Checkpoint(epochs) => { @@ -1298,7 +1290,7 @@ impl GlobalBarrierManagerContext { backfill_pinned_log_epoch, tables_to_commit, ); - new_snapshot = self.hummock_manager.commit_epoch(commit_info).await?; + self.hummock_manager.commit_epoch(commit_info).await?; } BarrierKind::Barrier => { // if we collect a barrier(checkpoint = false), @@ -1309,16 +1301,6 @@ impl GlobalBarrierManagerContext { } command_ctx.post_collect().await?; - // Notify new snapshot after fragment_mapping changes have been notified in - // `post_collect`. - if let Some(snapshot) = new_snapshot { - self.env - .notification_manager() - .notify_frontend_without_version( - Operation::Update, // Frontends don't care about operation. - Info::HummockSnapshot(snapshot), - ); - } Ok(if command_ctx.kind.is_checkpoint() { Some(self.hummock_manager.get_version_stats().await) } else { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 8c021509dcbb2..f586999e55f94 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -71,10 +71,7 @@ pub struct CommitEpochInfo { impl HummockManager { /// Caller should ensure `epoch` > `max_committed_epoch` - pub async fn commit_epoch( - &self, - commit_info: CommitEpochInfo, - ) -> Result> { + pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> { let CommitEpochInfo { mut sstables, new_table_watermarks, @@ -89,7 +86,7 @@ impl HummockManager { let _timer = start_measure_real_process_timer!(self, "commit_epoch"); // Prevent commit new epochs if this flag is set if versioning_guard.disable_commit_epochs { - return Ok(None); + return Ok(()); } let versioning: &mut Versioning = &mut versioning_guard; @@ -291,14 +288,11 @@ impl HummockManager { )?; } - let snapshot = if is_visible_table_committed_epoch { + if is_visible_table_committed_epoch { let snapshot = HummockSnapshot { committed_epoch }; let prev_snapshot = self.latest_snapshot.swap(snapshot.into()); assert!(prev_snapshot.committed_epoch < committed_epoch); - Some(snapshot) - } else { - None - }; + } for compaction_group_id in &modified_compaction_groups { trigger_sst_stat( @@ -329,7 +323,7 @@ impl HummockManager { { self.check_state_consistency().await; } - Ok(snapshot) + Ok(()) } fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) { diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 9a795608f7e1a..14ea961d82bef 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -23,10 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{ GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{ + CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, +}; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats, - StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, + HummockVersionStats, StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -230,6 +232,17 @@ impl<'a> InMemValTransaction for HummockVersionTransaction<'a> { version_deltas: pb_deltas, }), ); + self.notification_manager.notify_frontend_without_version( + Operation::Update, + Info::HummockVersionDeltas(HummockVersionDeltas { + version_deltas: deltas + .iter() + .map(|delta| { + FrontendHummockVersionDelta::from_delta(delta).to_protobuf() + }) + .collect(), + }), + ); } for delta in deltas { assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none()); diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index c231b0eb6b7b5..4402c63b2831c 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -87,8 +87,8 @@ where } } -impl TableChangeLog { - pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLog] { +impl TableChangeLogCommon { + pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLogCommon] { let start = self.0.partition_point(|epoch_change_log| { epoch_change_log.epochs.last().expect("non-empty") < &min_epoch }); diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 376626e844242..881527882672b 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -23,13 +23,13 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, GroupMetaChange, - GroupTableChange, PbLevelType, + GroupTableChange, PbLevelType, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; use super::group_split::get_sub_level_insert_hint; use super::{group_split, StateTableId}; -use crate::change_log::TableChangeLogCommon; +use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon}; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::level::{Level, Levels, OverlappingLevel}; @@ -781,9 +781,25 @@ impl HummockVersion { } // apply to table change log - for (table_id, change_log_delta) in &version_delta.change_log_delta { + Self::apply_change_log_delta( + &mut self.table_change_log, + &version_delta.change_log_delta, + &version_delta.removed_table_ids, + &version_delta.state_table_info_delta, + &changed_table_info, + ); + } + + pub fn apply_change_log_delta( + table_change_log: &mut HashMap>, + change_log_delta: &HashMap>, + removed_table_ids: &HashSet, + state_table_info_delta: &HashMap, + changed_table_info: &HashMap>, + ) { + for (table_id, change_log_delta) in change_log_delta { let new_change_log = change_log_delta.new_log.as_ref().unwrap(); - match self.table_change_log.entry(*table_id) { + match table_change_log.entry(*table_id) { Entry::Occupied(entry) => { let change_log = entry.into_mut(); if let Some(prev_log) = change_log.0.last() { @@ -803,22 +819,21 @@ impl HummockVersion { // If a table has no new change log entry (even an empty one), it means we have stopped maintained // the change log for the table, and then we will remove the table change log. // The table change log will also be removed when the table id is removed. - self.table_change_log.retain(|table_id, _| { - if version_delta.removed_table_ids.contains(table_id) { + table_change_log.retain(|table_id, _| { + if removed_table_ids.contains(table_id) { return false; } - if let Some(table_info_delta) = version_delta.state_table_info_delta.get(table_id) + if let Some(table_info_delta) = state_table_info_delta.get(table_id) && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch { // the table exists previously, and its committed epoch has progressed. } else { // otherwise, the table change log should be kept anyway return true; } - let contains = version_delta.change_log_delta.contains_key(table_id); + let contains = change_log_delta.contains_key(table_id); if !contains { warn!( ?table_id, - max_committed_epoch = version_delta.visible_table_committed_epoch(), "table change log dropped due to no further change log at newly committed epoch", ); } @@ -826,8 +841,8 @@ impl HummockVersion { }); // truncate the remaining table change log - for (table_id, change_log_delta) in &version_delta.change_log_delta { - if let Some(change_log) = self.table_change_log.get_mut(table_id) { + for (table_id, change_log_delta) in change_log_delta { + if let Some(change_log) = table_change_log.get_mut(table_id) { change_log.truncate(change_log_delta.truncate_epoch); } } diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs new file mode 100644 index 0000000000000..549688054a8ad --- /dev/null +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -0,0 +1,257 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use risingwave_common::catalog::TableId; +use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta; +use risingwave_pb::hummock::{ + PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbTableChangeLog, + StateTableInfoDelta, +}; + +use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon}; +use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo}; +use crate::{HummockVersionId, INVALID_VERSION_ID}; + +#[derive(Clone, Debug)] +pub struct FrontendHummockVersion { + pub id: HummockVersionId, + pub max_committed_epoch: u64, + pub state_table_info: HummockVersionStateTableInfo, + pub table_change_log: HashMap>, +} + +impl FrontendHummockVersion { + pub fn from_version(version: &HummockVersion) -> Self { + Self { + id: version.id, + max_committed_epoch: version.max_committed_epoch, + state_table_info: version.state_table_info.clone(), + table_change_log: version + .table_change_log + .iter() + .map(|(table_id, change_log)| { + ( + *table_id, + TableChangeLogCommon( + change_log + .0 + .iter() + .map(|change_log| EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: change_log.epochs.clone(), + }) + .collect(), + ), + ) + }) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersion { + PbHummockVersion { + id: self.id.0, + levels: Default::default(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: 0, + table_watermarks: Default::default(), + table_change_logs: self + .table_change_log + .iter() + .map(|(table_id, change_log)| { + ( + table_id.table_id, + PbTableChangeLog { + change_logs: change_log + .0 + .iter() + .map(|change_log| PbEpochNewChangeLog { + old_value: vec![], + new_value: vec![], + epochs: change_log.epochs.clone(), + }) + .collect(), + }, + ) + }) + .collect(), + state_table_info: self.state_table_info.to_protobuf(), + } + } + + pub fn from_protobuf(value: PbHummockVersion) -> Self { + Self { + id: HummockVersionId(value.id), + max_committed_epoch: value.max_committed_epoch, + state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info), + table_change_log: value + .table_change_logs + .into_iter() + .map(|(table_id, change_log)| { + ( + TableId::new(table_id), + TableChangeLogCommon( + change_log + .change_logs + .into_iter() + .map(|change_log| EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: change_log.epochs, + }) + .collect(), + ), + ) + }) + .collect(), + } + } + + pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) { + if self.id != INVALID_VERSION_ID { + assert_eq!(self.id, delta.prev_id); + } + self.id = delta.id; + self.max_committed_epoch = delta.max_committed_epoch; + let (changed_table_info, _) = self + .state_table_info + .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id); + HummockVersion::apply_change_log_delta( + &mut self.table_change_log, + &delta.change_log_delta, + &delta.removed_table_id, + &delta.state_table_info_delta, + &changed_table_info, + ); + } +} + +pub struct FrontendHummockVersionDelta { + pub prev_id: HummockVersionId, + pub id: HummockVersionId, + pub max_committed_epoch: u64, + pub removed_table_id: HashSet, + pub state_table_info_delta: HashMap, + pub change_log_delta: HashMap>, +} + +impl FrontendHummockVersionDelta { + pub fn from_delta(delta: &HummockVersionDelta) -> Self { + Self { + prev_id: delta.prev_id, + id: delta.id, + max_committed_epoch: delta.max_committed_epoch, + removed_table_id: delta.removed_table_ids.clone(), + state_table_info_delta: delta.state_table_info_delta.clone(), + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, change_log_delta)| { + ( + *table_id, + ChangeLogDeltaCommon { + truncate_epoch: change_log_delta.truncate_epoch, + new_log: change_log_delta.new_log.as_ref().map(|new_log| { + EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: new_log.epochs.clone(), + } + }), + }, + ) + }) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersionDelta { + PbHummockVersionDelta { + id: self.id.to_u64(), + prev_id: self.prev_id.to_u64(), + group_deltas: Default::default(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: 0, + trivial_move: false, + new_table_watermarks: Default::default(), + removed_table_ids: self + .removed_table_id + .iter() + .map(|table_id| table_id.table_id) + .collect(), + change_log_delta: self + .change_log_delta + .iter() + .map(|(table_id, delta)| { + ( + table_id.table_id, + PbChangeLogDelta { + new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog { + old_value: vec![], + new_value: vec![], + epochs: new_log.epochs.clone(), + }), + truncate_epoch: delta.truncate_epoch, + }, + ) + }) + .collect(), + state_table_info_delta: self + .state_table_info_delta + .iter() + .map(|(table_id, delta)| (table_id.table_id, *delta)) + .collect(), + } + } + + pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self { + Self { + prev_id: HummockVersionId::new(delta.prev_id), + id: HummockVersionId::new(delta.id), + max_committed_epoch: delta.max_committed_epoch, + removed_table_id: delta + .removed_table_ids + .iter() + .map(|table_id| TableId::new(*table_id)) + .collect(), + state_table_info_delta: delta + .state_table_info_delta + .into_iter() + .map(|(table_id, delta)| (TableId::new(table_id), delta)) + .collect(), + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, change_log_delta)| { + ( + TableId::new(*table_id), + ChangeLogDeltaCommon { + truncate_epoch: change_log_delta.truncate_epoch, + new_log: change_log_delta.new_log.as_ref().map(|new_log| { + EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: new_log.epochs.clone(), + } + }), + }, + ) + }) + .collect(), + } + } +} diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 921ab18fcf7cd..b84d2751a50ea 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -51,6 +51,8 @@ pub mod table_stats; pub mod table_watermark; pub mod time_travel; pub mod version; +pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta}; +mod frontend_version; pub use compact::*; use risingwave_common::catalog::TableId; From e4b390cdc74fc18b743f5f11a036abdfaea9e762 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 18 Sep 2024 13:43:10 +0800 Subject: [PATCH 2/2] fix test --- src/frontend/src/scheduler/snapshot.rs | 47 +++++++++++++---------- src/storage/hummock_sdk/src/change_log.rs | 5 ++- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index c4df076826ab2..5d1ad6d69d0b9 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -29,7 +29,6 @@ use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; -use tracing::warn; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; @@ -180,35 +179,41 @@ impl HummockSnapshotManager { } pub fn init(&self, version: FrontendHummockVersion) { - self.worker_sender - .send(Operation::Pin(version.id, version.max_committed_epoch)) - .unwrap(); - // Then set the latest snapshot. - let snapshot = Arc::new(PinnedSnapshot { - value: version, - unpin_sender: self.worker_sender.clone(), - }); - if self.latest_snapshot.send(snapshot).is_err() { - warn!("fail to set init version"); - } + self.update_inner(|_| Some(version)); } /// Update the latest snapshot. /// /// Should only be called by the observer manager. pub fn update(&self, deltas: HummockVersionDeltas) { - self.latest_snapshot.send_if_modified(move |old_snapshot| { + self.update_inner(|old_snapshot| { if deltas.version_deltas.is_empty() { - return false; + return None; } - let snapshot = { - let mut snapshot = old_snapshot.value.clone(); - for delta in deltas.version_deltas { - snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta)); - } - snapshot - }; + let mut snapshot = old_snapshot.clone(); + for delta in deltas.version_deltas { + snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta)); + } + Some(snapshot) + }) + } + fn update_inner( + &self, + get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option, + ) { + self.latest_snapshot.send_if_modified(move |old_snapshot| { + let new_snapshot = get_new_snapshot(&old_snapshot.value); + let Some(snapshot) = new_snapshot else { + return false; + }; + if snapshot.id <= old_snapshot.value.id { + assert_eq!( + snapshot.id, old_snapshot.value.id, + "receive stale frontend version" + ); + return false; + } // First tell the worker that a new snapshot is going to be pinned. self.worker_sender .send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch)) diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 4402c63b2831c..cf3ded58b946e 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -88,7 +88,10 @@ where } impl TableChangeLogCommon { - pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLogCommon] { + pub fn filter_epoch( + &self, + (min_epoch, max_epoch): (u64, u64), + ) -> &[EpochNewChangeLogCommon] { let start = self.0.partition_point(|epoch_change_log| { epoch_change_log.epochs.last().expect("non-empty") < &min_epoch });