Skip to content

Commit

Permalink
refactor(storage): proactively prevent uncommitted SSTs from GC
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Oct 12, 2024
1 parent d6dc650 commit cfb1b6c
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 9 deletions.
6 changes: 6 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,15 @@ message StreamingControlStreamResponse {
}
}

message GetMinUncommittedSstIdRequest {}
message GetMinUncommittedSstIdResponse {
uint64 min_uncommitted_sst_id = 1;
}

service StreamService {
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
rpc GetMinUncommittedSstId(GetMinUncommittedSstIdRequest) returns (GetMinUncommittedSstIdResponse);
}

// TODO: Lifecycle management for actors.
19 changes: 19 additions & 0 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use await_tree::InstrumentAwait;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
Expand Down Expand Up @@ -87,4 +88,22 @@ impl StreamService for StreamServiceImpl {
self.mgr.handle_new_control_stream(tx, stream, init_request);
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}

async fn get_min_uncommitted_sst_id(
&self,
_request: Request<GetMinUncommittedSstIdRequest>,
) -> Result<Response<GetMinUncommittedSstIdResponse>, Status> {
let min_uncommitted_sst_id = if let Some(hummock) = self.mgr.env.state_store().as_hummock()
{
hummock
.min_uncommitted_sst_id()
.await
.unwrap_or(HummockSstableObjectId::MAX)
} else {
HummockSstableObjectId::MAX
};
Ok(Response::new(GetMinUncommittedSstIdResponse {
min_uncommitted_sst_id,
}))
}
}
3 changes: 2 additions & 1 deletion src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ impl HummockManager {

// HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
if !sstables.is_empty() {
// sanity check to ensure SSTs to commit have not been full GCed yet.
// Sanity check to ensure SSTs to commit have not been full GCed yet.
// TODO: since HummockManager::complete_full_gc have already filtered out SSTs by min uncommitted SST id, this sanity check can be removed.
let now = self.now().await?;
check_sst_retention(
now,
Expand Down
54 changes: 50 additions & 4 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::ops::Bound::{Excluded, Included};
use std::ops::DerefMut;
use std::time::{Duration, SystemTime};

use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_hummock_sdk::HummockSstableObjectId;
Expand All @@ -26,11 +27,14 @@ use risingwave_meta_model_v2::hummock_sequence;
use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::FullScanTask;
use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest;
use risingwave_rpc_client::StreamClientPool;
use sea_orm::{ActiveValue, EntityTrait};

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::commit_multi_var;
use crate::hummock::HummockManager;
use crate::manager::MetadataManager;
use crate::model::BTreeMapTransaction;

#[derive(Default)]
Expand Down Expand Up @@ -226,6 +230,12 @@ impl HummockManager {
object_ids: Vec<HummockSstableObjectId>,
next_start_after: Option<String>,
) -> Result<usize> {
// It's crucial to collect_min_uncommitted_sst_id (i.e. `min_sst_id`) only after LIST object store (i.e. `object_ids`).
// Because after getting `min_sst_id`, new compute nodes may join and generate new uncommitted SSTs that are not covered by `min_sst_id`.
// By getting `min_sst_id` after `object_ids`, it's ensured `object_ids` won't include any SSTs from those new compute nodes.
let min_sst_id =
collect_min_uncommitted_sst_id(&self.metadata_manager, self.env.stream_client_pool())
.await?;
self.full_gc_state.set_next_start_after(next_start_after);
if object_ids.is_empty() {
tracing::info!("SST full scan returns no SSTs.");
Expand All @@ -243,19 +253,31 @@ impl HummockManager {
self.metrics
.time_travel_object_count
.set(pinned_object_ids.len() as _);
// filter by SST id watermark, i.e. minimum id of uncommitted SSTs reported by compute nodes.
let object_ids = object_ids
.into_iter()
.filter(|id| *id < min_sst_id)
.collect_vec();
let after_min_sst_id = object_ids.len();
// filter by time travel archive
let object_ids = object_ids
.into_iter()
.filter(|s| !pinned_object_ids.contains(s))
.collect_vec();
let after_time_travel = object_ids.len();
// filter by version
let selected_object_number = self.extend_objects_to_delete_from_scan(&object_ids).await;
let after_version = self.extend_objects_to_delete_from_scan(&object_ids).await;
metrics
.full_gc_selected_object_count
.observe(selected_object_number as _);
tracing::info!("Object full scan returns {candidate_object_number} objects. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
Ok(selected_object_number)
.observe(after_version as _);
tracing::info!(
candidate_object_number,
after_min_sst_id,
after_time_travel,
after_version,
"complete full gc"
);
Ok(after_version)
}

pub async fn now(&self) -> Result<u64> {
Expand Down Expand Up @@ -329,6 +351,30 @@ impl HummockManager {
}
}

async fn collect_min_uncommitted_sst_id(
metadata_manager: &MetadataManager,
client_pool: &StreamClientPool,
) -> Result<HummockSstableObjectId> {
let futures = metadata_manager
.list_active_streaming_compute_nodes()
.await
.map_err(|err| Error::MetaStore(err.into()))?
.into_iter()
.map(|worker_node| async move {
let client = client_pool.get(&worker_node).await?;
let request = GetMinUncommittedSstIdRequest {};
client.get_min_uncommitted_sst_id(request).await
});
let min_watermark = try_join_all(futures)
.await
.map_err(|err| Error::Internal(err.into()))?
.into_iter()
.map(|resp| resp.min_uncommitted_sst_id)
.min()
.unwrap_or(HummockSstableObjectId::MAX);
Ok(min_watermark)
}

pub struct FullGcState {
next_start_after: Mutex<Option<String>>,
limit: Option<u64>,
Expand Down
3 changes: 2 additions & 1 deletion src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ pub type StreamClientPoolRef = Arc<StreamClientPool>;
macro_rules! for_all_stream_rpc {
($macro:ident) => {
$macro! {
{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse },
{ 0, get_min_uncommitted_sst_id, GetMinUncommittedSstIdRequest, GetMinUncommittedSstIdResponse }
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,13 @@ impl HummockEventHandler {
self.uploader.may_destroy_instance(instance_id);
self.destroy_read_version(instance_id);
}
HummockEvent::GetMinUncommittedSstId { result_tx } => {
let _ = result_tx
.send(self.uploader.min_uncommitted_sst_id())
.inspect_err(|e| {
error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
});
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use parking_lot::{RwLock, RwLockReadGuard};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, HummockVersionId};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -105,6 +105,10 @@ pub enum HummockEvent {
DestroyReadVersion {
instance_id: LocalInstanceId,
},

GetMinUncommittedSstId {
result_tx: oneshot::Sender<Option<HummockSstableObjectId>>,
},
}

impl HummockEvent {
Expand Down Expand Up @@ -164,6 +168,7 @@ impl HummockEvent {

#[cfg(any(test, feature = "test"))]
HummockEvent::FlushEvent(_) => "FlushEvent".to_string(),
HummockEvent::GetMinUncommittedSstId { .. } => "GetMinSpilledSstId".to_string(),
}
}
}
Expand Down
23 changes: 22 additions & 1 deletion src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_common::must_match;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
use task_manager::{TaskManager, UploadingTaskStatus};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -1089,6 +1089,19 @@ impl UploaderData {
send_sync_result(syncing_data.sync_result_sender, Err(err()));
}
}

fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
self.spilled_data
.values()
.filter_map(|(s, _)| {
s.sstable_infos()
.iter()
.chain(s.old_value_sstable_infos())
.map(|s| s.sst_info.sst_id)
.min()
})
.min()
}
}

struct ErrState {
Expand Down Expand Up @@ -1329,6 +1342,14 @@ impl HummockUploader {
}
data.check_upload_task_consistency();
}

pub(crate) fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
if let UploaderState::Working(ref u) = self.state {
u.min_uncommitted_sst_id()
} else {
None
}
}
}

impl UploaderData {
Expand Down
10 changes: 9 additions & 1 deletion src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_hummock_sdk::key::{
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, HummockVersionId};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
Expand Down Expand Up @@ -553,6 +553,14 @@ impl HummockStorage {
pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
self.compact_await_tree_reg.as_ref()
}

pub async fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
let (tx, rx) = oneshot::channel();
self.hummock_event_sender
.send(HummockEvent::GetMinUncommittedSstId { result_tx: tx })
.expect("should send success");
rx.await.expect("should await success")
}
}

impl StateStoreRead for HummockStorage {
Expand Down

0 comments on commit cfb1b6c

Please sign in to comment.