From cfb1b6cc2007c76034c379398d47fdee218237d1 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sat, 12 Oct 2024 12:04:08 +0800 Subject: [PATCH] refactor(storage): proactively prevent uncommitted SSTs from GC --- proto/stream_service.proto | 6 +++ src/compute/src/rpc/service/stream_service.rs | 19 +++++++ src/meta/src/hummock/manager/context.rs | 3 +- src/meta/src/hummock/manager/gc.rs | 54 +++++++++++++++++-- src/rpc_client/src/stream_client.rs | 3 +- .../event_handler/hummock_event_handler.rs | 7 +++ src/storage/src/hummock/event_handler/mod.rs | 7 ++- .../src/hummock/event_handler/uploader/mod.rs | 23 +++++++- .../src/hummock/store/hummock_storage.rs | 10 +++- 9 files changed, 123 insertions(+), 9 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 45703554c2367..e98d1fdaed754 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -85,9 +85,15 @@ message StreamingControlStreamResponse { } } +message GetMinUncommittedSstIdRequest {} +message GetMinUncommittedSstIdResponse { + uint64 min_uncommitted_sst_id = 1; +} + service StreamService { rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse); + rpc GetMinUncommittedSstId(GetMinUncommittedSstIdRequest) returns (GetMinUncommittedSstIdResponse); } // TODO: Lifecycle management for actors. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index e8d403a9693cc..dae132f352c72 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -14,6 +14,7 @@ use await_tree::InstrumentAwait; use futures::{Stream, StreamExt, TryStreamExt}; +use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; @@ -87,4 +88,22 @@ impl StreamService for StreamServiceImpl { self.mgr.handle_new_control_stream(tx, stream, init_request); Ok(Response::new(UnboundedReceiverStream::new(rx))) } + + async fn get_min_uncommitted_sst_id( + &self, + _request: Request, + ) -> Result, Status> { + let min_uncommitted_sst_id = if let Some(hummock) = self.mgr.env.state_store().as_hummock() + { + hummock + .min_uncommitted_sst_id() + .await + .unwrap_or(HummockSstableObjectId::MAX) + } else { + HummockSstableObjectId::MAX + }; + Ok(Response::new(GetMinUncommittedSstIdResponse { + min_uncommitted_sst_id, + })) + } } diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 3ceb9d367f7df..f234e7475b1fe 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -231,7 +231,8 @@ impl HummockManager { // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible. if !sstables.is_empty() { - // sanity check to ensure SSTs to commit have not been full GCed yet. + // Sanity check to ensure SSTs to commit have not been full GCed yet. + // TODO: since HummockManager::complete_full_gc have already filtered out SSTs by min uncommitted SST id, this sanity check can be removed. let now = self.now().await?; check_sst_retention( now, diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index bfa3bd0dbddcb..51663d1cea3f1 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -18,6 +18,7 @@ use std::ops::Bound::{Excluded, Included}; use std::ops::DerefMut; use std::time::{Duration, SystemTime}; +use futures::future::try_join_all; use itertools::Itertools; use parking_lot::Mutex; use risingwave_hummock_sdk::HummockSstableObjectId; @@ -26,11 +27,14 @@ use risingwave_meta_model_v2::hummock_sequence; use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::FullScanTask; +use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest; +use risingwave_rpc_client::StreamClientPool; use sea_orm::{ActiveValue, EntityTrait}; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::commit_multi_var; use crate::hummock::HummockManager; +use crate::manager::MetadataManager; use crate::model::BTreeMapTransaction; #[derive(Default)] @@ -226,6 +230,12 @@ impl HummockManager { object_ids: Vec, next_start_after: Option, ) -> Result { + // It's crucial to collect_min_uncommitted_sst_id (i.e. `min_sst_id`) only after LIST object store (i.e. `object_ids`). + // Because after getting `min_sst_id`, new compute nodes may join and generate new uncommitted SSTs that are not covered by `min_sst_id`. + // By getting `min_sst_id` after `object_ids`, it's ensured `object_ids` won't include any SSTs from those new compute nodes. + let min_sst_id = + collect_min_uncommitted_sst_id(&self.metadata_manager, self.env.stream_client_pool()) + .await?; self.full_gc_state.set_next_start_after(next_start_after); if object_ids.is_empty() { tracing::info!("SST full scan returns no SSTs."); @@ -243,6 +253,12 @@ impl HummockManager { self.metrics .time_travel_object_count .set(pinned_object_ids.len() as _); + // filter by SST id watermark, i.e. minimum id of uncommitted SSTs reported by compute nodes. + let object_ids = object_ids + .into_iter() + .filter(|id| *id < min_sst_id) + .collect_vec(); + let after_min_sst_id = object_ids.len(); // filter by time travel archive let object_ids = object_ids .into_iter() @@ -250,12 +266,18 @@ impl HummockManager { .collect_vec(); let after_time_travel = object_ids.len(); // filter by version - let selected_object_number = self.extend_objects_to_delete_from_scan(&object_ids).await; + let after_version = self.extend_objects_to_delete_from_scan(&object_ids).await; metrics .full_gc_selected_object_count - .observe(selected_object_number as _); - tracing::info!("Object full scan returns {candidate_object_number} objects. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version."); - Ok(selected_object_number) + .observe(after_version as _); + tracing::info!( + candidate_object_number, + after_min_sst_id, + after_time_travel, + after_version, + "complete full gc" + ); + Ok(after_version) } pub async fn now(&self) -> Result { @@ -329,6 +351,30 @@ impl HummockManager { } } +async fn collect_min_uncommitted_sst_id( + metadata_manager: &MetadataManager, + client_pool: &StreamClientPool, +) -> Result { + let futures = metadata_manager + .list_active_streaming_compute_nodes() + .await + .map_err(|err| Error::MetaStore(err.into()))? + .into_iter() + .map(|worker_node| async move { + let client = client_pool.get(&worker_node).await?; + let request = GetMinUncommittedSstIdRequest {}; + client.get_min_uncommitted_sst_id(request).await + }); + let min_watermark = try_join_all(futures) + .await + .map_err(|err| Error::Internal(err.into()))? + .into_iter() + .map(|resp| resp.min_uncommitted_sst_id) + .min() + .unwrap_or(HummockSstableObjectId::MAX); + Ok(min_watermark) +} + pub struct FullGcState { next_start_after: Mutex>, limit: Option, diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index dcd4a8edbf729..42eeaa780d099 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -73,7 +73,8 @@ pub type StreamClientPoolRef = Arc; macro_rules! for_all_stream_rpc { ($macro:ident) => { $macro! { - { 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } + { 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }, + { 0, get_min_uncommitted_sst_id, GetMinUncommittedSstIdRequest, GetMinUncommittedSstIdResponse } } }; } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 7744b102761de..7a33ed81b4373 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -796,6 +796,13 @@ impl HummockEventHandler { self.uploader.may_destroy_instance(instance_id); self.destroy_read_version(instance_id); } + HummockEvent::GetMinUncommittedSstId { result_tx } => { + let _ = result_tx + .send(self.uploader.min_uncommitted_sst_id()) + .inspect_err(|e| { + error!("unable to send get_min_uncommitted_sst_id result: {:?}", e); + }); + } } } diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 60f2e0c02d07e..f0a4b2a899874 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, HummockVersionId}; use thiserror_ext::AsReport; use tokio::sync::oneshot; @@ -105,6 +105,10 @@ pub enum HummockEvent { DestroyReadVersion { instance_id: LocalInstanceId, }, + + GetMinUncommittedSstId { + result_tx: oneshot::Sender>, + }, } impl HummockEvent { @@ -164,6 +168,7 @@ impl HummockEvent { #[cfg(any(test, feature = "test"))] HummockEvent::FlushEvent(_) => "FlushEvent".to_string(), + HummockEvent::GetMinUncommittedSstId { .. } => "GetMinSpilledSstId".to_string(), } } } diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index cad5708aa831e..519e79e0708e6 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -36,7 +36,7 @@ use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo}; use task_manager::{TaskManager, UploadingTaskStatus}; use thiserror_ext::AsReport; use tokio::sync::oneshot; @@ -1089,6 +1089,19 @@ impl UploaderData { send_sync_result(syncing_data.sync_result_sender, Err(err())); } } + + fn min_uncommitted_sst_id(&self) -> Option { + self.spilled_data + .values() + .filter_map(|(s, _)| { + s.sstable_infos() + .iter() + .chain(s.old_value_sstable_infos()) + .map(|s| s.sst_info.sst_id) + .min() + }) + .min() + } } struct ErrState { @@ -1329,6 +1342,14 @@ impl HummockUploader { } data.check_upload_task_consistency(); } + + pub(crate) fn min_uncommitted_sst_id(&self) -> Option { + if let UploaderState::Working(ref u) = self.state { + u.min_uncommitted_sst_id() + } else { + None + } + } } impl UploaderData { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 7d77554900f22..0c947309a6b06 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -30,7 +30,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, HummockVersionId}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -553,6 +553,14 @@ impl HummockStorage { pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> { self.compact_await_tree_reg.as_ref() } + + pub async fn min_uncommitted_sst_id(&self) -> Option { + let (tx, rx) = oneshot::channel(); + self.hummock_event_sender + .send(HummockEvent::GetMinUncommittedSstId { result_tx: tx }) + .expect("should send success"); + rx.await.expect("should await success") + } } impl StateStoreRead for HummockStorage {