From 826da0be747305e96df7ab6b4c429fd8b62ea582 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 17 Sep 2024 20:34:51 +0800 Subject: [PATCH 1/3] feat(storage): notify frontend with more hummock version info --- proto/meta.proto | 2 - .../common_service/src/observer_manager.rs | 1 - src/frontend/src/observer/observer_manager.rs | 32 +-- src/frontend/src/scheduler/snapshot.rs | 127 +++++---- src/meta/service/src/notification_service.rs | 10 +- src/meta/src/barrier/mod.rs | 20 +- src/meta/src/hummock/manager/commit_epoch.rs | 16 +- src/meta/src/hummock/manager/transaction.rs | 19 +- src/storage/hummock_sdk/src/change_log.rs | 4 +- .../compaction_group/hummock_version_ext.rs | 37 ++- .../hummock_sdk/src/frontend_version.rs | 257 ++++++++++++++++++ src/storage/hummock_sdk/src/lib.rs | 2 + 12 files changed, 402 insertions(+), 125 deletions(-) create mode 100644 src/storage/hummock_sdk/src/frontend_version.rs diff --git a/proto/meta.proto b/proto/meta.proto index 98a7f267c0124..8a979a17aa45c 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -433,7 +433,6 @@ message MetaSnapshot { GetSessionParamsResponse session_params = 20; repeated catalog.Secret secrets = 23; repeated common.WorkerNode nodes = 10; - hummock.HummockSnapshot hummock_snapshot = 11; hummock.HummockVersion hummock_version = 12; backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; hummock.WriteLimits hummock_write_limits = 16; @@ -485,7 +484,6 @@ message SubscribeResponse { user.UserInfo user = 11; SetSessionParamRequest session_param = 26; common.WorkerNode node = 13; - hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas = 15; MetaSnapshot snapshot = 16; backup_service.MetaBackupManifestId meta_backup_manifest_id = 17; diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index da61118f1ad49..8648644e3a3e2 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -117,7 +117,6 @@ where Info::HummockVersionDeltas(version_delta) => { version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id } - Info::HummockSnapshot(_) => true, Info::MetaBackupManifestId(_) => true, Info::SystemParams(_) | Info::SessionParam(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index b49abcb023429..b1ecf4182d1d0 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -24,8 +24,9 @@ use risingwave_common::secret::LocalSecretManager; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::ObserverState; +use risingwave_hummock_sdk::FrontendHummockVersion; use risingwave_pb::common::WorkerNode; -use risingwave_pb::hummock::HummockVersionStats; +use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats}; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse}; @@ -85,11 +86,8 @@ impl ObserverState for FrontendObserverNode { resp ) } - Info::HummockSnapshot(_) => { - self.handle_hummock_snapshot_notification(resp); - } - Info::HummockVersionDeltas(_) => { - panic!("frontend node should not receive 
HummockVersionDeltas"); + Info::HummockVersionDeltas(deltas) => { + self.handle_hummock_snapshot_notification(deltas); } Info::MetaBackupManifestId(_) => { panic!("frontend node should not receive MetaBackupManifestId"); @@ -141,8 +139,7 @@ impl ObserverState for FrontendObserverNode { connections, users, nodes, - hummock_snapshot, - hummock_version: _, + hummock_version, meta_backup_manifest_id: _, hummock_write_limits: _, streaming_worker_slot_mappings, @@ -195,7 +192,9 @@ impl ObserverState for FrontendObserverNode { convert_worker_slot_mapping(&serving_worker_slot_mappings), ); self.hummock_snapshot_manager - .update(hummock_snapshot.unwrap()); + .init(FrontendHummockVersion::from_protobuf( + hummock_version.unwrap(), + )); let snapshot_version = version.unwrap(); catalog_guard.set_version(snapshot_version.catalog_version); @@ -465,19 +464,8 @@ impl FrontendObserverNode { } /// Update max committed epoch in `HummockSnapshotManager`. - fn handle_hummock_snapshot_notification(&self, resp: SubscribeResponse) { - let Some(info) = resp.info.as_ref() else { - return; - }; - match info { - Info::HummockSnapshot(hummock_snapshot) => match resp.operation() { - Operation::Update => { - self.hummock_snapshot_manager.update(*hummock_snapshot); - } - _ => panic!("receive an unsupported notify {:?}", resp), - }, - _ => unreachable!(), - } + fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) { + self.hummock_snapshot_manager.update(deltas); } fn handle_secret_notification(&mut self, resp: SubscribeResponse) { diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 4eb23e97dd5f7..c4df076826ab2 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -14,16 +14,22 @@ use std::assert_matches::assert_matches; use std::collections::btree_map::Entry; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_common::must_match; +use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; +use risingwave_hummock_sdk::{ + FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID, +}; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; -use risingwave_pb::hummock::PbHummockSnapshot; +use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; +use tracing::warn; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; @@ -85,7 +91,7 @@ impl ReadSnapshot { // DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a // singleton for each snapshot. Use `PinnedSnapshotRef` instead. 
pub struct PinnedSnapshot { - value: PbHummockSnapshot, + value: FrontendHummockVersion, unpin_sender: UnboundedSender, } @@ -103,26 +109,29 @@ impl PinnedSnapshot { let epoch = if is_barrier_read { batch_query_epoch::Epoch::Current(u64::MAX) } else { - batch_query_epoch::Epoch::Committed(self.value.committed_epoch) + batch_query_epoch::Epoch::Committed(self.value.max_committed_epoch) }; BatchQueryEpoch { epoch: Some(epoch) } } pub fn committed_epoch(&self) -> u64 { - self.value.committed_epoch + self.value.max_committed_epoch } } impl Drop for PinnedSnapshot { fn drop(&mut self) { - let _ = self.unpin_sender.send(Operation::Unpin(self.value)); + let _ = self.unpin_sender.send(Operation::Unpin(self.value.id)); } } /// Returns an invalid snapshot, used for initial values. -fn invalid_snapshot() -> PbHummockSnapshot { - PbHummockSnapshot { - committed_epoch: INVALID_EPOCH, +fn invalid_snapshot() -> FrontendHummockVersion { + FrontendHummockVersion { + id: INVALID_VERSION_ID, + max_committed_epoch: 0, + state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()), + table_change_log: Default::default(), } } @@ -170,43 +179,54 @@ impl HummockSnapshotManager { self.latest_snapshot.borrow().clone() } + pub fn init(&self, version: FrontendHummockVersion) { + self.worker_sender + .send(Operation::Pin(version.id, version.max_committed_epoch)) + .unwrap(); + // Then set the latest snapshot. + let snapshot = Arc::new(PinnedSnapshot { + value: version, + unpin_sender: self.worker_sender.clone(), + }); + if self.latest_snapshot.send(snapshot).is_err() { + warn!("fail to set init version"); + } + } + /// Update the latest snapshot. /// /// Should only be called by the observer manager. - pub fn update(&self, snapshot: PbHummockSnapshot) { + pub fn update(&self, deltas: HummockVersionDeltas) { self.latest_snapshot.send_if_modified(move |old_snapshot| { - // Note(bugen): theoretically, the snapshots from the observer should always be - // monotonically increasing, so there's no need to `max` them or check whether they are - // the same. But we still do it here to be safe. - // TODO: turn this into an assertion. - let snapshot = PbHummockSnapshot { - committed_epoch: std::cmp::max( - old_snapshot.value.committed_epoch, - snapshot.committed_epoch, - ), + if deltas.version_deltas.is_empty() { + return false; + } + let snapshot = { + let mut snapshot = old_snapshot.value.clone(); + for delta in deltas.version_deltas { + snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta)); + } + snapshot }; - if old_snapshot.value == snapshot { - // Ignore the same snapshot - false - } else { - // First tell the worker that a new snapshot is going to be pinned. - self.worker_sender.send(Operation::Pin(snapshot)).unwrap(); - // Then set the latest snapshot. - *old_snapshot = Arc::new(PinnedSnapshot { - value: snapshot, - unpin_sender: self.worker_sender.clone(), - }); - - true - } + // First tell the worker that a new snapshot is going to be pinned. + self.worker_sender + .send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch)) + .unwrap(); + // Then set the latest snapshot. + *old_snapshot = Arc::new(PinnedSnapshot { + value: snapshot, + unpin_sender: self.worker_sender.clone(), + }); + + true }); } /// Wait until the latest snapshot is newer than the given one. 
pub async fn wait(&self, snapshot: PbHummockSnapshot) { let mut rx = self.latest_snapshot.subscribe(); - while rx.borrow_and_update().value.committed_epoch < snapshot.committed_epoch { + while rx.borrow_and_update().value.max_committed_epoch < snapshot.committed_epoch { rx.changed().await.unwrap(); } } @@ -216,7 +236,7 @@ impl HummockSnapshotManager { #[derive(Debug)] enum PinState { /// The snapshot is currently pinned by some sessions in this frontend. - Pinned, + Pinned(u64), /// The snapshot is no longer pinned by any session in this frontend, but it's still considered /// to be pinned by the meta service. It will be unpinned by the [`UnpinWorker`] in the next @@ -228,20 +248,18 @@ enum PinState { #[derive(Debug)] enum Operation { /// Mark the snapshot as pinned, sent when a new snapshot is pinned with `update`. - Pin(PbHummockSnapshot), + Pin(HummockVersionId, u64), /// Mark the snapshot as unpinned, sent when all references to a [`PinnedSnapshot`] is dropped. - Unpin(PbHummockSnapshot), + Unpin(HummockVersionId), } impl Operation { /// Returns whether the operation is for an invalid snapshot, which should be ignored. fn is_invalid(&self) -> bool { - match self { - Operation::Pin(s) | Operation::Unpin(s) => s, - } - .committed_epoch - == INVALID_EPOCH + *match self { + Operation::Pin(id, _) | Operation::Unpin(id) => id, + } == INVALID_VERSION_ID } } @@ -249,13 +267,13 @@ impl Operation { /// /// The snapshot will be first sorted by `committed_epoch`, then by `current_epoch`. #[derive(Debug, PartialEq, Clone)] -struct SnapshotKey(PbHummockSnapshot); +struct SnapshotKey(HummockVersionId); impl Eq for SnapshotKey {} impl Ord for SnapshotKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.0.committed_epoch.cmp(&other.0.committed_epoch) + self.0.to_u64().cmp(&other.0.to_u64()) } } @@ -319,15 +337,15 @@ impl UnpinWorker { } match operation { - Operation::Pin(snapshot) => { + Operation::Pin(version_id, committed_epoch) => { self.states - .try_insert(SnapshotKey(snapshot), PinState::Pinned) + .try_insert(SnapshotKey(version_id), PinState::Pinned(committed_epoch)) .unwrap(); } Operation::Unpin(snapshot) => match self.states.entry(SnapshotKey(snapshot)) { Entry::Vacant(_v) => unreachable!("unpin a snapshot that is not pinned"), Entry::Occupied(o) => { - assert_matches!(o.get(), PinState::Pinned); + assert_matches!(o.get(), PinState::Pinned(_)); *o.into_mut() = PinState::Unpinned; } }, @@ -338,18 +356,23 @@ impl UnpinWorker { /// and clean up their entries. async fn unpin_batch(&mut self) { // Find the minimum snapshot that is pinned. Unpin all snapshots before it. - if let Some(min_snapshot) = self + if let Some((min_snapshot, min_committed_epoch)) = self .states .iter() - .find(|(_, s)| matches!(s, PinState::Pinned)) - .map(|(k, _)| k.clone()) + .find(|(_, s)| matches!(s, PinState::Pinned(_))) + .map(|(k, s)| { + ( + k.clone(), + must_match!(s, PinState::Pinned(committed_epoch) => *committed_epoch), + ) + }) { if &min_snapshot == self.states.first_key_value().unwrap().0 { // Nothing to unpin. 
return; } - let min_epoch = min_snapshot.0.committed_epoch; + let min_epoch = min_committed_epoch; match self.meta_client.unpin_snapshot_before(min_epoch).await { Ok(()) => { diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index aada6c6876afe..6e993f2067f0a 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -15,6 +15,7 @@ use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::secret::{LocalSecretManager, SecretEncryption}; +use risingwave_hummock_sdk::FrontendHummockVersion; use risingwave_meta::manager::{MetadataManager, SessionParamsManagerImpl}; use risingwave_meta::MetaResult; use risingwave_pb::backup_service::MetaBackupManifestId; @@ -305,7 +306,12 @@ impl NotificationServiceImpl { let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?; - let hummock_snapshot = Some(self.hummock_manager.latest_snapshot()); + let hummock_version = self + .hummock_manager + .on_current_version(|version| { + FrontendHummockVersion::from_version(version).to_protobuf() + }) + .await; let session_params = match self.env.session_params_manager_impl_ref() { SessionParamsManagerImpl::Kv(manager) => manager.get_params().await, @@ -331,7 +337,7 @@ impl NotificationServiceImpl { secrets: decrypted_secrets, users, nodes, - hummock_snapshot, + hummock_version: Some(hummock_version), version: Some(SnapshotVersion { catalog_version, worker_node_version, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 0772bac6699e1..125a45d583ce3 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -41,7 +41,6 @@ use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo}; use risingwave_pb::catalog::table::TableType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; -use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{PausedReason, PbRecoveryStatus}; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_pb::stream_service::BarrierCompleteResponse; @@ -1281,13 +1280,6 @@ impl GlobalBarrierManagerContext { ) -> MetaResult> { { { - // We must ensure all epochs are committed in ascending order, - // because the storage engine will query from new to old in the order in which - // the L0 layer files are generated. - // See https://github.com/risingwave-labs/risingwave/issues/1251 - // hummock_manager commit epoch. - let mut new_snapshot = None; - match &command_ctx.kind { BarrierKind::Initial => {} BarrierKind::Checkpoint(epochs) => { @@ -1298,7 +1290,7 @@ impl GlobalBarrierManagerContext { backfill_pinned_log_epoch, tables_to_commit, ); - new_snapshot = self.hummock_manager.commit_epoch(commit_info).await?; + self.hummock_manager.commit_epoch(commit_info).await?; } BarrierKind::Barrier => { // if we collect a barrier(checkpoint = false), @@ -1309,16 +1301,6 @@ impl GlobalBarrierManagerContext { } command_ctx.post_collect().await?; - // Notify new snapshot after fragment_mapping changes have been notified in - // `post_collect`. - if let Some(snapshot) = new_snapshot { - self.env - .notification_manager() - .notify_frontend_without_version( - Operation::Update, // Frontends don't care about operation. 
- Info::HummockSnapshot(snapshot), - ); - } Ok(if command_ctx.kind.is_checkpoint() { Some(self.hummock_manager.get_version_stats().await) } else { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 8c021509dcbb2..f586999e55f94 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -71,10 +71,7 @@ pub struct CommitEpochInfo { impl HummockManager { /// Caller should ensure `epoch` > `max_committed_epoch` - pub async fn commit_epoch( - &self, - commit_info: CommitEpochInfo, - ) -> Result> { + pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> { let CommitEpochInfo { mut sstables, new_table_watermarks, @@ -89,7 +86,7 @@ impl HummockManager { let _timer = start_measure_real_process_timer!(self, "commit_epoch"); // Prevent commit new epochs if this flag is set if versioning_guard.disable_commit_epochs { - return Ok(None); + return Ok(()); } let versioning: &mut Versioning = &mut versioning_guard; @@ -291,14 +288,11 @@ impl HummockManager { )?; } - let snapshot = if is_visible_table_committed_epoch { + if is_visible_table_committed_epoch { let snapshot = HummockSnapshot { committed_epoch }; let prev_snapshot = self.latest_snapshot.swap(snapshot.into()); assert!(prev_snapshot.committed_epoch < committed_epoch); - Some(snapshot) - } else { - None - }; + } for compaction_group_id in &modified_compaction_groups { trigger_sst_stat( @@ -329,7 +323,7 @@ impl HummockManager { { self.check_state_consistency().await; } - Ok(snapshot) + Ok(()) } fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) { diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 9a795608f7e1a..14ea961d82bef 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -23,10 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{ GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{ + CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, +}; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats, - StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, + HummockVersionStats, StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -230,6 +232,17 @@ impl<'a> InMemValTransaction for HummockVersionTransaction<'a> { version_deltas: pb_deltas, }), ); + self.notification_manager.notify_frontend_without_version( + Operation::Update, + Info::HummockVersionDeltas(HummockVersionDeltas { + version_deltas: deltas + .iter() + .map(|delta| { + FrontendHummockVersionDelta::from_delta(delta).to_protobuf() + }) + .collect(), + }), + ); } for delta in deltas { assert!(self.orig_deltas.insert(delta.id, delta.clone()).is_none()); diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index c231b0eb6b7b5..4402c63b2831c 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -87,8 +87,8 @@ where } } -impl TableChangeLog { - pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLog] { +impl TableChangeLogCommon { + pub fn filter_epoch(&self, 
(min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLogCommon] { let start = self.0.partition_point(|epoch_change_log| { epoch_change_log.epochs.last().expect("non-empty") < &min_epoch }); diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 376626e844242..881527882672b 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -23,13 +23,13 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, GroupMetaChange, - GroupTableChange, PbLevelType, + GroupTableChange, PbLevelType, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; use super::group_split::get_sub_level_insert_hint; use super::{group_split, StateTableId}; -use crate::change_log::TableChangeLogCommon; +use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon}; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::level::{Level, Levels, OverlappingLevel}; @@ -781,9 +781,25 @@ impl HummockVersion { } // apply to table change log - for (table_id, change_log_delta) in &version_delta.change_log_delta { + Self::apply_change_log_delta( + &mut self.table_change_log, + &version_delta.change_log_delta, + &version_delta.removed_table_ids, + &version_delta.state_table_info_delta, + &changed_table_info, + ); + } + + pub fn apply_change_log_delta( + table_change_log: &mut HashMap>, + change_log_delta: &HashMap>, + removed_table_ids: &HashSet, + state_table_info_delta: &HashMap, + changed_table_info: &HashMap>, + ) { + for (table_id, change_log_delta) in change_log_delta { let new_change_log = change_log_delta.new_log.as_ref().unwrap(); - match self.table_change_log.entry(*table_id) { + match table_change_log.entry(*table_id) { Entry::Occupied(entry) => { let change_log = entry.into_mut(); if let Some(prev_log) = change_log.0.last() { @@ -803,22 +819,21 @@ impl HummockVersion { // If a table has no new change log entry (even an empty one), it means we have stopped maintained // the change log for the table, and then we will remove the table change log. // The table change log will also be removed when the table id is removed. - self.table_change_log.retain(|table_id, _| { - if version_delta.removed_table_ids.contains(table_id) { + table_change_log.retain(|table_id, _| { + if removed_table_ids.contains(table_id) { return false; } - if let Some(table_info_delta) = version_delta.state_table_info_delta.get(table_id) + if let Some(table_info_delta) = state_table_info_delta.get(table_id) && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch { // the table exists previously, and its committed epoch has progressed. 
} else { // otherwise, the table change log should be kept anyway return true; } - let contains = version_delta.change_log_delta.contains_key(table_id); + let contains = change_log_delta.contains_key(table_id); if !contains { warn!( ?table_id, - max_committed_epoch = version_delta.visible_table_committed_epoch(), "table change log dropped due to no further change log at newly committed epoch", ); } @@ -826,8 +841,8 @@ impl HummockVersion { }); // truncate the remaining table change log - for (table_id, change_log_delta) in &version_delta.change_log_delta { - if let Some(change_log) = self.table_change_log.get_mut(table_id) { + for (table_id, change_log_delta) in change_log_delta { + if let Some(change_log) = table_change_log.get_mut(table_id) { change_log.truncate(change_log_delta.truncate_epoch); } } diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs new file mode 100644 index 0000000000000..549688054a8ad --- /dev/null +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -0,0 +1,257 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use risingwave_common::catalog::TableId; +use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta; +use risingwave_pb::hummock::{ + PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbTableChangeLog, + StateTableInfoDelta, +}; + +use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon}; +use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo}; +use crate::{HummockVersionId, INVALID_VERSION_ID}; + +#[derive(Clone, Debug)] +pub struct FrontendHummockVersion { + pub id: HummockVersionId, + pub max_committed_epoch: u64, + pub state_table_info: HummockVersionStateTableInfo, + pub table_change_log: HashMap>, +} + +impl FrontendHummockVersion { + pub fn from_version(version: &HummockVersion) -> Self { + Self { + id: version.id, + max_committed_epoch: version.max_committed_epoch, + state_table_info: version.state_table_info.clone(), + table_change_log: version + .table_change_log + .iter() + .map(|(table_id, change_log)| { + ( + *table_id, + TableChangeLogCommon( + change_log + .0 + .iter() + .map(|change_log| EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: change_log.epochs.clone(), + }) + .collect(), + ), + ) + }) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersion { + PbHummockVersion { + id: self.id.0, + levels: Default::default(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: 0, + table_watermarks: Default::default(), + table_change_logs: self + .table_change_log + .iter() + .map(|(table_id, change_log)| { + ( + table_id.table_id, + PbTableChangeLog { + change_logs: change_log + .0 + .iter() + .map(|change_log| PbEpochNewChangeLog { + old_value: vec![], + new_value: vec![], + epochs: change_log.epochs.clone(), + }) + .collect(), + }, + ) + }) + 
.collect(), + state_table_info: self.state_table_info.to_protobuf(), + } + } + + pub fn from_protobuf(value: PbHummockVersion) -> Self { + Self { + id: HummockVersionId(value.id), + max_committed_epoch: value.max_committed_epoch, + state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info), + table_change_log: value + .table_change_logs + .into_iter() + .map(|(table_id, change_log)| { + ( + TableId::new(table_id), + TableChangeLogCommon( + change_log + .change_logs + .into_iter() + .map(|change_log| EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: change_log.epochs, + }) + .collect(), + ), + ) + }) + .collect(), + } + } + + pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) { + if self.id != INVALID_VERSION_ID { + assert_eq!(self.id, delta.prev_id); + } + self.id = delta.id; + self.max_committed_epoch = delta.max_committed_epoch; + let (changed_table_info, _) = self + .state_table_info + .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id); + HummockVersion::apply_change_log_delta( + &mut self.table_change_log, + &delta.change_log_delta, + &delta.removed_table_id, + &delta.state_table_info_delta, + &changed_table_info, + ); + } +} + +pub struct FrontendHummockVersionDelta { + pub prev_id: HummockVersionId, + pub id: HummockVersionId, + pub max_committed_epoch: u64, + pub removed_table_id: HashSet, + pub state_table_info_delta: HashMap, + pub change_log_delta: HashMap>, +} + +impl FrontendHummockVersionDelta { + pub fn from_delta(delta: &HummockVersionDelta) -> Self { + Self { + prev_id: delta.prev_id, + id: delta.id, + max_committed_epoch: delta.max_committed_epoch, + removed_table_id: delta.removed_table_ids.clone(), + state_table_info_delta: delta.state_table_info_delta.clone(), + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, change_log_delta)| { + ( + *table_id, + ChangeLogDeltaCommon { + truncate_epoch: change_log_delta.truncate_epoch, + new_log: change_log_delta.new_log.as_ref().map(|new_log| { + EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: new_log.epochs.clone(), + } + }), + }, + ) + }) + .collect(), + } + } + + pub fn to_protobuf(&self) -> PbHummockVersionDelta { + PbHummockVersionDelta { + id: self.id.to_u64(), + prev_id: self.prev_id.to_u64(), + group_deltas: Default::default(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: 0, + trivial_move: false, + new_table_watermarks: Default::default(), + removed_table_ids: self + .removed_table_id + .iter() + .map(|table_id| table_id.table_id) + .collect(), + change_log_delta: self + .change_log_delta + .iter() + .map(|(table_id, delta)| { + ( + table_id.table_id, + PbChangeLogDelta { + new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog { + old_value: vec![], + new_value: vec![], + epochs: new_log.epochs.clone(), + }), + truncate_epoch: delta.truncate_epoch, + }, + ) + }) + .collect(), + state_table_info_delta: self + .state_table_info_delta + .iter() + .map(|(table_id, delta)| (table_id.table_id, *delta)) + .collect(), + } + } + + pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self { + Self { + prev_id: HummockVersionId::new(delta.prev_id), + id: HummockVersionId::new(delta.id), + max_committed_epoch: delta.max_committed_epoch, + removed_table_id: delta + .removed_table_ids + .iter() + .map(|table_id| TableId::new(*table_id)) + .collect(), + state_table_info_delta: delta + .state_table_info_delta + .into_iter() + .map(|(table_id, delta)| 
(TableId::new(table_id), delta)) + .collect(), + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, change_log_delta)| { + ( + TableId::new(*table_id), + ChangeLogDeltaCommon { + truncate_epoch: change_log_delta.truncate_epoch, + new_log: change_log_delta.new_log.as_ref().map(|new_log| { + EpochNewChangeLogCommon { + new_value: vec![], + old_value: vec![], + epochs: new_log.epochs.clone(), + } + }), + }, + ) + }) + .collect(), + } + } +} diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 921ab18fcf7cd..b84d2751a50ea 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -51,6 +51,8 @@ pub mod table_stats; pub mod table_watermark; pub mod time_travel; pub mod version; +pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta}; +mod frontend_version; pub use compact::*; use risingwave_common::catalog::TableId; From e4b390cdc74fc18b743f5f11a036abdfaea9e762 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 18 Sep 2024 13:43:10 +0800 Subject: [PATCH 2/3] fix test --- src/frontend/src/scheduler/snapshot.rs | 47 +++++++++++++---------- src/storage/hummock_sdk/src/change_log.rs | 5 ++- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index c4df076826ab2..5d1ad6d69d0b9 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -29,7 +29,6 @@ use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; -use tracing::warn; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; @@ -180,35 +179,41 @@ impl HummockSnapshotManager { } pub fn init(&self, version: FrontendHummockVersion) { - self.worker_sender - .send(Operation::Pin(version.id, version.max_committed_epoch)) - .unwrap(); - // Then set the latest snapshot. - let snapshot = Arc::new(PinnedSnapshot { - value: version, - unpin_sender: self.worker_sender.clone(), - }); - if self.latest_snapshot.send(snapshot).is_err() { - warn!("fail to set init version"); - } + self.update_inner(|_| Some(version)); } /// Update the latest snapshot. /// /// Should only be called by the observer manager. pub fn update(&self, deltas: HummockVersionDeltas) { - self.latest_snapshot.send_if_modified(move |old_snapshot| { + self.update_inner(|old_snapshot| { if deltas.version_deltas.is_empty() { - return false; + return None; } - let snapshot = { - let mut snapshot = old_snapshot.value.clone(); - for delta in deltas.version_deltas { - snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta)); - } - snapshot - }; + let mut snapshot = old_snapshot.clone(); + for delta in deltas.version_deltas { + snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta)); + } + Some(snapshot) + }) + } + fn update_inner( + &self, + get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option, + ) { + self.latest_snapshot.send_if_modified(move |old_snapshot| { + let new_snapshot = get_new_snapshot(&old_snapshot.value); + let Some(snapshot) = new_snapshot else { + return false; + }; + if snapshot.id <= old_snapshot.value.id { + assert_eq!( + snapshot.id, old_snapshot.value.id, + "receive stale frontend version" + ); + return false; + } // First tell the worker that a new snapshot is going to be pinned. 
self.worker_sender .send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch)) diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 4402c63b2831c..cf3ded58b946e 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -88,7 +88,10 @@ where } impl TableChangeLogCommon { - pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLogCommon] { + pub fn filter_epoch( + &self, + (min_epoch, max_epoch): (u64, u64), + ) -> &[EpochNewChangeLogCommon] { let start = self.0.partition_point(|epoch_change_log| { epoch_change_log.epochs.last().expect("non-empty") < &min_epoch }); From 64b01d52d17d47b6720b57ff504cc6bbe2a60ee5 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 19 Sep 2024 17:05:01 +0800 Subject: [PATCH 3/3] feat(frontend): frontend wait hummock version id for ddl --- proto/ddl_service.proto | 65 ++++--- proto/meta.proto | 2 +- src/frontend/src/catalog/catalog_service.rs | 19 +- src/frontend/src/handler/flush.rs | 4 +- src/frontend/src/meta_client.rs | 4 +- src/frontend/src/scheduler/snapshot.rs | 6 +- src/frontend/src/session.rs | 8 +- src/frontend/src/test_utils.rs | 5 +- src/meta/service/src/stream_service.rs | 4 +- src/meta/src/barrier/mod.rs | 4 + src/meta/src/barrier/schedule.rs | 8 +- src/meta/src/hummock/manager/versioning.rs | 4 + src/meta/src/rpc/ddl_controller.rs | 17 +- src/rpc_client/src/meta_client.rs | 201 +++++++++++++------- 14 files changed, 225 insertions(+), 126 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index f51f0e1b3a428..de860593e8105 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -11,13 +11,18 @@ import "stream_plan.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; +message WaitVersion { + uint64 catalog_version = 1; + uint64 hummock_version_id = 2; +} + message CreateDatabaseRequest { catalog.Database db = 1; } message CreateDatabaseResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropDatabaseRequest { @@ -26,7 +31,7 @@ message DropDatabaseRequest { message DropDatabaseResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message CreateSchemaRequest { @@ -35,7 +40,7 @@ message CreateSchemaRequest { message CreateSchemaResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropSchemaRequest { @@ -44,7 +49,7 @@ message DropSchemaRequest { message DropSchemaResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message CreateSourceRequest { @@ -54,7 +59,7 @@ message CreateSourceRequest { message CreateSourceResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropSourceRequest { @@ -64,7 +69,7 @@ message DropSourceRequest { message DropSourceResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message AlterSourceRequest { @@ -73,7 +78,7 @@ message AlterSourceRequest { message AlterSourceResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message CreateSinkRequest { @@ -85,7 +90,7 @@ message CreateSinkRequest { message CreateSinkResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropSinkRequest { @@ -96,7 +101,7 @@ message DropSinkRequest { message DropSinkResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } 
message CreateSubscriptionRequest { @@ -105,7 +110,7 @@ message CreateSubscriptionRequest { message CreateSubscriptionResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropSubscriptionRequest { @@ -115,7 +120,7 @@ message DropSubscriptionRequest { message DropSubscriptionResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message CreateMaterializedViewRequest { @@ -135,7 +140,7 @@ message CreateMaterializedViewRequest { message CreateMaterializedViewResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropMaterializedViewRequest { @@ -145,7 +150,7 @@ message DropMaterializedViewRequest { message DropMaterializedViewResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message CreateViewRequest { @@ -154,7 +159,7 @@ message CreateViewRequest { message CreateViewResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropViewRequest { @@ -164,7 +169,7 @@ message DropViewRequest { message DropViewResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } // An enum to distinguish different types of the `Table` streaming job. @@ -197,7 +202,7 @@ message CreateTableRequest { message CreateTableResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message AlterNameRequest { @@ -216,7 +221,7 @@ message AlterNameRequest { message AlterNameResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message AlterOwnerRequest { @@ -247,7 +252,7 @@ message AlterSetSchemaRequest { message AlterSetSchemaResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message AlterParallelismRequest { @@ -260,7 +265,7 @@ message AlterParallelismResponse {} message AlterOwnerResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message CreateFunctionRequest { @@ -269,7 +274,7 @@ message CreateFunctionRequest { message CreateFunctionResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropFunctionRequest { @@ -278,7 +283,7 @@ message DropFunctionRequest { message DropFunctionResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropTableRequest { @@ -291,7 +296,7 @@ message DropTableRequest { message DropTableResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } // Used by risectl (and in the future, dashboard) @@ -310,7 +315,7 @@ message CreateIndexRequest { message CreateIndexResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message DropIndexRequest { @@ -320,7 +325,7 @@ message DropIndexRequest { message DropIndexResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message ReplaceTablePlan { @@ -345,7 +350,7 @@ message ReplaceTablePlanRequest { message ReplaceTablePlanResponse { common.Status status = 1; // The new global catalog version. 
- uint64 version = 2; + WaitVersion version = 2; } message GetTableRequest { @@ -378,7 +383,7 @@ message CreateSecretRequest { } message CreateSecretResponse { - uint64 version = 1; + WaitVersion version = 1; } message DropSecretRequest { @@ -386,7 +391,7 @@ message DropSecretRequest { } message DropSecretResponse { - uint64 version = 1; + WaitVersion version = 1; } message CreateConnectionRequest { @@ -406,7 +411,7 @@ message CreateConnectionRequest { message CreateConnectionResponse { // global catalog version - uint64 version = 1; + WaitVersion version = 1; } message ListConnectionsRequest {} @@ -421,7 +426,7 @@ message DropConnectionRequest { message DropConnectionResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message GetTablesRequest { @@ -442,7 +447,7 @@ message CommentOnRequest { message CommentOnResponse { common.Status status = 1; - uint64 version = 2; + WaitVersion version = 2; } message TableSchemaChange { diff --git a/proto/meta.proto b/proto/meta.proto index 8a979a17aa45c..d5801eaa15284 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -145,7 +145,7 @@ message FlushRequest { message FlushResponse { common.Status status = 1; - hummock.HummockSnapshot snapshot = 2; + uint64 hummock_version_id = 2; } // The reason why the data sources in the cluster are paused. diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 5f42d1e73e5bb..8f4f0588fb533 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -19,6 +19,7 @@ use parking_lot::lock_api::ArcRwLockReadGuard; use parking_lot::{RawRwLock, RwLock}; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId}; use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::catalog::{ PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, PbTable, PbView, @@ -26,7 +27,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan, - PbTableJobType, ReplaceTablePlan, TableJobType, + PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -36,6 +37,7 @@ use tokio::sync::watch::Receiver; use super::root_catalog::Catalog; use super::{DatabaseId, SecretId, TableId}; use crate::error::Result; +use crate::scheduler::HummockSnapshotManagerRef; use crate::user::UserId; pub type CatalogReadGuard = ArcRwLockReadGuard; @@ -219,6 +221,7 @@ pub trait CatalogWriter: Send + Sync { pub struct CatalogWriterImpl { meta_client: MetaClient, catalog_updated_rx: Receiver, + hummock_snapshot_manager: HummockSnapshotManagerRef, } #[async_trait::async_trait] @@ -594,18 +597,26 @@ impl CatalogWriter for CatalogWriterImpl { } impl CatalogWriterImpl { - pub fn new(meta_client: MetaClient, catalog_updated_rx: Receiver) -> Self { + pub fn new( + meta_client: MetaClient, + catalog_updated_rx: Receiver, + hummock_snapshot_manager: HummockSnapshotManagerRef, + ) -> Self { Self { meta_client, catalog_updated_rx, + hummock_snapshot_manager, } } - async fn wait_version(&self, version: CatalogVersion) -> Result<()> { + async fn wait_version(&self, version: WaitVersion) -> Result<()> { let mut rx = self.catalog_updated_rx.clone(); - 
while *rx.borrow_and_update() < version { + while *rx.borrow_and_update() < version.catalog_version { rx.changed().await.map_err(|e| anyhow!(e))?; } + self.hummock_snapshot_manager + .wait(HummockVersionId::new(version.hummock_version_id)) + .await; Ok(()) } } diff --git a/src/frontend/src/handler/flush.rs b/src/frontend/src/handler/flush.rs index 784fbb393c916..58e5a305e8ad4 100644 --- a/src/frontend/src/handler/flush.rs +++ b/src/frontend/src/handler/flush.rs @@ -26,14 +26,14 @@ pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result Result<()> { let client = session.env().meta_client(); - let snapshot = client.flush(true).await?; + let version_id = client.flush(true).await?; // Wait for the snapshot to be synchronized, so that future reads in this session can see // previous writes. session .env() .hummock_snapshot_manager() - .wait(snapshot) + .wait(version_id) .await; Ok(()) diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index c58dcc365f431..1f6cfef4d02a0 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -52,7 +52,7 @@ pub trait FrontendMetaClient: Send + Sync { async fn get_snapshot(&self) -> Result; - async fn flush(&self, checkpoint: bool) -> Result; + async fn flush(&self, checkpoint: bool) -> Result; async fn wait(&self) -> Result<()>; @@ -157,7 +157,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.get_snapshot().await } - async fn flush(&self, checkpoint: bool) -> Result { + async fn flush(&self, checkpoint: bool) -> Result { self.0.flush(checkpoint).await } diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 5d1ad6d69d0b9..ea1741d0b1150 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::{ FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID, }; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; -use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot}; +use risingwave_pb::hummock::HummockVersionDeltas; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; @@ -229,9 +229,9 @@ impl HummockSnapshotManager { } /// Wait until the latest snapshot is newer than the given one. 
- pub async fn wait(&self, snapshot: PbHummockSnapshot) { + pub async fn wait(&self, version_id: HummockVersionId) { let mut rx = self.latest_snapshot.subscribe(); - while rx.borrow_and_update().value.max_committed_epoch < snapshot.committed_epoch { + while rx.borrow_and_update().value.id < version_id { rx.changed().await.unwrap(); } } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index a1150798951cb..2abeeba457a49 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -294,19 +294,21 @@ impl FrontendEnv { let mut join_handles = vec![heartbeat_join_handle]; let mut shutdown_senders = vec![heartbeat_shutdown_sender]; + let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone())); + let hummock_snapshot_manager = + Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone())); + let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0); let catalog = Arc::new(RwLock::new(Catalog::default())); let catalog_writer = Arc::new(CatalogWriterImpl::new( meta_client.clone(), catalog_updated_rx, + hummock_snapshot_manager.clone(), )); let catalog_reader = CatalogReader::new(catalog.clone()); let worker_node_manager = Arc::new(WorkerNodeManager::new()); - let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone())); - let hummock_snapshot_manager = - Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone())); let compute_client_pool = Arc::new(ComputeClientPool::new( config.batch_exchange_connection_pool_size(), )); diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 6123889262155..09fb316c7faea 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -33,6 +33,7 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::cluster_limit::ClusterLimit; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID}; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ @@ -946,8 +947,8 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(HummockSnapshot { committed_epoch: 0 }) } - async fn flush(&self, _checkpoint: bool) -> RpcResult { - Ok(HummockSnapshot { committed_epoch: 0 }) + async fn flush(&self, _checkpoint: bool) -> RpcResult { + Ok(INVALID_VERSION_ID) } async fn wait(&self) -> RpcResult<()> { diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index cfbdda2e96509..6c1591a68acaf 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -69,10 +69,10 @@ impl StreamManagerService for StreamServiceImpl { self.env.idle_manager().record_activity(); let req = request.into_inner(); - let snapshot = self.barrier_scheduler.flush(req.checkpoint).await?; + let version_id = self.barrier_scheduler.flush(req.checkpoint).await?; Ok(Response::new(FlushResponse { status: None, - snapshot: Some(snapshot), + hummock_version_id: version_id.to_u64(), })) } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 5c1802adc00a3..bacc2dd607dca 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1309,6 +1309,10 @@ impl GlobalBarrierManagerContext { } } } + + pub fn hummock_manager(&self) -> &HummockManagerRef { + &self.hummock_manager + } } impl 
CreateMviewProgressTracker { diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index f213999f0a08f..3e0e16a783a39 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -21,7 +21,7 @@ use anyhow::{anyhow, Context}; use assert_matches::assert_matches; use parking_lot::Mutex; use risingwave_common::catalog::TableId; -use risingwave_pb::hummock::HummockSnapshot; +use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::meta::PausedReason; use tokio::select; use tokio::sync::{oneshot, watch}; @@ -325,7 +325,7 @@ impl BarrierScheduler { } /// Flush means waiting for the next barrier to collect. - pub async fn flush(&self, checkpoint: bool) -> MetaResult { + pub async fn flush(&self, checkpoint: bool) -> MetaResult { let start = Instant::now(); tracing::debug!("start barrier flush"); @@ -334,8 +334,8 @@ impl BarrierScheduler { let elapsed = Instant::now().duration_since(start); tracing::debug!("barrier flushed in {:?}", elapsed); - let snapshot = self.hummock_manager.latest_snapshot(); - Ok(snapshot) + let version_id = self.hummock_manager.get_version_id().await; + Ok(version_id) } } diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 3d621a1d59913..006521038f95e 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -158,6 +158,10 @@ impl HummockManager { f(&self.versioning.read().await.current_version) } + pub async fn get_version_id(&self) -> HummockVersionId { + self.on_current_version(|version| version.id).await + } + /// Gets the mapping from table id to compaction group id pub async fn get_table_compaction_group_id_mapping( &self, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 4c1988a37d44c..c2038556d5dd8 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -51,7 +51,7 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, + alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -278,7 +278,9 @@ impl DdlController { /// has been interrupted during executing, the request will be cancelled by tonic. Since we have /// a lot of logic for revert, status management, notification and so on, ensuring consistency /// would be a huge hassle and pain if we don't spawn here. 
- pub async fn run_command(&self, command: DdlCommand) -> MetaResult { + /// + /// Though returning `Option`, it's always `Some`, to simplify the handling logic + pub async fn run_command(&self, command: DdlCommand) -> MetaResult> { if !command.allow_in_recovery() { self.barrier_manager.check_status_running()?; } @@ -351,7 +353,16 @@ impl DdlController { } } .in_current_span(); - tokio::spawn(fut).await.unwrap() + let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??; + Ok(Some(WaitVersion { + catalog_version: notification_version, + hummock_version_id: self + .barrier_manager + .hummock_manager() + .get_version_id() + .await + .to_u64(), + })) } pub async fn get_ddl_progress(&self) -> MetaResult> { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 67ea55269b2bd..7bbf041ee75ec 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -26,7 +26,7 @@ use cluster_limit_service_client::ClusterLimitServiceClient; use either::Either; use futures::stream::BoxStream; use lru::LruCache; -use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, SecretId, TableId}; +use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::system_param::reader::SystemParamsReader; @@ -171,7 +171,7 @@ impl MetaClient { schema_id: u32, owner_id: u32, req: create_connection_request::Payload, - ) -> Result { + ) -> Result { let request = CreateConnectionRequest { name: connection_name, database_id, @@ -180,7 +180,9 @@ impl MetaClient { payload: Some(req), }; let resp = self.inner.create_connection(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } pub async fn create_secret( @@ -190,7 +192,7 @@ impl MetaClient { schema_id: u32, owner_id: u32, value: Vec, - ) -> Result { + ) -> Result { let request = CreateSecretRequest { name: secret_name, database_id, @@ -199,7 +201,9 @@ impl MetaClient { value, }; let resp = self.inner.create_secret(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } pub async fn list_connections(&self, _name: Option<&str>) -> Result> { @@ -208,18 +212,22 @@ impl MetaClient { Ok(resp.connections) } - pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result { + pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result { let request = DropConnectionRequest { connection_id }; let resp = self.inner.drop_connection(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn drop_secret(&self, secret_id: SecretId) -> Result { + pub async fn drop_secret(&self, secret_id: SecretId) -> Result { let request = DropSecretRequest { secret_id: secret_id.into(), }; let resp = self.inner.drop_secret(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } /// Register the current node to the cluster and set the corresponding worker id. 
@@ -365,27 +373,31 @@ impl MetaClient { Ok(()) } - pub async fn create_database(&self, db: PbDatabase) -> Result { + pub async fn create_database(&self, db: PbDatabase) -> Result { let request = CreateDatabaseRequest { db: Some(db) }; let resp = self.inner.create_database(request).await?; // TODO: handle error in `resp.status` here - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn create_schema(&self, schema: PbSchema) -> Result { + pub async fn create_schema(&self, schema: PbSchema) -> Result { let request = CreateSchemaRequest { schema: Some(schema), }; let resp = self.inner.create_schema(request).await?; // TODO: handle error in `resp.status` here - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } pub async fn create_materialized_view( &self, table: PbTable, graph: StreamFragmentGraph, - ) -> Result { + ) -> Result { let request = CreateMaterializedViewRequest { materialized_view: Some(table), fragment_graph: Some(graph), @@ -393,45 +405,53 @@ impl MetaClient { }; let resp = self.inner.create_materialized_view(request).await?; // TODO: handle error in `resp.status` here - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } pub async fn drop_materialized_view( &self, table_id: TableId, cascade: bool, - ) -> Result { + ) -> Result { let request = DropMaterializedViewRequest { table_id: table_id.table_id(), cascade, }; let resp = self.inner.drop_materialized_view(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn create_source(&self, source: PbSource) -> Result { + pub async fn create_source(&self, source: PbSource) -> Result { let request = CreateSourceRequest { source: Some(source), fragment_graph: None, }; let resp = self.inner.create_source(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } pub async fn create_source_with_graph( &self, source: PbSource, graph: StreamFragmentGraph, - ) -> Result { + ) -> Result { let request = CreateSourceRequest { source: Some(source), fragment_graph: Some(graph), }; let resp = self.inner.create_source(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } pub async fn create_sink( @@ -439,7 +459,7 @@ impl MetaClient { sink: PbSink, graph: StreamFragmentGraph, affected_table_change: Option, - ) -> Result { + ) -> Result { let request = CreateSinkRequest { sink: Some(sink), fragment_graph: Some(graph), @@ -447,27 +467,30 @@ impl MetaClient { }; let resp = self.inner.create_sink(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn create_subscription( - &self, - subscription: PbSubscription, - ) -> Result { + pub async fn create_subscription(&self, subscription: PbSubscription) -> Result { let request = CreateSubscriptionRequest { subscription: Some(subscription), }; let resp = self.inner.create_subscription(request).await?; - Ok(resp.version) + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) 
     }
 
-    pub async fn create_function(&self, function: PbFunction) -> Result<CatalogVersion> {
+    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
         let request = CreateFunctionRequest {
             function: Some(function),
         };
 
         let resp = self.inner.create_function(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_table(
@@ -476,7 +499,7 @@ impl MetaClient {
         table: PbTable,
         graph: StreamFragmentGraph,
         job_type: PbTableJobType,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateTableRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
@@ -485,67 +508,81 @@ impl MetaClient {
         };
         let resp = self.inner.create_table(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn comment_on(&self, comment: PbComment) -> Result<CatalogVersion> {
+    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
         let request = CommentOnRequest {
             comment: Some(comment),
         };
         let resp = self.inner.comment_on(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_name(
         &self,
         object: alter_name_request::Object,
         name: &str,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = AlterNameRequest {
             object: Some(object),
             new_name: name.to_string(),
         };
         let resp = self.inner.alter_name(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     // only adding columns is supported
-    pub async fn alter_source_column(&self, source: PbSource) -> Result<CatalogVersion> {
+    pub async fn alter_source_column(&self, source: PbSource) -> Result<WaitVersion> {
         let request = AlterSourceRequest {
             source: Some(source),
         };
         let resp = self.inner.alter_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
    }
 
-    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<CatalogVersion> {
+    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
         let request = AlterOwnerRequest {
             object: Some(object),
             owner_id,
         };
         let resp = self.inner.alter_owner(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_set_schema(
         &self,
         object: alter_set_schema_request::Object,
         new_schema_id: u32,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = AlterSetSchemaRequest {
             new_schema_id,
             object: Some(object),
         };
         let resp = self.inner.alter_set_schema(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn alter_source_with_sr(&self, source: PbSource) -> Result<CatalogVersion> {
+    pub async fn alter_source_with_sr(&self, source: PbSource) -> Result<WaitVersion> {
         let request = AlterSourceRequest {
             source: Some(source),
         };
         let resp = self.inner.alter_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_parallelism(
@@ -571,7 +608,7 @@ impl MetaClient {
         graph: StreamFragmentGraph,
         table_col_index_mapping: ColIndexMapping,
         job_type: PbTableJobType,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = ReplaceTablePlanRequest {
             plan: Some(ReplaceTablePlan {
                 source,
@@ -583,7 +620,9 @@ impl MetaClient {
         };
         let resp = self.inner.replace_table_plan(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
@@ -594,11 +633,13 @@ impl MetaClient {
         Ok(())
     }
 
-    pub async fn create_view(&self, view: PbView) -> Result<CatalogVersion> {
+    pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
         let request = CreateViewRequest { view: Some(view) };
         let resp = self.inner.create_view(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_index(
@@ -606,7 +647,7 @@ impl MetaClient {
         index: PbIndex,
         table: PbTable,
         graph: StreamFragmentGraph,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateIndexRequest {
             index: Some(index),
             index_table: Some(table),
@@ -614,7 +655,9 @@ impl MetaClient {
         };
         let resp = self.inner.create_index(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_table(
@@ -622,7 +665,7 @@ impl MetaClient {
         source_id: Option<u32>,
         table_id: TableId,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropTableRequest {
             source_id: source_id.map(SourceId::Id),
             table_id: table_id.table_id(),
@@ -630,19 +673,25 @@ impl MetaClient {
         };
 
         let resp = self.inner.drop_table(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
         let request = DropViewRequest { view_id, cascade };
         let resp = self.inner.drop_view(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
         let request = DropSourceRequest { source_id, cascade };
         let resp = self.inner.drop_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_sink(
@@ -650,27 +699,31 @@ impl MetaClient {
         sink_id: u32,
         cascade: bool,
         affected_table_change: Option<ReplaceTablePlan>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropSinkRequest {
             sink_id,
             cascade,
             affected_table_change,
         };
         let resp = self.inner.drop_sink(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_subscription(
         &self,
         subscription_id: u32,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropSubscriptionRequest {
             subscription_id,
             cascade,
         };
         let resp = self.inner.drop_subscription(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn list_change_log_epochs(
@@ -688,33 +741,41 @@ impl MetaClient {
         Ok(resp.epochs)
     }
 
-    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
         let request = DropIndexRequest {
             index_id: index_id.index_id,
             cascade,
         };
         let resp = self.inner.drop_index(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_function(&self, function_id: FunctionId) -> Result<CatalogVersion> {
+    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
         let request = DropFunctionRequest {
             function_id: function_id.0,
         };
         let resp = self.inner.drop_function(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<CatalogVersion> {
+    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
         let request = DropDatabaseRequest { database_id };
         let resp = self.inner.drop_database(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_schema(&self, schema_id: SchemaId) -> Result<CatalogVersion> {
+    pub async fn drop_schema(&self, schema_id: SchemaId) -> Result<WaitVersion> {
         let request = DropSchemaRequest { schema_id };
         let resp = self.inner.drop_schema(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     // TODO: using UserInfoVersion instead as return type.
@@ -898,10 +959,10 @@ impl MetaClient {
         Ok(resp.tables)
     }
 
-    pub async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot> {
+    pub async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
         let request = FlushRequest { checkpoint };
         let resp = self.inner.flush(request).await?;
-        Ok(resp.snapshot.unwrap())
+        Ok(HummockVersionId::new(resp.hummock_version_id))
     }
 
     pub async fn wait(&self) -> Result<()> {