Skip to content

Commit

Permalink
refactor(storage): add SST retention time sanity check for compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 30, 2024
1 parent e82932f commit 78843c8
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 47 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ message CompactTask {
TRACK_SST_OBJECT_ID_FAILED = 12;
NO_AVAIL_CPU_RESOURCE_CANCELED = 13;
HEARTBEAT_PROGRESS_CANCELED = 14;
RETENTION_TIME_REJECTED = 17;

// for serverless compaction
SERVERLESS_SEND_FAIL_CANCELED = 15;
Expand Down Expand Up @@ -460,6 +461,7 @@ message SubscribeCompactionEventRequest {
uint64 task_id = 4;
CompactTask.TaskStatus task_status = 5;
repeated SstableInfo sorted_output_ssts = 6;
map<uint64, uint64> object_timestamps = 7;
}

// HeartBeat provides the progress status of all tasks on the Compactor.
Expand Down Expand Up @@ -507,6 +509,7 @@ message ReportCompactionTaskRequest {
message ReportTask {
CompactTask compact_task = 2;
map<uint32, TableStats> table_stats_change = 3;
map<uint64, uint64> object_timestamps = 4;
}
// HeartBeat provides the progress status of all tasks on the Compactor.
message HeartBeat {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1668,13 +1668,13 @@ fn collect_resp_info(
let mut old_value_ssts = Vec::with_capacity(resps.len());

for resp in resps {
let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
let sst_info = grouped.sst.expect("field not None");
let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
let sst_info = local_sst.sst.expect("field not None");
sst_to_worker.insert(sst_info.object_id, resp.worker_id);
LocalSstableInfo::new(
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
grouped.created_at,
from_prost_table_stats_map(local_sst.table_stats_map),
local_sst.created_at,
)
});
synced_ssts.extend(ssts_iter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl HummockManager {
task_status: TaskStatus::ManualCanceled,
table_stats_change: HashMap::default(),
sorted_output_ssts: vec![],
object_timestamps: HashMap::default(),
});
}
});
Expand Down Expand Up @@ -556,6 +557,7 @@ impl HummockManager {
task_status: TaskStatus::ManualCanceled,
table_stats_change: HashMap::default(),
sorted_output_ssts: vec![],
object_timestamps: HashMap::default(),
});
}
}
Expand Down
24 changes: 18 additions & 6 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use risingwave_hummock_sdk::table_stats::{
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, IntraLevelDelta};
use risingwave_hummock_sdk::{
compact_task_to_string, statistics_compact_task, CompactionGroupId, HummockCompactionTaskId,
HummockVersionId,
HummockSstableObjectId, HummockVersionId,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_request::{
Expand Down Expand Up @@ -994,6 +994,7 @@ impl HummockManager {
task_status,
sorted_output_ssts: vec![],
table_stats_change: HashMap::default(),
object_timestamps: HashMap::default(),
})
.collect_vec();
let rets = self.report_compact_tasks(tasks).await?;
Expand Down Expand Up @@ -1108,13 +1109,15 @@ impl HummockManager {
task_status: TaskStatus,
sorted_output_ssts: Vec<SstableInfo>,
table_stats_change: Option<PbTableStatsMap>,
object_timestamps: HashMap<HummockSstableObjectId, u64>,
) -> Result<bool> {
let rets = self
.report_compact_tasks(vec![ReportTask {
task_id,
task_status,
sorted_output_ssts,
table_stats_change: table_stats_change.unwrap_or_default(),
object_timestamps,
}])
.await?;
Ok(rets[0])
Expand All @@ -1129,7 +1132,6 @@ impl HummockManager {
/// or the task is not owned by `context_id` when `context_id` is not None.
pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
// TODO: add sanity check for serverless compaction
let mut guard = self.compaction.write().await;
let deterministic_mode = self.env.opts.compaction_deterministic_test;
let compaction: &mut Compaction = &mut guard;
Expand Down Expand Up @@ -1205,10 +1207,19 @@ impl HummockManager {
.map(|level| level.level_idx)
.collect();
let is_success = if let TaskStatus::Success = compact_task.task_status {
// if member_table_ids changes, the data of sstable may stale.
let is_expired =
Self::is_compact_task_expired(&compact_task, version.latest_version());
if is_expired {
if let Err(e) = self
.report_compaction_sanity_check(&task.object_timestamps)
.await
{
warn!(
"failed to commit compaction task {} {}",
compact_task.task_id,
e.as_report()
);
compact_task.task_status = TaskStatus::RetentionTimeRejected;
false
} else if Self::is_compact_task_expired(&compact_task, version.latest_version()) {
// if member_table_ids changes, the data of sstable may stale.
compact_task.task_status = TaskStatus::InputOutdatedCanceled;
false
} else {
Expand Down Expand Up @@ -1567,6 +1578,7 @@ impl HummockManager {
task_status,
sorted_output_ssts,
table_stats_change: table_stats_change.unwrap_or_default(),
object_timestamps: HashMap::default(),
}])
.await?;
Ok(())
Expand Down
44 changes: 37 additions & 7 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ impl HummockManager {
if !sstables.is_empty() {
// sanity check to ensure SSTs to commit have not been full GCed yet.
let now = self.now().await?;
let sst_retention_watermark =
now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
for sst in sstables {
if sst.created_at < sst_retention_watermark {
return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
}
}
check_sst_retention(
now,
self.env.opts.min_sst_retention_time_sec,
sstables
.iter()
.map(|s| (s.sst_info.object_id, s.created_at)),
)?;
}

async {
Expand Down Expand Up @@ -278,6 +278,36 @@ impl HummockManager {
pub async fn release_meta_context(&self) -> Result<()> {
self.release_contexts([META_NODE_ID]).await
}

pub(crate) async fn report_compaction_sanity_check(
&self,
object_timestamps: &HashMap<HummockSstableObjectId, u64>,
) -> Result<()> {
// HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
if object_timestamps.is_empty() {
return Ok(());
}
let now = self.now().await?;
check_sst_retention(
now,
self.env.opts.min_sst_retention_time_sec,
object_timestamps.iter().map(|(k, v)| (*k, *v)),
)
}
}

fn check_sst_retention(
now: u64,
retention_sec: u64,
sst_infos: impl Iterator<Item = (HummockSstableObjectId, u64)>,
) -> Result<()> {
let sst_retention_watermark = now.saturating_sub(retention_sec);
for (object_id, created_at) in sst_infos {
if created_at < sst_retention_watermark {
return Err(anyhow::anyhow!("object {object_id} is rejected from being committed since it's below watermark: object timestamp {created_at}, meta node timestamp {now}, retention_sec {retention_sec}, watermark {sst_retention_watermark}").into());
}
}
Ok(())
}

// pin and unpin method
Expand Down
43 changes: 37 additions & 6 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,13 @@ async fn test_hummock_compaction_task() {
// Finish the task and succeed.

assert!(hummock_manager
.report_compact_task(compact_task.task_id, TaskStatus::Success, vec![], None)
.report_compact_task(
compact_task.task_id,
TaskStatus::Success,
vec![],
None,
HashMap::default()
)
.await
.unwrap());
}
Expand Down Expand Up @@ -1383,6 +1389,7 @@ async fn test_version_stats() {
TaskStatus::Success,
vec![],
Some(to_prost_table_stats_map(compact_table_stats_change)),
HashMap::default(),
)
.await
.unwrap();
Expand Down Expand Up @@ -1770,12 +1777,19 @@ async fn test_move_state_tables_to_dedicated_compaction_group_trivial_expired()
..Default::default()
}],
None,
HashMap::default(),
)
.await
.unwrap();
assert!(ret);
let ret = hummock_manager
.report_compact_task(task.task_id, TaskStatus::Success, vec![], None)
.report_compact_task(
task.task_id,
TaskStatus::Success,
vec![],
None,
HashMap::default(),
)
.await
.unwrap();
// the task has been canceled
Expand Down Expand Up @@ -1845,7 +1859,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_bottom_l

let compaction_group_id =
get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await;

let manual_compcation_option = ManualCompactionOption {
level: 0,
..Default::default()
Expand Down Expand Up @@ -1990,7 +2003,13 @@ async fn test_compaction_task_expiration_due_to_split_group() {

let version_1 = hummock_manager.get_current_version().await;
assert!(!hummock_manager
.report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None)
.report_compact_task(
compaction_task.task_id,
TaskStatus::Success,
vec![],
None,
HashMap::default()
)
.await
.unwrap());
let version_2 = hummock_manager.get_current_version().await;
Expand All @@ -2005,7 +2024,13 @@ async fn test_compaction_task_expiration_due_to_split_group() {
get_manual_compact_task(hummock_manager.clone(), compaction_group_id, 0).await;
assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2);
hummock_manager
.report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None)
.report_compact_task(
compaction_task.task_id,
TaskStatus::Success,
vec![],
None,
HashMap::default(),
)
.await
.unwrap();

Expand Down Expand Up @@ -2254,7 +2279,13 @@ async fn test_partition_level() {
sst.sst_size = sst.file_size;
global_sst_id += 1;
let ret = hummock_manager
.report_compact_task(task.task_id, TaskStatus::Success, vec![sst], None)
.report_compact_task(
task.task_id,
TaskStatus::Success,
vec![sst],
None,
HashMap::default(),
)
.await
.unwrap();
assert!(ret);
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ impl HummockMetaClient for MockHummockMetaClient {
task_status,
sorted_output_ssts,
table_stats_change,
object_timestamps,
}) = item.event.unwrap()
{
if let Err(e) = hummock_manager_compact
Expand All @@ -357,6 +358,7 @@ impl HummockMetaClient for MockHummockMetaClient {
.map(SstableInfo::from)
.collect_vec(),
Some(table_stats_change),
object_timestamps,
)
.await
{
Expand Down
4 changes: 4 additions & 0 deletions src/storage/hummock_sdk/src/compact_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::key_range::KeyRange;
use crate::level::InputLevel;
use crate::sstable_info::SstableInfo;
use crate::table_watermark::TableWatermarks;
use crate::HummockSstableObjectId;

#[derive(Clone, PartialEq, Default, Debug)]
pub struct CompactTask {
Expand Down Expand Up @@ -373,6 +374,7 @@ pub struct ReportTask {
pub task_id: u64,
pub task_status: TaskStatus,
pub sorted_output_ssts: Vec<SstableInfo>,
pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
}

impl From<PbReportTask> for ReportTask {
Expand All @@ -386,6 +388,7 @@ impl From<PbReportTask> for ReportTask {
.into_iter()
.map(SstableInfo::from)
.collect_vec(),
object_timestamps: value.object_timestamps,
}
}
}
Expand All @@ -401,6 +404,7 @@ impl From<ReportTask> for PbReportTask {
.into_iter()
.map(|sst| sst.into())
.collect_vec(),
object_timestamps: value.object_timestamps,
}
}
}
Loading

0 comments on commit 78843c8

Please sign in to comment.