
refactor(storage): add SST sanity check during commit epoch #18757

Merged: 6 commits, Sep 30, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion proto/hummock.proto
@@ -527,7 +527,7 @@
}

message ReportCompactionTaskResponse {
common.Status status = 1;

Check failure on line 530 in proto/hummock.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" with name "sst_retention_watermark" on message "FullScanTask" changed option "json_name" from "sstRetentionTimeSec" to "sstRetentionWatermark".

Check failure on line 530 in proto/hummock.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" on message "FullScanTask" changed name from "sst_retention_time_sec" to "sst_retention_watermark".
}

message ValidationTask {
@@ -543,7 +543,7 @@

// Scan object store to get candidate orphan SSTs.
message FullScanTask {
uint64 sst_retention_time_sec = 1;
uint64 sst_retention_watermark = 1;
Contributor Author:

The watermark is the meta node's now - SST retention sec.
Previously it was calculated on the compute node, which relied on the compute node's local clock.

Collaborator:

Just a note: the reason this change is compatible is that the previous sst_retention_time_sec must be far smaller than sst_retention_watermark. That means (see the sketch after this list):

  • If meta is upgraded before the compactor, the compactor will be less aggressive about deleting objects, because now - sst_retention_watermark (using the old logic to interpret the new field) is far smaller.
  • If the compactor is upgraded before meta, the compactor will also be less aggressive about deleting objects, because sst_retention_time_sec (using the new logic to interpret the old field) is far smaller.
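
A minimal Rust sketch of this upgrade-order argument; the predicates and numbers below are illustrative assumptions, not the actual compactor code:

```rust
// Shows why misinterpreting the renamed field, in either upgrade order,
// only ever makes the compactor *less* aggressive about deleting objects.

/// Old compactor logic: the field is a retention duration in seconds.
fn old_logic_deletes(object_last_modified: u64, now: u64, field: u64) -> bool {
    object_last_modified < now.saturating_sub(field)
}

/// New compactor logic: the field is an absolute watermark (unix timestamp in seconds).
fn new_logic_deletes(object_last_modified: u64, field: u64) -> bool {
    object_last_modified < field
}

fn main() {
    let now: u64 = 1_727_654_400; // ~2024-09-30, unix seconds
    let retention_sec: u64 = 3 * 24 * 3600; // e.g. a 3-day retention window
    let watermark = now - retention_sec; // what an upgraded meta node would send
    let orphan_object = now - 7 * 24 * 3600; // a week-old orphan object

    // Meta upgraded first: the old compactor receives the (huge) watermark in the field,
    // so `now - field` collapses toward 0 and nothing is deleted.
    assert!(!old_logic_deletes(orphan_object, now, watermark));

    // Compactor upgraded first: the new compactor receives the (small) duration in the field,
    // so the cutoff sits near the unix epoch and nothing is deleted either.
    assert!(!new_logic_deletes(orphan_object, retention_sec));

    // Once both sides are upgraded, the week-old orphan is eligible for deletion again.
    assert!(new_logic_deletes(orphan_object, watermark));
}
```

In both mixed-version cases the effective cutoff shrinks, so the worst outcome is that orphan objects are deleted later than intended, never earlier.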

optional string prefix = 2;
optional string start_after = 3;
optional uint64 limit = 4;
5 changes: 3 additions & 2 deletions proto/stream_service.proto
@@ -32,13 +32,14 @@
string request_id = 1;
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
message GroupedSstableInfo {
message LocalSstableInfo {
Contributor Author (@zwang28, Sep 29, 2024):

Since the SSTs to commit are no longer grouped, I made the pb type name consistent with the corresponding in-memory type.

reserved 1;
reserved "compaction_group_id";
hummock.SstableInfo sst = 2;
map<uint32, hummock.TableStats> table_stats_map = 3;
uint64 created_at = 4;
}
repeated GroupedSstableInfo synced_sstables = 4;
repeated LocalSstableInfo synced_sstables = 4;

Check failure on line 42 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "4" with name "synced_sstables" on message "BarrierCompleteResponse" changed type from "stream_service.BarrierCompleteResponse.GroupedSstableInfo" to "stream_service.BarrierCompleteResponse.LocalSstableInfo".
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
@@ -1678,6 +1678,7 @@ fn collect_resp_info(
LocalSstableInfo::new(
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
grouped.created_at,
)
});
synced_ssts.extend(ssts_iter);
11 changes: 11 additions & 0 deletions src/meta/src/hummock/manager/context.rs
@@ -241,6 +241,17 @@ impl HummockManager {
}
}

// TODO: add sanity check for serverless compaction

// sanity check to ensure SSTs to commit have not been full GCed yet.
let now = self.now();
let sst_retention_watermark = now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
for sst in sstables {
if sst.created_at < sst_retention_watermark {
return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
}
}

async {
if !self.env.opts.enable_committed_sst_sanity_check {
return;
31 changes: 17 additions & 14 deletions src/meta/src/hummock/manager/gc.rs
@@ -183,7 +183,7 @@
prefix: Option<String>,
) -> Result<bool> {
self.metrics.full_gc_trigger_count.inc();
// Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op.
// Set a minimum sst_retention_time.
let sst_retention_time = cmp::max(
sst_retention_time,
Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
@@ -205,9 +205,10 @@
}
Some(compactor) => compactor,
};
let sst_retention_watermark = self.now().saturating_sub(sst_retention_time.as_secs());
compactor
.send_event(ResponseEvent::FullScanTask(FullScanTask {
sst_retention_time_sec: sst_retention_time.as_secs(),
sst_retention_watermark,
prefix,
start_after,
limit,
@@ -262,6 +263,18 @@
tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
Ok(selected_object_number)
}

pub fn now(&self) -> u64 {
// TODO: persist now to maintain non-decreasing even after a meta node reboot.
let mut guard = self.now.lock();
let new_now = chrono::Utc::now().timestamp().try_into().unwrap();
if new_now < *guard {
tracing::warn!(old = *guard, new = new_now, "unexpected decreasing now");
return *guard;
}
*guard = new_now;
new_now
}
}

pub struct FullGcState {
@@ -406,35 +419,25 @@ mod tests {
None
)
.unwrap());
let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
ResponseEvent::FullScanTask(task) => task,
_ => {
panic!()
}
};
// min_sst_retention_time_sec override user provided value.
assert_eq!(
hummock_manager.env.opts.min_sst_retention_time_sec,
full_scan_task.sst_retention_time_sec
);

assert!(hummock_manager
.start_full_gc(
Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
None
)
.unwrap());
let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
ResponseEvent::FullScanTask(task) => task,
_ => {
panic!()
}
};
// min_sst_retention_time_sec doesn't override user provided value.
assert_eq!(
hummock_manager.env.opts.min_sst_retention_time_sec + 1,
full_scan_task.sst_retention_time_sec
);

// Empty input results immediate return, without waiting heartbeat.
hummock_manager
3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::INVALID_EPOCH;
Expand Down Expand Up @@ -114,6 +115,7 @@ pub struct HummockManager {
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
now: Mutex<u64>,
}

pub type HummockManagerRef = Arc<HummockManager>;
@@ -287,6 +289,7 @@ impl HummockManager {
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
now: Mutex::new(0),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
38 changes: 38 additions & 0 deletions src/meta/src/hummock/manager/tests.rs
@@ -89,6 +89,7 @@ fn gen_local_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> Local
LocalSstableInfo {
sst_info: gen_sstable_info(sst_id, table_ids, epoch),
table_stats: Default::default(),
created_at: u64::MAX,
}
}
fn get_compaction_group_object_ids(
@@ -794,6 +795,31 @@ async fn test_invalid_sst_id() {
);
}

// Reject because the SST's timestamp is below the watermark.
let ssts_below_watermark = ssts
.iter()
.map(|s| LocalSstableInfo {
sst_info: s.sst_info.clone(),
table_stats: s.table_stats.clone(),
created_at: 0,
})
.collect();
let error = hummock_meta_client
.commit_epoch(
epoch,
SyncResult {
uncommitted_ssts: ssts_below_watermark,
..Default::default()
},
false,
)
.await
.unwrap_err();
assert!(error
.as_report()
.to_string()
.contains("is rejected from being committed since it's below watermark"));

hummock_meta_client
.commit_epoch(
epoch,
@@ -1275,6 +1301,7 @@ async fn test_version_stats() {
.iter()
.map(|table_id| (*table_id, table_stats_change.clone()))
.collect(),
created_at: u64::MAX,
})
.collect_vec();
hummock_meta_client
@@ -1394,6 +1421,7 @@ async fn test_split_compaction_group_on_commit() {
},
),
]),
created_at: u64::MAX,
};
hummock_meta_client
.commit_epoch(
@@ -1489,6 +1517,7 @@ async fn test_split_compaction_group_on_demand_basic() {
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
let sst_2 = LocalSstableInfo {
sst_info: SstableInfo {
@@ -1507,6 +1536,7 @@
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
hummock_meta_client
.commit_epoch(
@@ -1598,6 +1628,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() {
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
hummock_manager
.register_table_ids_for_test(&[(100, 2), (101, 2)])
@@ -1694,6 +1725,7 @@ async fn test_split_compaction_group_trivial_expired() {
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
let sst_2 = LocalSstableInfo {
sst_info: SstableInfo {
@@ -1712,6 +1744,7 @@
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
let mut sst_3 = sst_2.clone();
let mut sst_4 = sst_1.clone();
@@ -1857,6 +1890,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
hummock_meta_client
.commit_epoch(
@@ -2019,6 +2053,7 @@ async fn test_compaction_task_expiration_due_to_split_group() {
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
let sst_2 = LocalSstableInfo {
sst_info: SstableInfo {
@@ -2037,6 +2072,7 @@
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};

hummock_meta_client
@@ -2379,6 +2415,7 @@ async fn test_unregister_moved_table() {
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};
let sst_2 = LocalSstableInfo {
sst_info: SstableInfo {
@@ -2397,6 +2434,7 @@
..Default::default()
},
table_stats: Default::default(),
created_at: u64::MAX,
};

hummock_meta_client
5 changes: 4 additions & 1 deletion src/storage/hummock_sdk/src/lib.rs
@@ -176,20 +176,23 @@ pub struct SyncResult {
pub struct LocalSstableInfo {
pub sst_info: SstableInfo,
pub table_stats: TableStatsMap,
pub created_at: u64,
}

impl LocalSstableInfo {
pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap) -> Self {
pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
Self {
sst_info,
table_stats,
created_at,
}
}

pub fn for_test(sst_info: SstableInfo) -> Self {
Self {
sst_info,
table_stats: Default::default(),
created_at: u64::MAX,
}
}

1 change: 1 addition & 0 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -2607,6 +2607,7 @@ async fn test_commit_multi_epoch() {
})
.collect(),
sst_info: sst,
created_at: u64::MAX,
}],
new_table_fragment_info,
change_log_delta: Default::default(),
6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/vacuum_tests.rs
@@ -91,7 +91,7 @@ async fn test_full_scan() {
let object_metadata_iter = Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok)));

let task = FullScanTask {
sst_retention_time_sec: 10000,
sst_retention_watermark: 0,
prefix: None,
start_after: None,
limit: None,
@@ -102,7 +102,7 @@
assert!(scan_result.is_empty());

let task = FullScanTask {
sst_retention_time_sec: 6000,
sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(),
prefix: None,
start_after: None,
limit: None,
@@ -113,7 +113,7 @@
assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]);

let task = FullScanTask {
sst_retention_time_sec: 2000,
sst_retention_watermark: u64::MAX,
prefix: None,
start_after: None,
limit: None,
15 changes: 14 additions & 1 deletion src/storage/src/hummock/sstable/builder.rs
@@ -14,6 +14,7 @@

use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN};
@@ -540,8 +541,20 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
}

let writer_output = self.writer.finish(meta).await?;
// The timestamp is only used during full GC.
//
// Ideally the object store object's last_modified should be used.
// However, that would incur additional IO overhead, since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
//
// The local timestamp below is expected to precede the object store object's last_modified, given that the object store object is created afterward.
// This should help alleviate the clock drift issue.

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs();
Ok(SstableBuilderOutput::<W::Output> {
sst_info: LocalSstableInfo::new(sst_info, self.table_stats),
sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
writer_output,
stats: SstableBuilderOutputStats {
bloom_filter_size,
Expand Down