Skip to content

Commit

Permalink
feat(debug): add support for hummock data archive (#14507)
Browse files Browse the repository at this point in the history
Co-authored-by: Wallace <[email protected]>
  • Loading branch information
zwang28 and Little-Wallace authored Jan 22, 2024
1 parent 5bb160e commit f35fb9e
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 34 deletions.
6 changes: 6 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ message HummockVersionCheckpoint {
map<uint64, StaleObjects> stale_objects = 2;
}

message HummockVersionArchive {
HummockVersion version = 1;
// some version_deltas since version
repeated HummockVersionDelta version_deltas = 2;
}

// We will have two epoch after decouple
message HummockSnapshot {
// Epoch with checkpoint, we will read durable data with it.
Expand Down
12 changes: 12 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ pub struct MetaConfig {
#[serde(default = "default::meta::hummock_version_checkpoint_interval_sec")]
pub hummock_version_checkpoint_interval_sec: u64,

/// If enabled, SSTable object file and version delta will be retained.
///
/// SSTable object file need to be deleted via full GC.
///
/// version delta need to be manually deleted.
#[serde(default = "default::meta::enable_hummock_data_archive")]
pub enable_hummock_data_archive: bool,

/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected.
#[serde(default = "default::meta::min_delta_log_num_for_hummock_version_checkpoint")]
Expand Down Expand Up @@ -966,6 +974,10 @@ pub mod default {
30
}

pub fn enable_hummock_data_archive() -> bool {
false
}

pub fn min_delta_log_num_for_hummock_version_checkpoint() -> u64 {
10
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
vacuum_spin_interval_ms = 10
hummock_version_checkpoint_interval_sec = 30
enable_hummock_data_archive = false
min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
hummock_version_checkpoint_interval_sec: config
.meta
.hummock_version_checkpoint_interval_sec,
enable_hummock_data_archive: config.meta.enable_hummock_data_archive,
min_delta_log_num_for_hummock_version_checkpoint: config
.meta
.min_delta_log_num_for_hummock_version_checkpoint,
Expand Down
93 changes: 64 additions & 29 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
use risingwave_pb::hummock::PbHummockVersionCheckpoint;
use risingwave_pb::hummock::{PbHummockVersionArchive, PbHummockVersionCheckpoint};

use crate::hummock::error::Result;
use crate::hummock::manager::{read_lock, write_lock};
Expand Down Expand Up @@ -92,6 +92,21 @@ impl HummockManager {
Ok(())
}

pub(super) async fn write_version_archive(
&self,
archive: &PbHummockVersionArchive,
) -> Result<()> {
use prost::Message;
let buf = archive.encode_to_vec();
let archive_path = format!(
"{}/{}",
self.version_archive_dir,
archive.version.as_ref().unwrap().id
);
self.object_store.upload(&archive_path, buf.into()).await?;
Ok(())
}

/// Creates a hummock version checkpoint.
/// Returns the diff between new and old checkpoint id.
/// Note that this method must not be called concurrently, because internally it doesn't hold
Expand All @@ -109,37 +124,49 @@ impl HummockManager {
if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
return Ok(0);
}
let mut archive: Option<PbHummockVersionArchive> = None;
let mut stale_objects = old_checkpoint.stale_objects.clone();
// `object_sizes` is used to calculate size of stale objects.
let mut object_sizes = object_size_map(&old_checkpoint.version);
for (_, version_delta) in versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for group_deltas in version_delta.group_deltas.values() {
let summary = summarize_group_deltas(group_deltas);
object_sizes.extend(
summary
.insert_table_infos
.iter()
.map(|t| (t.object_id, t.file_size)),
if !self.env.opts.enable_hummock_data_archive {
// `object_sizes` is used to calculate size of stale objects.
let mut object_sizes = object_size_map(&old_checkpoint.version);
for (_, version_delta) in versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for group_deltas in version_delta.group_deltas.values() {
let summary = summarize_group_deltas(group_deltas);
object_sizes.extend(
summary
.insert_table_infos
.iter()
.map(|t| (t.object_id, t.file_size)),
);
}
let removed_object_ids = version_delta.gc_object_ids.clone();
if removed_object_ids.is_empty() {
continue;
}
let total_file_size = removed_object_ids
.iter()
.map(|t| object_sizes.get(t).copied().unwrap())
.sum::<u64>();
stale_objects.insert(
version_delta.id,
StaleObjects {
id: removed_object_ids,
total_file_size,
},
);
}
let removed_object_ids = version_delta.gc_object_ids.clone();
if removed_object_ids.is_empty() {
continue;
}
let total_file_size = removed_object_ids
.iter()
.map(|t| object_sizes.get(t).copied().unwrap())
.sum::<u64>();
stale_objects.insert(
version_delta.id,
StaleObjects {
id: removed_object_ids,
total_file_size,
},
);
} else {
archive = Some(PbHummockVersionArchive {
version: Some(old_checkpoint.version.to_protobuf()),
version_deltas: versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
.map(|(_, version_delta)| version_delta.to_protobuf())
.collect(),
});
}
let new_checkpoint = HummockVersionCheckpoint {
version: current_version.clone(),
Expand All @@ -148,6 +175,14 @@ impl HummockManager {
drop(versioning_guard);
// 2. persist the new checkpoint without holding lock
self.write_checkpoint(&new_checkpoint).await?;
if let Some(archive) = archive {
if let Err(e) = self.write_version_archive(&archive).await {
tracing::warn!(
"failed to write version archive {}, {e}",
archive.version.as_ref().unwrap().id
);
}
}
// 3. hold write lock and update in memory state
let mut versioning_guard = write_lock!(self, versioning).await;
let versioning = versioning_guard.deref_mut();
Expand Down
13 changes: 8 additions & 5 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
};
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::{
version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo, HummockCompactionTaskId,
HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, HummockVersionId,
SstObjectIdRange, INVALID_VERSION_ID,
version_archive_dir, version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo,
HummockCompactionTaskId, HummockContextId, HummockEpoch, HummockSstableId,
HummockSstableObjectId, HummockVersionId, SstObjectIdRange, INVALID_VERSION_ID,
};
use risingwave_meta_model_v2::{
compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
Expand Down Expand Up @@ -143,6 +143,7 @@ pub struct HummockManager {

object_store: ObjectStoreRef,
version_checkpoint_path: String,
version_archive_dir: String,
pause_version_checkpoint: AtomicBool,
history_table_throughput: parking_lot::RwLock<HashMap<u32, VecDeque<u64>>>,

Expand Down Expand Up @@ -383,7 +384,8 @@ impl HummockManager {
}
}
}
let checkpoint_path = version_checkpoint_path(state_store_dir);
let version_checkpoint_path = version_checkpoint_path(state_store_dir);
let version_archive_dir = version_archive_dir(state_store_dir);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let instance = HummockManager {
Expand All @@ -407,7 +409,8 @@ impl HummockManager {
}),
event_sender: tx,
object_store,
version_checkpoint_path: checkpoint_path,
version_checkpoint_path,
version_archive_dir,
pause_version_checkpoint: AtomicBool::new(false),
history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
compactor_streams_change_tx,
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub struct MetaOpts {
pub vacuum_spin_interval_ms: u64,
/// Interval of hummock version checkpoint.
pub hummock_version_checkpoint_interval_sec: u64,
pub enable_hummock_data_archive: bool,
/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
/// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
Expand Down Expand Up @@ -228,6 +229,7 @@ impl MetaOpts {
vacuum_interval_sec: 30,
vacuum_spin_interval_ms: 0,
hummock_version_checkpoint_interval_sec: 30,
enable_hummock_data_archive: false,
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
Expand Down
5 changes: 5 additions & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,16 @@ pub fn can_concat(ssts: &[SstableInfo]) -> bool {

const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

pub fn version_checkpoint_path(root_dir: &str) -> String {
format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
}

pub fn version_archive_dir(root_dir: &str) -> String {
format!("{}/{}", root_dir, ARCHIVE_DIR)
}

pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
checkpoint_path.trim_end_matches(|c| c != '/').to_string()
}
Expand Down

0 comments on commit f35fb9e

Please sign in to comment.