Skip to content

Commit

Permalink
refactor(storage): add SST sanity check during commit epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 29, 2024
1 parent 8f0d5de commit d338210
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 21 deletions.
2 changes: 1 addition & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ message VacuumTask {

// Scan object store to get candidate orphan SSTs.
message FullScanTask {
uint64 sst_retention_time_sec = 1;
uint64 sst_retention_watermark = 1;
optional string prefix = 2;
optional string start_after = 3;
optional uint64 limit = 4;
Expand Down
5 changes: 3 additions & 2 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ message BarrierCompleteResponse {
string request_id = 1;
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
message GroupedSstableInfo {
message LocalSstableInfo {
reserved 1;
reserved "compaction_group_id";
hummock.SstableInfo sst = 2;
map<uint32, hummock.TableStats> table_stats_map = 3;
uint64 created_at = 4;
}
repeated GroupedSstableInfo synced_sstables = 4;
repeated LocalSstableInfo synced_sstables = 4;

Check failure on line 42 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "4" with name "synced_sstables" on message "BarrierCompleteResponse" changed type from "stream_service.BarrierCompleteResponse.GroupedSstableInfo" to "stream_service.BarrierCompleteResponse.LocalSstableInfo".
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ fn collect_resp_info(
LocalSstableInfo::new(
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
grouped.created_at,
)
});
synced_ssts.extend(ssts_iter);
Expand Down
10 changes: 10 additions & 0 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ impl HummockManager {
}
}

// TODO: add sanity check for serverless compaction
// sanity check to ensure SSTs to commit have not been full GCed yet.
let now = self.now();
let sst_retention_watermark = now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
for sst in sstables {
if sst.created_at < sst_retention_watermark {
return Err(anyhow::anyhow!("SST may have been GCed: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
}
}

async {
if !self.env.opts.enable_committed_sst_sanity_check {
return;
Expand Down
15 changes: 14 additions & 1 deletion src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,10 @@ impl HummockManager {
}
Some(compactor) => compactor,
};
let sst_retention_watermark = self.now().saturating_sub(sst_retention_time.as_secs());
compactor
.send_event(ResponseEvent::FullScanTask(FullScanTask {
sst_retention_time_sec: sst_retention_time.as_secs(),
sst_retention_watermark,
prefix,
start_after,
limit,
Expand Down Expand Up @@ -262,6 +263,18 @@ impl HummockManager {
tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
Ok(selected_object_number)
}

/// Returns the current UTC wall-clock time in seconds since the Unix epoch,
/// clamped so that successive calls on this manager never observe a smaller
/// value than a previous call returned.
///
/// If the system clock reports a time earlier than the last value handed out,
/// a warning is logged and the last value is returned again.
///
/// # Panics
///
/// Panics if the wall-clock timestamp is negative and therefore cannot be
/// represented as a `u64`.
pub fn now(&self) -> u64 {
    // TODO: persist the last observed value so that it stays non-decreasing
    // even across a meta node reboot.
    let mut last_seen = self.now.lock();
    let wall_clock: u64 = chrono::Utc::now().timestamp().try_into().unwrap();
    if wall_clock >= *last_seen {
        // Normal case: time moved forward (or stood still); remember it.
        *last_seen = wall_clock;
    } else {
        // The clock went backwards; keep handing out the previous value.
        tracing::warn!(old = *last_seen, new = wall_clock, "unexpected decreasing now");
    }
    *last_seen
}
}

pub struct FullGcState {
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::INVALID_EPOCH;
Expand Down Expand Up @@ -114,6 +115,7 @@ pub struct HummockManager {
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
now: Mutex<u64>,
}

pub type HummockManagerRef = Arc<HummockManager>;
Expand Down Expand Up @@ -287,6 +289,7 @@ impl HummockManager {
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
now: Mutex::new(0),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
Expand Down
5 changes: 4 additions & 1 deletion src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,23 @@ pub struct SyncResult {
/// An SST together with the metadata that is reported alongside it.
pub struct LocalSstableInfo {
/// Description of the SST itself.
pub sst_info: SstableInfo,
/// Table statistics reported with this SST.
pub table_stats: TableStatsMap,
/// Creation timestamp of the SST, in seconds since the Unix epoch.
/// NOTE(review): the SST builder appears to fill this from its local clock
/// after the write finishes, and the meta node compares it against the SST
/// retention watermark when committing an epoch — confirm against callers.
pub created_at: u64,
}

impl LocalSstableInfo {
pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap) -> Self {
pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
Self {
sst_info,
table_stats,
created_at,
}
}

pub fn for_test(sst_info: SstableInfo) -> Self {
Self {
sst_info,
table_stats: Default::default(),
created_at: u64::MAX,
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/vacuum_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn test_full_scan() {
let object_metadata_iter = Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok)));

let task = FullScanTask {
sst_retention_time_sec: 10000,
sst_retention_watermark: 0,
prefix: None,
start_after: None,
limit: None,
Expand All @@ -102,7 +102,7 @@ async fn test_full_scan() {
assert!(scan_result.is_empty());

let task = FullScanTask {
sst_retention_time_sec: 6000,
sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(),
prefix: None,
start_after: None,
limit: None,
Expand All @@ -113,7 +113,7 @@ async fn test_full_scan() {
assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]);

let task = FullScanTask {
sst_retention_time_sec: 2000,
sst_retention_watermark: u64::MAX,
prefix: None,
start_after: None,
limit: None,
Expand Down
15 changes: 14 additions & 1 deletion src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::SystemTime;

use bytes::{Bytes, BytesMut};
use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN};
Expand Down Expand Up @@ -540,8 +541,20 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
}

let writer_output = self.writer.finish(meta).await?;
// The timestamp is only used during full GC.
//
// Ideally object store object's last_modified should be used.
// However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
//
// The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
// It should help alleviate the clock drift issue.

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs();
Ok(SstableBuilderOutput::<W::Output> {
sst_info: LocalSstableInfo::new(sst_info, self.table_stats),
sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
writer_output,
stats: SstableBuilderOutputStats {
bloom_filter_size,
Expand Down
12 changes: 2 additions & 10 deletions src/storage/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Sub;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures::{StreamExt, TryStreamExt};
use risingwave_hummock_sdk::HummockSstableObjectId;
Expand Down Expand Up @@ -60,12 +58,6 @@ impl Vacuum {
full_scan_task: FullScanTask,
metadata_iter: ObjectMetadataIter,
) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
let timestamp_watermark = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.sub(Duration::from_secs(full_scan_task.sst_retention_time_sec))
.as_secs_f64();

let mut total_object_count = 0;
let mut total_object_size = 0;
let mut next_start_after: Option<String> = None;
Expand All @@ -83,7 +75,7 @@ impl Vacuum {
next_start_after = Some(o.key.clone());
tracing::debug!(next_start_after, "set next start after");
}
if o.last_modified < timestamp_watermark {
if o.last_modified < full_scan_task.sst_retention_watermark as f64 {
Some(Ok(SstableStore::get_object_id_from_path(&o.key)))
} else {
None
Expand All @@ -109,7 +101,7 @@ impl Vacuum {
sstable_store: SstableStoreRef,
) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
tracing::info!(
timestamp = full_scan_task.sst_retention_time_sec,
sst_retention_watermark = full_scan_task.sst_retention_watermark,
prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")),
start_after = full_scan_task.start_after,
limit = full_scan_task.limit,
Expand Down
6 changes: 4 additions & 2 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::error::tonic::extra::Score;
use risingwave_pb::stream_service::barrier_complete_response::{
GroupedSstableInfo, PbCreateMviewProgress,
PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -494,9 +494,11 @@ impl LocalBarrierWorker {
|LocalSstableInfo {
sst_info,
table_stats,
}| GroupedSstableInfo {
created_at,
}| PbLocalSstableInfo {
sst: Some(sst_info.into()),
table_stats_map: to_prost_table_stats_map(table_stats),
created_at,
},
)
.collect_vec(),
Expand Down

0 comments on commit d338210

Please sign in to comment.