Merge branch 'main' into yiming/deprecate-max-committed-epoch
wenym1 committed Sep 30, 2024
2 parents 9e7c6d4 + 7ff4d98 commit 73571fa
Showing 19 changed files with 186 additions and 37 deletions.
2 changes: 1 addition & 1 deletion proto/hummock.proto
@@ -535,7 +535,7 @@ message VacuumTask {
 
 // Scan object store to get candidate orphan SSTs.
 message FullScanTask {
-  uint64 sst_retention_time_sec = 1;
+  uint64 sst_retention_watermark = 1;
   optional string prefix = 2;
   optional string start_after = 3;
   optional uint64 limit = 4;
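Note on semantics: this is a rename with a meaning change. `sst_retention_time_sec` was a relative duration supplied by the caller, whereas `sst_retention_watermark` is an absolute Unix timestamp (in seconds) computed on the meta node from its persisted clock (see `start_full_gc` in `gc.rs` below), so every worker compares against the same cutoff. A minimal sketch of filtering a scan result against it — `ObjectMeta` and `filter_orphan_candidates` are hypothetical names, not the actual compactor code:

```rust
/// Hypothetical object-store listing entry; only the fields the sketch needs.
struct ObjectMeta {
    object_id: u64,
    /// Seconds since the Unix epoch, as reported by the object store.
    last_modified_sec: u64,
}

/// Keep only objects strictly older than the watermark: anything newer may
/// belong to a write that is still in flight and must not be GCed.
fn filter_orphan_candidates(objects: Vec<ObjectMeta>, sst_retention_watermark: u64) -> Vec<u64> {
    objects
        .into_iter()
        .filter(|o| o.last_modified_sec < sst_retention_watermark)
        .map(|o| o.object_id)
        .collect()
}
```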
5 changes: 3 additions & 2 deletions proto/stream_service.proto
@@ -32,13 +32,14 @@ message BarrierCompleteResponse {
   string request_id = 1;
   common.Status status = 2;
   repeated CreateMviewProgress create_mview_progress = 3;
-  message GroupedSstableInfo {
+  message LocalSstableInfo {
     reserved 1;
     reserved "compaction_group_id";
     hummock.SstableInfo sst = 2;
     map<uint32, hummock.TableStats> table_stats_map = 3;
+    uint64 created_at = 4;
   }
-  repeated GroupedSstableInfo synced_sstables = 4;
+  repeated LocalSstableInfo synced_sstables = 4;
   uint32 worker_id = 5;
   map<uint32, hummock.TableWatermarks> table_watermarks = 6;
   repeated hummock.SstableInfo old_value_sstables = 7;
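The new `created_at` field stamps each synced SST with its creation time in seconds since the Unix epoch, which lets the meta node compare the SST against the GC watermark at commit time (see the `context.rs` hunk below). A sketch of producing such a timestamp — `unix_now_sec` is a hypothetical helper, not the actual compute-node code:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper: a created_at value is a Unix timestamp in seconds,
/// taken when the SST is produced.
fn unix_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock went backwards")
        .as_secs()
}
```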
8 changes: 8 additions & 0 deletions src/connector/src/source/kafka/split.rs
@@ -22,6 +22,14 @@ use crate::source::{SplitId, SplitMetaData};
 pub struct KafkaSplit {
     pub(crate) topic: String,
     pub(crate) partition: i32,
+    /// Note: currently the start offset is **exclusive**. We need to `+1` to create the reader.
+    /// Possible values are:
+    /// - `Earliest`: `low_watermark` - 1
+    /// - `Latest`: `high_watermark` - 1
+    /// - `Timestamp`: `offset_for_timestamp` - 1
+    /// - `last_seen_offset`
+    ///
+    /// A better approach would be to make it **inclusive**. <https://github.com/risingwavelabs/risingwave/pull/16257>
     pub(crate) start_offset: Option<i64>,
     pub(crate) stop_offset: Option<i64>,
     #[serde(skip)]
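The doc comment above implies that every consumer of `start_offset` must apply a `+1` before seeking, since Kafka seeks are inclusive. A small sketch of that conversion under the conventions listed above — `reader_start_offset` is a hypothetical helper, not the actual connector code:

```rust
/// Convert KafkaSplit's exclusive start_offset into the inclusive offset a
/// Kafka consumer seeks to.
fn reader_start_offset(split_start_offset: Option<i64>, default_offset: i64) -> i64 {
    match split_start_offset {
        // No recorded offset yet: fall back to the configured default.
        None => default_offset,
        // The stored value is "last position already accounted for"
        // (e.g. Earliest is stored as low_watermark - 1), so reading
        // resumes one past it.
        Some(last) => last + 1,
    }
}
```

For example, a split created from `Earliest` stores `low_watermark - 1`, so the reader seeks to `low_watermark`, the first available message.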
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/hummock_sequence.rs
@@ -18,6 +18,8 @@ pub const COMPACTION_TASK_ID: &str = "compaction_task";
 pub const COMPACTION_GROUP_ID: &str = "compaction_group";
 pub const SSTABLE_OBJECT_ID: &str = "sstable_object";
 pub const META_BACKUP_ID: &str = "meta_backup";
+/// `now` is read and written differently from the other sequences; it merely reuses the `hummock_sequence` table.
+pub const HUMMOCK_NOW: &str = "now";
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default, Serialize, Deserialize)]
 #[sea_orm(table_name = "hummock_sequence")]
3 changes: 2 additions & 1 deletion src/meta/service/src/hummock_service.rs
@@ -283,7 +283,8 @@ impl HummockManagerService for HummockServiceImpl {
     ) -> Result<Response<TriggerFullGcResponse>, Status> {
         let req = request.into_inner();
         self.hummock_manager
-            .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)?;
+            .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)
+            .await?;
         Ok(Response::new(TriggerFullGcResponse { status: None }))
     }
 
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
@@ -1674,6 +1674,7 @@ fn collect_resp_info(
                 LocalSstableInfo::new(
                     sst_info.into(),
                     from_prost_table_stats_map(grouped.table_stats_map),
+                    grouped.created_at,
                 )
             });
             synced_ssts.extend(ssts_iter);
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction/mod.rs
@@ -1129,6 +1129,7 @@ impl HummockManager {
     /// or the task is not owned by `context_id` when `context_id` is not None.
     pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
+        // TODO: add sanity check for serverless compaction
         let mut guard = self.compaction.write().await;
         let deterministic_mode = self.env.opts.compaction_deterministic_test;
         let compaction: &mut Compaction = &mut guard;
13 changes: 13 additions & 0 deletions src/meta/src/hummock/manager/context.rs
@@ -229,6 +229,19 @@ impl HummockManager {
             }
         }
 
+        // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
+        if !sstables.is_empty() {
+            // sanity check to ensure SSTs to commit have not been full GCed yet.
+            let now = self.now().await?;
+            let sst_retention_watermark =
+                now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
+            for sst in sstables {
+                if sst.created_at < sst_retention_watermark {
+                    return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+                }
+            }
+        }
+
         async {
             if !self.env.opts.enable_committed_sst_sanity_check {
                 return;
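To make the rejection rule concrete: with `min_sst_retention_time_sec = 3600` and meta clock `now = 5000`, the watermark is `5000 - 3600 = 1400`, so an SST stamped `created_at = 1000` is rejected (it may already have been removed by a concurrent full GC) while one stamped `2000` commits normally. A standalone, illustrative restatement of the same arithmetic:

```rust
/// Mirrors the check above: an SST older than the retention watermark is
/// refused at commit time.
fn is_rejected(sst_created_at: u64, now: u64, min_sst_retention_time_sec: u64) -> bool {
    let sst_retention_watermark = now.saturating_sub(min_sst_retention_time_sec);
    sst_created_at < sst_retention_watermark
}

#[test]
fn rejects_ssts_below_watermark() {
    // now = 5000, retention = 3600 => watermark = 1400.
    assert!(is_rejected(1000, 5000, 3600)); // too old: may already be GCed
    assert!(!is_rejected(2000, 5000, 3600)); // young enough to commit safely
}
```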
77 changes: 61 additions & 16 deletions src/meta/src/hummock/manager/gc.rs
@@ -16,17 +16,21 @@ use std::cmp;
 use std::collections::HashSet;
 use std::ops::Bound::{Excluded, Included};
 use std::ops::DerefMut;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 
 use anyhow::Context;
 use futures::{stream, StreamExt};
 use itertools::Itertools;
 use parking_lot::Mutex;
 use risingwave_hummock_sdk::HummockSstableObjectId;
+use risingwave_meta_model_migration::OnConflict;
+use risingwave_meta_model_v2::hummock_sequence;
+use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW;
 use risingwave_pb::common::worker_node::State::Running;
 use risingwave_pb::common::WorkerType;
 use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
 use risingwave_pb::hummock::FullScanTask;
+use sea_orm::{ActiveValue, EntityTrait};
 
 use crate::hummock::error::{Error, Result};
 use crate::hummock::manager::commit_multi_var;
@@ -177,13 +181,13 @@ impl HummockManager {
     /// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`.
     ///
     /// Returns Ok(false) if there is no worker available.
-    pub fn start_full_gc(
+    pub async fn start_full_gc(
         &self,
         sst_retention_time: Duration,
         prefix: Option<String>,
     ) -> Result<bool> {
         self.metrics.full_gc_trigger_count.inc();
-        // Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op.
+        // Set a minimum sst_retention_time.
         let sst_retention_time = cmp::max(
             sst_retention_time,
             Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
@@ -205,9 +209,13 @@
             }
             Some(compactor) => compactor,
         };
+        let sst_retention_watermark = self
+            .now()
+            .await?
+            .saturating_sub(sst_retention_time.as_secs());
         compactor
             .send_event(ResponseEvent::FullScanTask(FullScanTask {
-                sst_retention_time_sec: sst_retention_time.as_secs(),
+                sst_retention_watermark,
                 prefix,
                 start_after,
                 limit,
@@ -265,6 +273,50 @@
         tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
         Ok(selected_object_number)
     }
+
+    pub async fn now(&self) -> Result<u64> {
+        let mut guard = self.now.lock().await;
+        let new_now = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("Clock may have gone backwards")
+            .as_secs();
+        if new_now < *guard {
+            return Err(anyhow::anyhow!(format!(
+                "unexpected decreasing now, old={}, new={}",
+                *guard, new_now
+            ))
+            .into());
+        }
+        *guard = new_now;
+        drop(guard);
+        // Persist now to maintain non-decreasing even after a meta node reboot.
+        if let Some(sql) = self.sql_store() {
+            let m = hummock_sequence::ActiveModel {
+                name: ActiveValue::Set(HUMMOCK_NOW.into()),
+                seq: ActiveValue::Set(new_now.try_into().unwrap()),
+            };
+            hummock_sequence::Entity::insert(m)
+                .on_conflict(
+                    OnConflict::column(hummock_sequence::Column::Name)
+                        .update_column(hummock_sequence::Column::Seq)
+                        .to_owned(),
+                )
+                .exec(&sql.conn)
+                .await?;
+        }
+        Ok(new_now)
+    }
+
+    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
+        let Some(sql) = self.sql_store() else {
+            return Ok(None);
+        };
+        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_string())
+            .one(&sql.conn)
+            .await?
+            .map(|m| m.seq.try_into().unwrap());
+        Ok(now)
+    }
 }
 
 pub struct FullGcState {
@@ -399,6 +451,7 @@ mod tests {
                 Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1,),
                 None
             )
+            .await
             .unwrap());
 
         let mut receiver = compactor_manager.add_compactor(context_id);
@@ -408,36 +461,28 @@
                 Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1),
                 None
             )
+            .await
             .unwrap());
-        let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
+        let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
             ResponseEvent::FullScanTask(task) => task,
             _ => {
                 panic!()
             }
         };
-        // min_sst_retention_time_sec override user provided value.
-        assert_eq!(
-            hummock_manager.env.opts.min_sst_retention_time_sec,
-            full_scan_task.sst_retention_time_sec
-        );
 
         assert!(hummock_manager
            .start_full_gc(
                 Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                 None
             )
+            .await
            .unwrap());
-        let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
+        let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
             ResponseEvent::FullScanTask(task) => task,
             _ => {
                 panic!()
             }
         };
-        // min_sst_retention_time_sec doesn't override user provided value.
-        assert_eq!(
-            hummock_manager.env.opts.min_sst_retention_time_sec + 1,
-            full_scan_task.sst_retention_time_sec
-        );
 
         // Empty input results immediate return, without waiting heartbeat.
         hummock_manager
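`now` combines an in-process `Mutex<u64>` with an upsert into the `hummock_sequence` table, so the value it hands out never decreases — even across meta node restarts, because `load_now` reseeds the mutex from the table during `load_meta_store_state` (see `mod.rs` below). A minimal standalone sketch of that invariant, with a plain `u64` standing in for the SQL-backed row:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Sketch of the non-decreasing clock in HummockManager::now/load_now.
struct MonotonicNow {
    /// In the real code this is seeded from the hummock_sequence table.
    last: u64,
}

impl MonotonicNow {
    fn now(&mut self) -> Result<u64, String> {
        let new_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock went backwards")
            .as_secs();
        if new_now < self.last {
            // The wall clock moved backwards past the persisted value; refuse
            // to hand out a smaller "now" rather than break monotonicity.
            return Err(format!(
                "unexpected decreasing now, old={}, new={}",
                self.last, new_now
            ));
        }
        self.last = new_now; // persisted via upsert in the real code
        Ok(new_now)
    }
}
```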
6 changes: 6 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
@@ -36,6 +36,7 @@ use risingwave_pb::hummock::{
 };
 use risingwave_pb::meta::subscribe_response::Operation;
 use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::Mutex;
 use tonic::Streaming;
 
 use crate::hummock::compaction::CompactStatus;
@@ -109,6 +110,7 @@ pub struct HummockManager {
     // and suggest types with a certain priority.
     pub compaction_state: CompactionState,
     full_gc_state: FullGcState,
+    now: Mutex<u64>,
 }
 
 pub type HummockManagerRef = Arc<HummockManager>;
@@ -279,6 +281,7 @@ impl HummockManager {
             compactor_streams_change_tx,
             compaction_state: CompactionState::new(),
             full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
+            now: Mutex::new(0),
         };
         let instance = Arc::new(instance);
         instance.init_time_travel_state().await?;
@@ -296,6 +299,9 @@
 
     /// Load state from meta store.
     async fn load_meta_store_state(&self) -> Result<()> {
+        let now = self.load_now().await?;
+        *self.now.lock().await = now.unwrap_or(0);
+
         let mut compaction_guard = self.compaction.write().await;
         let mut versioning_guard = self.versioning.write().await;
         let mut context_info_guard = self.context_info.write().await;