refactor(storage): add SST sanity check during commit epoch #18757

Merged (6 commits, Sep 30, 2024)
Changes from 1 commit
2 changes: 1 addition & 1 deletion proto/hummock.proto
@@ -527,7 +527,7 @@
}

message ReportCompactionTaskResponse {
common.Status status = 1;

Check failure on line 530 in proto/hummock.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" with name "sst_retention_watermark" on message "FullScanTask" changed option "json_name" from "sstRetentionTimeSec" to "sstRetentionWatermark".

Check failure on line 530 in proto/hummock.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" on message "FullScanTask" changed name from "sst_retention_time_sec" to "sst_retention_watermark".
}

message ValidationTask {
@@ -543,7 +543,7 @@

// Scan object store to get candidate orphan SSTs.
message FullScanTask {
- uint64 sst_retention_time_sec = 1;
+ uint64 sst_retention_watermark = 1;
zwang28 (Contributor, Author) commented:

The watermark is the meta node's `now` minus the SST retention seconds.
Previously it was calculated on the compute node, which relied on the compute node's local clock.
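A minimal sketch of the new contract, for illustration only (the helper names here are made up; the real changes are the gc.rs and vacuum.rs hunks below):

```rust
use std::time::Duration;

// Meta side: turn the retention duration into an absolute watermark using meta's clock,
// then ship that watermark to the compactor in FullScanTask.
fn sst_retention_watermark(meta_now_sec: u64, retention: Duration) -> u64 {
    meta_now_sec.saturating_sub(retention.as_secs())
}

// Compactor side: an object is a GC candidate only if its last_modified predates the
// watermark; the compactor's local clock is no longer consulted.
fn is_gc_candidate(last_modified_sec: f64, watermark: u64) -> bool {
    last_modified_sec < watermark as f64
}

fn main() {
    let watermark = sst_retention_watermark(1_727_600_000, Duration::from_secs(3_600));
    assert!(is_gc_candidate(1_727_000_000.0, watermark)); // old enough: candidate
    assert!(!is_gc_candidate(1_727_599_000.0, watermark)); // too recent: kept
}
```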

A collaborator commented:

Just a note: this change is compatible because the previous sst_retention_time_sec (a duration in seconds) is necessarily far smaller than sst_retention_watermark (an absolute Unix timestamp). That means:

  • If meta is upgraded before the compactor, the compactor will be less aggressive about deleting objects, because now - sst_retention_watermark (the old logic interpreting the new field) yields a far smaller watermark.
  • If the compactor is upgraded before meta, the compactor will also be less aggressive about deleting objects, because sst_retention_time_sec (the new logic interpreting the old field) is itself a far smaller watermark.
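A back-of-the-envelope sketch of the two mixed-version cases, with made-up numbers (not from the PR), showing that either misinterpretation only shrinks the effective watermark:

```rust
fn main() {
    let now: u64 = 1_727_600_000; // meta's Unix time in seconds
    let retention_sec: u64 = 86_400; // old field: a retention duration (e.g. 1 day)
    let watermark: u64 = now - retention_sec; // new field: an absolute timestamp

    // Old compactor + new meta: the old logic subtracts the field from `now`,
    // so the new field collapses to a watermark of ~86_400 (early 1970).
    let old_logic_on_new_field = now.saturating_sub(watermark);

    // New compactor + old meta: the new logic uses the field directly as a timestamp,
    // so the old field (86_400 seconds) is again a watermark in early 1970.
    let new_logic_on_old_field = retention_sec;

    // Both mixed-version watermarks sit far below the intended one, so fewer objects
    // fall under them and GC only becomes more conservative, never more aggressive.
    assert!(old_logic_on_new_field < watermark);
    assert!(new_logic_on_old_field < watermark);
    println!("{old_logic_on_new_field} and {new_logic_on_old_field} vs intended {watermark}");
}
```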

optional string prefix = 2;
optional string start_after = 3;
optional uint64 limit = 4;
5 changes: 3 additions & 2 deletions proto/stream_service.proto
@@ -32,13 +32,14 @@
string request_id = 1;
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
- message GroupedSstableInfo {
+ message LocalSstableInfo {
zwang28 (Contributor, Author) commented on Sep 29, 2024:

Since the SSTs to commit are no longer grouped, I made the pb type name consistent with the corresponding in-memory type.

reserved 1;
reserved "compaction_group_id";
hummock.SstableInfo sst = 2;
map<uint32, hummock.TableStats> table_stats_map = 3;
+ uint64 created_at = 4;
}
- repeated GroupedSstableInfo synced_sstables = 4;
+ repeated LocalSstableInfo synced_sstables = 4;

Check failure on line 42 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "4" with name "synced_sstables" on message "BarrierCompleteResponse" changed type from "stream_service.BarrierCompleteResponse.GroupedSstableInfo" to "stream_service.BarrierCompleteResponse.LocalSstableInfo".
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
@@ -1678,6 +1678,7 @@ fn collect_resp_info(
LocalSstableInfo::new(
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
+ grouped.created_at,
)
});
synced_ssts.extend(ssts_iter);
10 changes: 10 additions & 0 deletions src/meta/src/hummock/manager/context.rs
@@ -241,6 +241,16 @@ impl HummockManager {
}
}

+ // TODO: add sanity check for serverless compaction
+ // sanity check to ensure SSTs to commit have not been full GCed yet.
+ let now = self.now();
+ let sst_retention_watermark = now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
+ for sst in sstables {
+     if sst.created_at < sst_retention_watermark {
+         return Err(anyhow::anyhow!("SST may have been GCed: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+     }
+ }

async {
if !self.env.opts.enable_committed_sst_sanity_check {
return;
15 changes: 14 additions & 1 deletion src/meta/src/hummock/manager/gc.rs
@@ -205,9 +205,10 @@
}
Some(compactor) => compactor,
};
+ let sst_retention_watermark = self.now().saturating_sub(sst_retention_time.as_secs());
compactor
.send_event(ResponseEvent::FullScanTask(FullScanTask {
- sst_retention_time_sec: sst_retention_time.as_secs(),
+ sst_retention_watermark,
prefix,
start_after,
limit,
@@ -262,6 +263,18 @@
tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
Ok(selected_object_number)
}

+ pub fn now(&self) -> u64 {
+     // TODO: persist now to maintain non-decreasing even after a meta node reboot.
+     let mut guard = self.now.lock();
+     let new_now = chrono::Utc::now().timestamp().try_into().unwrap();
+     if new_now < *guard {
+         tracing::warn!(old = *guard, new = new_now, "unexpected decreasing now");
+         return *guard;
+     }
+     *guard = new_now;
+     new_now
+ }
}

pub struct FullGcState {
3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
+ use parking_lot::Mutex;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::INVALID_EPOCH;
@@ -114,6 +115,7 @@ pub struct HummockManager {
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
+ now: Mutex<u64>,
}

pub type HummockManagerRef = Arc<HummockManager>;
@@ -287,6 +289,7 @@ impl HummockManager {
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
+ now: Mutex::new(0),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
5 changes: 4 additions & 1 deletion src/storage/hummock_sdk/src/lib.rs
@@ -176,20 +176,23 @@ pub struct SyncResult {
pub struct LocalSstableInfo {
pub sst_info: SstableInfo,
pub table_stats: TableStatsMap,
+ pub created_at: u64,
}

impl LocalSstableInfo {
- pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap) -> Self {
+ pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
Self {
sst_info,
table_stats,
+ created_at,
}
}

pub fn for_test(sst_info: SstableInfo) -> Self {
Self {
sst_info,
table_stats: Default::default(),
+ created_at: u64::MAX,
}
}

6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/vacuum_tests.rs
@@ -91,7 +91,7 @@ async fn test_full_scan() {
let object_metadata_iter = Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok)));

let task = FullScanTask {
- sst_retention_time_sec: 10000,
+ sst_retention_watermark: 0,
prefix: None,
start_after: None,
limit: None,
@@ -102,7 +102,7 @@
assert!(scan_result.is_empty());

let task = FullScanTask {
- sst_retention_time_sec: 6000,
+ sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(),
prefix: None,
start_after: None,
limit: None,
@@ -113,7 +113,7 @@
assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]);

let task = FullScanTask {
- sst_retention_time_sec: 2000,
+ sst_retention_watermark: u64::MAX,
prefix: None,
start_after: None,
limit: None,
15 changes: 14 additions & 1 deletion src/storage/src/hummock/sstable/builder.rs
@@ -14,6 +14,7 @@

use std::collections::BTreeSet;
use std::sync::Arc;
+ use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN};
@@ -540,8 +541,20 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
}

let writer_output = self.writer.finish(meta).await?;
+ // The timestamp is only used during full GC.
+ //
+ // Ideally object store object's last_modified should be used.
+ // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
+ //
+ // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
+ // It should help alleviate the clock drift issue.
+
+ let now = SystemTime::now()
+     .duration_since(SystemTime::UNIX_EPOCH)
+     .expect("Clock may have gone backwards")
+     .as_secs();
Ok(SstableBuilderOutput::<W::Output> {
- sst_info: LocalSstableInfo::new(sst_info, self.table_stats),
+ sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
writer_output,
stats: SstableBuilderOutputStats {
bloom_filter_size,
12 changes: 2 additions & 10 deletions src/storage/src/hummock/vacuum.rs
@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use std::ops::Sub;
use std::sync::Arc;
- use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures::{StreamExt, TryStreamExt};
use risingwave_hummock_sdk::HummockSstableObjectId;
@@ -60,12 +58,6 @@ impl Vacuum {
full_scan_task: FullScanTask,
metadata_iter: ObjectMetadataIter,
) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
- let timestamp_watermark = SystemTime::now()
-     .duration_since(UNIX_EPOCH)
-     .unwrap()
-     .sub(Duration::from_secs(full_scan_task.sst_retention_time_sec))
-     .as_secs_f64();

let mut total_object_count = 0;
let mut total_object_size = 0;
let mut next_start_after: Option<String> = None;
@@ -83,7 +75,7 @@
next_start_after = Some(o.key.clone());
tracing::debug!(next_start_after, "set next start after");
}
- if o.last_modified < timestamp_watermark {
+ if o.last_modified < full_scan_task.sst_retention_watermark as f64 {
Some(Ok(SstableStore::get_object_id_from_path(&o.key)))
} else {
None
@@ -109,7 +101,7 @@
sstable_store: SstableStoreRef,
) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
tracing::info!(
- timestamp = full_scan_task.sst_retention_time_sec,
+ sst_retention_watermark = full_scan_task.sst_retention_watermark,
prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")),
start_after = full_scan_task.start_after,
limit = full_scan_task.limit,
6 changes: 4 additions & 2 deletions src/stream/src/task/barrier_manager.rs
@@ -24,7 +24,7 @@ use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::error::tonic::extra::Score;
use risingwave_pb::stream_service::barrier_complete_response::{
- GroupedSstableInfo, PbCreateMviewProgress,
+ PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use thiserror_ext::AsReport;
@@ -494,9 +494,11 @@ impl LocalBarrierWorker {
|LocalSstableInfo {
sst_info,
table_stats,
- }| GroupedSstableInfo {
+ created_at,
+ }| PbLocalSstableInfo {
sst: Some(sst_info.into()),
table_stats_map: to_prost_table_stats_map(table_stats),
+ created_at,
},
)
.collect_vec(),