refactor(storage): add SST sanity check during commit epoch #18757

Merged
6 commits merged on Sep 30, 2024
Changes from 4 commits

2 changes: 1 addition & 1 deletion proto/hummock.proto
@@ -535,7 +535,7 @@
map<uint64, uint32> sst_id_to_worker_id = 2;
uint64 epoch = 3;
}

Check failure on line 538 in proto/hummock.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" with name "sst_retention_watermark" on message "FullScanTask" changed option "json_name" from "sstRetentionTimeSec" to "sstRetentionWatermark".

Check failure on line 538 in proto/hummock.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" on message "FullScanTask" changed name from "sst_retention_time_sec" to "sst_retention_watermark".
// Delete SSTs in object store
message VacuumTask {
repeated uint64 sstable_object_ids = 1;
Expand All @@ -543,7 +543,7 @@

// Scan object store to get candidate orphan SSTs.
message FullScanTask {
uint64 sst_retention_time_sec = 1;
uint64 sst_retention_watermark = 1;
Contributor Author:

The watermark is the meta node's now minus the SST retention seconds.
Previously it was calculated on the compute node, which relied on the compute node's local clock.
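
For reference, a minimal sketch of the before/after semantics (function and parameter names here are illustrative, not taken from this PR):

// Before: meta ships a retention duration; the compactor applies its own local clock.
fn is_gc_candidate_old(object_ts: u64, sst_retention_time_sec: u64, compactor_now: u64) -> bool {
    compactor_now.saturating_sub(object_ts) > sst_retention_time_sec
}

// After: meta derives an absolute cutoff (now - retention) from its own clock and ships
// that watermark, so the compactor no longer depends on its local clock.
fn is_gc_candidate_new(object_ts: u64, sst_retention_watermark: u64) -> bool {
    object_ts < sst_retention_watermark
}

fn main() {
    let (meta_now, retention) = (1_727_000_000u64, 86_400u64);
    let watermark = meta_now - retention;
    // The two predicates agree when the compactor's clock matches the meta node's clock.
    assert_eq!(
        is_gc_candidate_old(watermark - 1, retention, meta_now),
        is_gc_candidate_new(watermark - 1, watermark)
    );
}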

Collaborator:

Just a note: the reason this change is compatible is that the previous sst_retention_time_sec must be much smaller than sst_retention_watermark. That means (see the sketch below):

  • If meta is upgraded before the compactor, the compactor will be less aggressive in deleting objects, because now - sst_retention_watermark (using the old logic to interpret the new field) is much smaller.
  • If the compactor is upgraded before meta, the compactor will also be less aggressive in deleting objects, because sst_retention_time_sec (using the new logic to interpret the old field) is much smaller.
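
A rough numeric illustration of the two mixed-version cases (all values below are made up; 86_400 seconds is one day of retention):

fn main() {
    // Illustrative values, in seconds since the Unix epoch.
    let compactor_now: u64 = 1_727_000_000;
    let retention_sec: u64 = 86_400; // one day
    let object_ts: u64 = 1_726_000_000; // an object roughly 11.6 days old

    // Meta upgraded first: the field carries a watermark (~now - retention), but an old
    // compactor still treats it as a retention duration and deletes objects whose
    // timestamp is below now - field. That cutoff collapses to ~retention_sec, so
    // almost nothing qualifies for deletion.
    let field_from_new_meta = compactor_now - retention_sec;
    let old_logic_cutoff = compactor_now.saturating_sub(field_from_new_meta); // ~86_400
    assert!(object_ts >= old_logic_cutoff); // kept

    // Compactor upgraded first: the field still carries a retention duration, but a new
    // compactor treats it as a watermark and deletes objects whose timestamp is below
    // the field. 86_400 is far below any realistic object timestamp, so again almost
    // nothing qualifies.
    let field_from_old_meta = retention_sec;
    assert!(object_ts >= field_from_old_meta); // kept

    println!("both mixed-version combinations err on the side of keeping objects");
}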

optional string prefix = 2;
optional string start_after = 3;
optional uint64 limit = 4;
5 changes: 3 additions & 2 deletions proto/stream_service.proto
@@ -32,13 +32,14 @@
string request_id = 1;
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
message GroupedSstableInfo {
message LocalSstableInfo {
Contributor Author (@zwang28, Sep 29, 2024):

Since SSTs to commit are no longer grouped, I made the pb type name consistent with the corresponding in-memory type.

reserved 1;
reserved "compaction_group_id";
hummock.SstableInfo sst = 2;
map<uint32, hummock.TableStats> table_stats_map = 3;
uint64 created_at = 4;
}
repeated GroupedSstableInfo synced_sstables = 4;
repeated LocalSstableInfo synced_sstables = 4;

Check failure on line 42 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "4" with name "synced_sstables" on message "BarrierCompleteResponse" changed type from "stream_service.BarrierCompleteResponse.GroupedSstableInfo" to "stream_service.BarrierCompleteResponse.LocalSstableInfo".
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/hummock_sequence.rs
@@ -18,6 +18,8 @@ pub const COMPACTION_TASK_ID: &str = "compaction_task";
pub const COMPACTION_GROUP_ID: &str = "compaction_group";
pub const SSTABLE_OBJECT_ID: &str = "sstable_object";
pub const META_BACKUP_ID: &str = "meta_backup";
/// The read & write of now is different from other sequences. It merely reuses the hummock_sequence table.
pub const HUMMOCK_NOW: &str = "now";

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default, Serialize, Deserialize)]
#[sea_orm(table_name = "hummock_sequence")]
3 changes: 2 additions & 1 deletion src/meta/service/src/hummock_service.rs
@@ -298,7 +298,8 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<TriggerFullGcResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)?;
.start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)
.await?;
Ok(Response::new(TriggerFullGcResponse { status: None }))
}

1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
@@ -1678,6 +1678,7 @@ fn collect_resp_info(
LocalSstableInfo::new(
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
grouped.created_at,
)
});
synced_ssts.extend(ssts_iter);
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction/mod.rs
@@ -1129,6 +1129,7 @@ impl HummockManager {
/// or the task is not owned by `context_id` when `context_id` is not None.

pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
// TODO: add sanity check for serverless compaction
let mut guard = self.compaction.write().await;
let deterministic_mode = self.env.opts.compaction_deterministic_test;
let compaction: &mut Compaction = &mut guard;
13 changes: 13 additions & 0 deletions src/meta/src/hummock/manager/context.rs
@@ -241,6 +241,19 @@ impl HummockManager {
}
}

// HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
if !sstables.is_empty() {
// sanity check to ensure SSTs to commit have not been full GCed yet.
let now = self.now().await?;
let sst_retention_watermark =
now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
for sst in sstables {
if sst.created_at < sst_retention_watermark {
return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
}
}
}

async {
if !self.env.opts.enable_committed_sst_sanity_check {
return;
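
The rejection rule added above boils down to roughly the following (a sketch with made-up values; min_sst_retention_time_sec is the existing meta option used in the check):

fn commit_rejected(sst_created_at: u64, meta_now: u64, min_sst_retention_time_sec: u64) -> bool {
    // An SST is refused at commit time if it falls below the full-GC watermark,
    // i.e. a concurrent full GC could already have deleted its object.
    sst_created_at < meta_now.saturating_sub(min_sst_retention_time_sec)
}

fn main() {
    let meta_now = 1_727_000_000u64;
    let retention = 3 * 3_600; // e.g. a 3-hour minimum retention
    assert!(!commit_rejected(meta_now - 60, meta_now, retention)); // fresh SST: accepted
    assert!(commit_rejected(meta_now - 4 * 3_600, meta_now, retention)); // stale SST: commit fails
}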
77 changes: 61 additions & 16 deletions src/meta/src/hummock/manager/gc.rs
@@ -16,17 +16,21 @@ use std::cmp;
use std::collections::HashSet;
use std::ops::Bound::{Excluded, Included};
use std::ops::DerefMut;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use anyhow::Context;
use futures::{stream, StreamExt};
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_meta_model_migration::OnConflict;
use risingwave_meta_model_v2::hummock_sequence;
use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::FullScanTask;
use sea_orm::{ActiveValue, EntityTrait};

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::commit_multi_var;
@@ -177,13 +181,13 @@
/// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`.
///
/// Returns Ok(false) if there is no worker available.
pub fn start_full_gc(
pub async fn start_full_gc(
&self,
sst_retention_time: Duration,
prefix: Option<String>,
) -> Result<bool> {
self.metrics.full_gc_trigger_count.inc();
// Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op.
// Set a minimum sst_retention_time.
let sst_retention_time = cmp::max(
sst_retention_time,
Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
@@ -205,9 +209,13 @@
}
Some(compactor) => compactor,
};
let sst_retention_watermark = self
.now()
.await?
.saturating_sub(sst_retention_time.as_secs());
compactor
.send_event(ResponseEvent::FullScanTask(FullScanTask {
sst_retention_time_sec: sst_retention_time.as_secs(),
sst_retention_watermark,
prefix,
start_after,
limit,
@@ -262,6 +270,50 @@
tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
Ok(selected_object_number)
}

pub async fn now(&self) -> Result<u64> {
let mut guard = self.now.lock().await;
let new_now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs();
if new_now < *guard {
return Err(anyhow::anyhow!(format!(
"unexpected decreasing now, old={}, new={}",
*guard, new_now
))
.into());
}
*guard = new_now;
drop(guard);
// Persist now to maintain non-decreasing even after a meta node reboot.
if let Some(sql) = self.sql_store() {
let m = hummock_sequence::ActiveModel {
name: ActiveValue::Set(HUMMOCK_NOW.into()),
seq: ActiveValue::Set(new_now.try_into().unwrap()),
};
hummock_sequence::Entity::insert(m)
.on_conflict(
OnConflict::column(hummock_sequence::Column::Name)
.update_column(hummock_sequence::Column::Seq)
.to_owned(),
)
.exec(&sql.conn)
.await?;
}
Ok(new_now)
}

pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
let Some(sql) = self.sql_store() else {
return Ok(None);
};
let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_string())
.one(&sql.conn)
.await?
.map(|m| m.seq.try_into().unwrap());
Ok(now)
}
}

pub struct FullGcState {
Expand Down Expand Up @@ -396,6 +448,7 @@ mod tests {
Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1,),
None
)
.await
.unwrap());

let mut receiver = compactor_manager.add_compactor(context_id);
@@ -405,36 +458,28 @@
Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1),
None
)
.await
.unwrap());
let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
ResponseEvent::FullScanTask(task) => task,
_ => {
panic!()
}
};
// min_sst_retention_time_sec override user provided value.
assert_eq!(
hummock_manager.env.opts.min_sst_retention_time_sec,
full_scan_task.sst_retention_time_sec
);

assert!(hummock_manager
.start_full_gc(
Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
None
)
.await
.unwrap());
let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
ResponseEvent::FullScanTask(task) => task,
_ => {
panic!()
}
};
// min_sst_retention_time_sec doesn't override user provided value.
assert_eq!(
hummock_manager.env.opts.min_sst_retention_time_sec + 1,
full_scan_task.sst_retention_time_sec
);

// Empty input results immediate return, without waiting heartbeat.
hummock_manager
6 changes: 6 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
@@ -38,6 +38,7 @@ use risingwave_pb::hummock::{
};
use risingwave_pb::meta::subscribe_response::Operation;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use tonic::Streaming;

use crate::hummock::compaction::CompactStatus;
@@ -114,6 +115,7 @@
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
now: Mutex<u64>,
}

pub type HummockManagerRef = Arc<HummockManager>;
@@ -287,6 +289,7 @@ impl HummockManager {
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
now: Mutex::new(0),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
@@ -304,6 +307,9 @@

/// Load state from meta store.
async fn load_meta_store_state(&self) -> Result<()> {
let now = self.load_now().await?;
*self.now.lock().await = now.unwrap_or(0);

let mut compaction_guard = self.compaction.write().await;
let mut versioning_guard = self.versioning.write().await;
let mut context_info_guard = self.context_info.write().await;