From ff00ba033d06d70410453357aca7dc85b6cece59 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Sun, 20 Oct 2024 15:33:20 -0400 Subject: [PATCH] feat(pageserver): support partial gc-compaction for delta layers (#9611) Signed-off-by: Alex Chi Z partial address comments Signed-off-by: Alex Chi Z --- pageserver/compaction/src/helpers.rs | 9 + pageserver/src/tenant.rs | 219 +++++++++---- .../src/tenant/storage_layer/delta_layer.rs | 4 + pageserver/src/tenant/timeline/compaction.rs | 295 ++++++++++++------ 4 files changed, 368 insertions(+), 159 deletions(-) diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 9dbb6ecedf239..6b739d85a74b7 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -35,6 +35,15 @@ pub fn overlaps_with(a: &Range, b: &Range) -> bool { !(a.end <= b.start || b.end <= a.start) } +/// Whether a fully contains b, example as below +/// ```plain +/// | a | +/// | b | +/// ``` +pub fn fully_contains(a: &Range, b: &Range) -> bool { + a.start <= b.start && a.end >= b.end +} + pub fn union_to_keyspace(a: &mut CompactionKeySpace, b: CompactionKeySpace) { let x = std::mem::take(a); let mut all_ranges_iter = [x.into_iter(), b.into_iter()] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 903174680e00c..662ee32a0be4c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -9229,6 +9229,23 @@ mod tests { Ok(()) } + fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering { + ( + k1.is_delta, + k1.key_range.start, + k1.key_range.end, + k1.lsn_range.start, + k1.lsn_range.end, + ) + .cmp(&( + k2.is_delta, + k2.key_range.start, + k2.key_range.end, + k2.lsn_range.start, + k2.lsn_range.end, + )) + } + async fn inspect_and_sort( tline: &Arc, filter: Option>, @@ -9237,25 +9254,30 @@ mod tests { if let Some(filter) = filter { all_layers.retain(|layer| overlaps_with(&layer.key_range, &filter)); } - all_layers.sort_by(|k1, k2| { - ( - k1.is_delta, - k1.key_range.start, - k1.key_range.end, - k1.lsn_range.start, - k1.lsn_range.end, - ) - .cmp(&( - k2.is_delta, - k2.key_range.start, - k2.key_range.end, - k2.lsn_range.start, - k2.lsn_range.end, - )) - }); + all_layers.sort_by(sort_layer_key); all_layers } + #[cfg(feature = "testing")] + fn check_layer_map_key_eq( + mut left: Vec, + mut right: Vec, + ) { + left.sort_by(sort_layer_key); + right.sort_by(sort_layer_key); + if left != right { + eprintln!("---LEFT---"); + for left in left.iter() { + eprintln!("{}", left); + } + eprintln!("---RIGHT---"); + for right in right.iter() { + eprintln!("{}", right); + } + assert_eq!(left, right); + } + } + #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_partial_bottom_most_compaction() -> anyhow::Result<()> { @@ -9348,127 +9370,206 @@ mod tests { let cancel = CancellationToken::new(); - // Do a partial compaction on key range 0..4, we should generate a image layer; no other layers - // can be removed because they might be used for other key ranges. 
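+        // The partial compaction below walks the keyspace in pieces: 0..2, 2..4, 4..9, 9..10, and
+        // finally the full range 0..10, checking the complete layer map after every step.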
+ // Do a partial compaction on key range 0..2 tline - .partial_compact_with_gc(Some(get_key(0)..get_key(4)), &cancel, EnumSet::new(), &ctx) + .partial_compact_with_gc(Some(get_key(0)..get_key(2)), &cancel, EnumSet::new(), &ctx) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; - assert_eq!( + check_layer_map_key_eq( all_layers, vec![ + // newly-generated image layer for the partial compaction range 0-2 PersistentLayerKey { - key_range: get_key(0)..get_key(4), + key_range: get_key(0)..get_key(2), lsn_range: Lsn(0x20)..Lsn(0x21), - is_delta: false + is_delta: false, }, PersistentLayerKey { key_range: get_key(0)..get_key(10), lsn_range: Lsn(0x10)..Lsn(0x11), - is_delta: false + is_delta: false, }, + // delta1 is split and the second part is rewritten PersistentLayerKey { - key_range: get_key(1)..get_key(4), + key_range: get_key(2)..get_key(4), lsn_range: Lsn(0x20)..Lsn(0x48), - is_delta: true + is_delta: true, }, PersistentLayerKey { key_range: get_key(5)..get_key(7), lsn_range: Lsn(0x20)..Lsn(0x48), - is_delta: true + is_delta: true, }, PersistentLayerKey { key_range: get_key(8)..get_key(10), lsn_range: Lsn(0x48)..Lsn(0x50), - is_delta: true - } - ] + is_delta: true, + }, + ], ); - // Do a partial compaction on key range 4..10 + // Do a partial compaction on key range 2..4 tline - .partial_compact_with_gc(Some(get_key(4)..get_key(10)), &cancel, EnumSet::new(), &ctx) + .partial_compact_with_gc(Some(get_key(2)..get_key(4)), &cancel, EnumSet::new(), &ctx) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; - assert_eq!( + check_layer_map_key_eq( all_layers, vec![ PersistentLayerKey { - key_range: get_key(0)..get_key(4), + key_range: get_key(0)..get_key(2), lsn_range: Lsn(0x20)..Lsn(0x21), - is_delta: false + is_delta: false, }, PersistentLayerKey { - // if (in the future) GC kicks in, this layer will be removed key_range: get_key(0)..get_key(10), lsn_range: Lsn(0x10)..Lsn(0x11), - is_delta: false + is_delta: false, }, + // image layer generated for the compaction range 2-4 PersistentLayerKey { - key_range: get_key(4)..get_key(10), + key_range: get_key(2)..get_key(4), lsn_range: Lsn(0x20)..Lsn(0x21), - is_delta: false + is_delta: false, }, + // we have key2/key3 above the retain_lsn, so we still need this delta layer PersistentLayerKey { - key_range: get_key(1)..get_key(4), + key_range: get_key(2)..get_key(4), lsn_range: Lsn(0x20)..Lsn(0x48), - is_delta: true + is_delta: true, }, PersistentLayerKey { key_range: get_key(5)..get_key(7), lsn_range: Lsn(0x20)..Lsn(0x48), - is_delta: true + is_delta: true, }, PersistentLayerKey { key_range: get_key(8)..get_key(10), lsn_range: Lsn(0x48)..Lsn(0x50), - is_delta: true - } - ] + is_delta: true, + }, + ], ); - // Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones. 
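+        // Expect an image layer covering exactly 4..9; the 5..7 delta layer is fully contained in
+        // this range and gets compacted away, as the expected layer map below shows.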
+ // Do a partial compaction on key range 4..9 tline - .partial_compact_with_gc(Some(get_key(0)..get_key(10)), &cancel, EnumSet::new(), &ctx) + .partial_compact_with_gc(Some(get_key(4)..get_key(9)), &cancel, EnumSet::new(), &ctx) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; - assert_eq!( + check_layer_map_key_eq( all_layers, vec![ PersistentLayerKey { - key_range: get_key(0)..get_key(4), + key_range: get_key(0)..get_key(2), lsn_range: Lsn(0x20)..Lsn(0x21), - is_delta: false + is_delta: false, }, PersistentLayerKey { key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + PersistentLayerKey { + key_range: get_key(2)..get_key(4), lsn_range: Lsn(0x20)..Lsn(0x21), - is_delta: false + is_delta: false, }, PersistentLayerKey { - key_range: get_key(4)..get_key(10), + key_range: get_key(2)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true, + }, + // image layer generated for this compaction range + PersistentLayerKey { + key_range: get_key(4)..get_key(9), lsn_range: Lsn(0x20)..Lsn(0x21), - is_delta: false + is_delta: false, + }, + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x48)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + // Do a partial compaction on key range 9..10 + tline + .partial_compact_with_gc(Some(get_key(9)..get_key(10)), &cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + PersistentLayerKey { + key_range: get_key(0)..get_key(2), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false, + }, + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, }, PersistentLayerKey { - key_range: get_key(1)..get_key(4), + key_range: get_key(2)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false, + }, + PersistentLayerKey { + key_range: get_key(2)..get_key(4), lsn_range: Lsn(0x20)..Lsn(0x48), - is_delta: true + is_delta: true, }, PersistentLayerKey { - key_range: get_key(5)..get_key(7), + key_range: get_key(4)..get_key(9), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false, + }, + // image layer generated for the compaction range + PersistentLayerKey { + key_range: get_key(9)..get_key(10), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false, + }, + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x48)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + // Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones. + tline + .partial_compact_with_gc(Some(get_key(0)..get_key(10)), &cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // aha, we removed all unnecessary image/delta layers and got a very clean layer map! 
+ PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false, + }, + PersistentLayerKey { + key_range: get_key(2)..get_key(4), lsn_range: Lsn(0x20)..Lsn(0x48), - is_delta: true + is_delta: true, }, PersistentLayerKey { key_range: get_key(8)..get_key(10), lsn_range: Lsn(0x48)..Lsn(0x50), - is_delta: true - } - ] + is_delta: true, + }, + ], ); Ok(()) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 664c00a6b1c3b..fec8a0a16c500 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -653,6 +653,10 @@ impl DeltaLayerWriter { }) } + pub fn is_empty(&self) -> bool { + self.inner.as_ref().unwrap().num_keys == 0 + } + /// /// Append a key-value pair to the file. /// diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 01c2803881771..fa7df0290d910 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -56,7 +56,7 @@ use pageserver_api::value::Value; use utils::lsn::Lsn; -use pageserver_compaction::helpers::overlaps_with; +use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; use super::CompactionError; @@ -64,6 +64,21 @@ use super::CompactionError; /// Maximum number of deltas before generating an image layer in bottom-most compaction. const COMPACTION_DELTA_THRESHOLD: usize = 5; +pub struct GcCompactionJobDescription { + /// All layers to read in the compaction job + selected_layers: Vec, + /// GC cutoff of the job + gc_cutoff: Lsn, + /// LSNs to retain for the job + retain_lsns_below_horizon: Vec, + /// Maximum layer LSN processed in this compaction + max_layer_lsn: Lsn, + /// Only compact layers overlapping with this range + partial_key_range: Option>, + /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap + rewrite_layers: Vec, +} + /// The result of bottom-most compaction for a single key at each LSN. #[derive(Debug)] #[cfg_attr(test, derive(PartialEq))] @@ -1732,9 +1747,10 @@ impl Timeline { /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon, /// and create delta layers with all deltas >= gc horizon. /// - /// If `key_range`, it will only compact the keys within the range, aka partial compaction. This functionality - /// is not complete yet, and if it is set, only image layers will be generated. - /// + /// If `key_range`, it will only compact the keys within the range, aka partial compaction. Partial compaction + /// will read and process all layers overlapping with the key range, even if it might contain extra keys. + /// After the gc-compaction phase completes, delta layers that are not fully contained within the key range + /// will be rewritten to ensure they do not overlap with the delta layers. 
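+    ///
+    /// A rough sketch of the delta-rewrite case (the layout below is illustrative only):
+    /// ```plain
+    ///            |--- compaction key range ---|
+    ///   |------------------ delta layer ------------------|
+    /// ```
+    /// The delta layer overlaps the range but is not fully contained in it, so it is split at the
+    /// range boundaries: one rewritten delta layer keeps the keys left of the range, another keeps
+    /// the keys right of it, and the keys inside the range are covered by the layers produced by
+    /// the compaction itself. Image layers straddling the boundary are not rewritten; they are kept.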
pub(crate) async fn partial_compact_with_gc( self: &Arc, compaction_key_range: Option>, @@ -1762,7 +1778,6 @@ impl Timeline { .await?; let dry_run = flags.contains(CompactFlags::DryRun); - let partial_compaction = compaction_key_range.is_some(); if let Some(ref compaction_key_range) = compaction_key_range { info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end); @@ -1780,7 +1795,7 @@ impl Timeline { // The layer selection has the following properties: // 1. If a layer is in the selection, all layers below it are in the selection. // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection. - let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = if !partial_compaction { + let job_desc = { let guard = self.layers.read().await; let layers = guard.layer_map()?; let gc_info = self.gc_info.read().unwrap(); @@ -1810,9 +1825,25 @@ impl Timeline { }; // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key // layers to compact. + let mut rewrite_layers = Vec::new(); for desc in layers.iter_historic_layers() { if desc.get_lsn_range().end <= max_layer_lsn { - selected_layers.push(guard.get_from_desc(&desc)); + if let Some(compaction_key_range) = compaction_key_range.as_ref() { + if overlaps_with(&desc.get_key_range(), compaction_key_range) { + // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range, + // even if it might contain extra keys + selected_layers.push(guard.get_from_desc(&desc)); + // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine + // to overlap image layers) + if desc.is_delta() + && !fully_contains(compaction_key_range, &desc.get_key_range()) + { + rewrite_layers.push(guard.get_from_desc(&desc)); + } + } + } else { + selected_layers.push(guard.get_from_desc(&desc)); + } } } if selected_layers.is_empty() { @@ -1820,82 +1851,62 @@ impl Timeline { return Ok(()); } retain_lsns_below_horizon.sort(); - (selected_layers, gc_cutoff, retain_lsns_below_horizon) - } else { - // In case of partial compaction, we currently only support generating image layers, and therefore, - // we pick all layers that are below the lowest retain_lsn and does not intersect with any of the layers. - let guard = self.layers.read().await; - let layers = guard.layer_map()?; - let gc_info = self.gc_info.read().unwrap(); - let mut min_lsn = gc_info.cutoffs.select_min(); - for (lsn, _, _) in &gc_info.retain_lsns { - if lsn < &min_lsn { - min_lsn = *lsn; - } - } - for lsn in gc_info.leases.keys() { - if lsn < &min_lsn { - min_lsn = *lsn; - } - } - let mut selected_layers = Vec::new(); - drop(gc_info); - // |-------| |-------| |-------| - // | Delta | | Delta | | Delta | -- min_lsn could be intersecting with the layers - // |-------| |-------| |-------| <- we want to pick all the layers below min_lsn, so that - // | Delta | | Delta | | Delta | ...we can remove them after compaction - // |-------| |-------| |-------| - // Pick all the layers intersect or below the min_lsn, get the largest LSN in the selected layers. 
- let Some(compaction_key_range) = compaction_key_range.as_ref() else { - unreachable!() - }; - for desc in layers.iter_historic_layers() { - if desc.get_lsn_range().end <= min_lsn - && overlaps_with(&desc.key_range, compaction_key_range) - { - selected_layers.push(guard.get_from_desc(&desc)); - } + GcCompactionJobDescription { + selected_layers, + gc_cutoff, + retain_lsns_below_horizon, + max_layer_lsn, + partial_key_range: compaction_key_range, + rewrite_layers, } - if selected_layers.is_empty() { - info!("no layers to compact with gc"); - return Ok(()); - } - (selected_layers, min_lsn, Vec::new()) }; let lowest_retain_lsn = if self.ancestor_timeline.is_some() { - if partial_compaction { + if job_desc.partial_key_range.is_some() { warn!("partial compaction cannot run on child branches (for now)"); return Ok(()); } Lsn(self.ancestor_lsn.0 + 1) } else { - let res = retain_lsns_below_horizon + let res = job_desc + .retain_lsns_below_horizon .first() .copied() - .unwrap_or(gc_cutoff); + .unwrap_or(job_desc.gc_cutoff); if cfg!(debug_assertions) { assert_eq!( res, - retain_lsns_below_horizon + job_desc + .retain_lsns_below_horizon .iter() .min() .copied() - .unwrap_or(gc_cutoff) + .unwrap_or(job_desc.gc_cutoff) ); } res }; info!( - "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}", - layer_selection.len(), - gc_cutoff, - lowest_retain_lsn + "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, partial_compaction={}", + job_desc.selected_layers.len(), + job_desc.rewrite_layers.len(), + job_desc.max_layer_lsn, + job_desc.gc_cutoff, + lowest_retain_lsn, + job_desc.partial_key_range.as_ref().map(|x| format!("{}..{}", x.start, x.end)).unwrap_or_default() ); - self.check_compaction_space(&layer_selection).await?; + for layer in &job_desc.selected_layers { + debug!("read layer: {}", layer.layer_desc().key()); + } + for layer in &job_desc.rewrite_layers { + debug!("rewrite layer: {}", layer.layer_desc().key()); + } + + self.check_compaction_space(&job_desc.selected_layers) + .await?; // Generate statistics for the compaction - for layer in &layer_selection { + for layer in &job_desc.selected_layers { let desc = layer.layer_desc(); if desc.is_delta() { stat.visit_delta_layer(desc.file_size()); @@ -1906,25 +1917,25 @@ impl Timeline { // Step 1: construct a k-merge iterator over all layers. // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point. - let layer_names: Vec = layer_selection + let layer_names = job_desc + .selected_layers .iter() .map(|layer| layer.layer_desc().layer_name()) .collect_vec(); if let Some(err) = check_valid_layermap(&layer_names) { - bail!("cannot run gc-compaction because {}", err); + warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err); } // The maximum LSN we are processing in this compaction loop - let end_lsn = layer_selection + let end_lsn = job_desc + .selected_layers .iter() .map(|l| l.layer_desc().lsn_range.end) .max() .unwrap(); - // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized - // as an L0 layer. 
let mut delta_layers = Vec::new(); let mut image_layers = Vec::new(); let mut downloaded_layers = Vec::new(); - for layer in &layer_selection { + for layer in &job_desc.selected_layers { let resident_layer = layer.download_and_keep_resident().await?; downloaded_layers.push(resident_layer); } @@ -1943,8 +1954,8 @@ impl Timeline { dense_ks, sparse_ks, )?; - // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas. - // Data of the same key. + + // Step 2: Produce images+deltas. let mut accumulated_values = Vec::new(); let mut last_key: Option = None; @@ -1956,7 +1967,8 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - compaction_key_range + job_desc + .partial_key_range .as_ref() .map(|x| x.start) .unwrap_or(Key::MIN), @@ -2018,25 +2030,25 @@ impl Timeline { } accumulated_values.push((key, lsn, val)); } else { - let last_key = last_key.as_mut().unwrap(); - stat.on_unique_key_visited(); - let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range { - !compaction_key_range.contains(last_key) - } else { - false - }; + let last_key: &mut Key = last_key.as_mut().unwrap(); + stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction + let skip_adding_key = + if let Some(compaction_key_range) = &job_desc.partial_key_range { + !compaction_key_range.contains(last_key) + } else { + false + }; if !skip_adding_key { let retention = self .generate_key_retention( *last_key, &accumulated_values, - gc_cutoff, - &retain_lsns_below_horizon, + job_desc.gc_cutoff, + &job_desc.retain_lsns_below_horizon, COMPACTION_DELTA_THRESHOLD, get_ancestor_image(self, *last_key, ctx).await?, ) .await?; - // Put the image into the image layer. Currently we have a single big layer for the compaction. retention .pipe_to( *last_key, @@ -2057,23 +2069,23 @@ impl Timeline { let last_key = last_key.expect("no keys produced during compaction"); stat.on_unique_key_visited(); - let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range { + let skip_adding_key = if let Some(ref compaction_key_range) = job_desc.partial_key_range { !compaction_key_range.contains(&last_key) } else { false }; + if !skip_adding_key { let retention = self .generate_key_retention( last_key, &accumulated_values, - gc_cutoff, - &retain_lsns_below_horizon, + job_desc.gc_cutoff, + &job_desc.retain_lsns_below_horizon, COMPACTION_DELTA_THRESHOLD, get_ancestor_image(self, last_key, ctx).await?, ) .await?; - // Put the image into the image layer. Currently we have a single big layer for the compaction. 
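+            // Flush the retention for the final key into the image/delta writers, mirroring the
+            // loop body above.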
retention .pipe_to( last_key, @@ -2086,6 +2098,55 @@ impl Timeline { } // end: move the above part to the loop body + let mut rewrote_delta_layers = Vec::new(); + if let Some(compaction_key_range) = &job_desc.partial_key_range { + for layer in job_desc.rewrite_layers { + // For each of the layer to rewrite, generate a layer with the same LSN but less key content + // This download should be no-op b/c it's included in selected_layer and has been downloaded before + let resident_layer = layer.download_and_keep_resident().await?; + let desc = resident_layer.layer_desc(); + let layer = resident_layer.get_as_delta(ctx).await?; + let mut iter = layer.iter(ctx); + let mut delta_writer_before = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + desc.key_range.start, + desc.lsn_range.clone(), + ctx, + ) + .await?; + let mut delta_writer_after = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + compaction_key_range.end, + desc.lsn_range.clone(), + ctx, + ) + .await?; + while let Some((key, lsn, val)) = iter.next().await? { + if key < compaction_key_range.start { + delta_writer_before.put_value(key, lsn, val, ctx).await?; + } else if key >= compaction_key_range.end { + delta_writer_after.put_value(key, lsn, val, ctx).await?; + } + } + if !delta_writer_before.is_empty() && !dry_run { + let (desc, path) = delta_writer_before + .finish(compaction_key_range.start, ctx) + .await?; + let layer = Layer::finish_creating(self.conf, self, desc, &path)?; + rewrote_delta_layers.push(layer); + } + if !delta_writer_after.is_empty() && !dry_run { + let (desc, path) = delta_writer_after.finish(desc.key_range.end, ctx).await?; + let layer = Layer::finish_creating(self.conf, self, desc, &path)?; + rewrote_delta_layers.push(layer); + } + } + } + let discard = |key: &PersistentLayerKey| { let key = key.clone(); async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await } @@ -2093,7 +2154,8 @@ impl Timeline { let produced_image_layers = if let Some(writer) = image_layer_writer { if !dry_run { - let end_key = compaction_key_range + let end_key = job_desc + .partial_key_range .as_ref() .map(|x| x.end) .unwrap_or(Key::MAX); @@ -2117,10 +2179,8 @@ impl Timeline { Vec::new() }; - if partial_compaction && !produced_delta_layers.is_empty() { - bail!("implementation error: partial compaction should not be producing delta layers (for now)"); - } - + // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if + // compaction is cancelled at this point, we might have some layers that are not cleaned up. 
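+        // Collect everything that should be added to the layer map; any input layer whose output
+        // was discarded is recorded in `keep_layers` below so that it is retained rather than deleted.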
let mut compact_to = Vec::new(); let mut keep_layers = HashSet::new(); let produced_delta_layers_len = produced_delta_layers.len(); @@ -2128,52 +2188,86 @@ impl Timeline { for action in produced_delta_layers { match action { BatchWriterResult::Produced(layer) => { + if cfg!(debug_assertions) { + info!("produced delta layer: {}", layer.layer_desc().key()); + } stat.produce_delta_layer(layer.layer_desc().file_size()); compact_to.push(layer); } BatchWriterResult::Discarded(l) => { + if cfg!(debug_assertions) { + info!("discarded delta layer: {}", l); + } keep_layers.insert(l); stat.discard_delta_layer(); } } } + if cfg!(debug_assertions) { + for layer in &rewrote_delta_layers { + info!( + "produced rewritten delta layer: {}", + layer.layer_desc().key() + ); + } + } + compact_to.extend(rewrote_delta_layers); for action in produced_image_layers { match action { BatchWriterResult::Produced(layer) => { + if cfg!(debug_assertions) { + info!("produced image layer: {}", layer.layer_desc().key()); + } stat.produce_image_layer(layer.layer_desc().file_size()); compact_to.push(layer); } BatchWriterResult::Discarded(l) => { + if cfg!(debug_assertions) { + info!("discarded image layer: {}", l); + } keep_layers.insert(l); stat.discard_image_layer(); } } } - let mut layer_selection = layer_selection; - layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); - if let Some(ref compaction_key_range) = compaction_key_range { + + let mut layer_selection = job_desc.selected_layers; + + if let Some(ref compaction_key_range) = job_desc.partial_key_range { // Partial compaction might select more data than it processes, e.g., if // the compaction_key_range only partially overlaps: // // [---compaction_key_range---] // [---A----][----B----][----C----][----D----] // - // A,B,C,D are all in the `layer_selection`. The created image layers contain - // whatever is needed from B, C, and from `----]` of A, and from `[--` of D. + // For delta layers, we will rewrite the layers so that it is cut exactly at + // the compaction key range, so we can always discard them. However, for image + // layers, as we do not rewrite them for now, we need to handle them differently. + // Assume image layers A, B, C, D are all in the `layer_selection`. + // + // The created image layers contain whatever is needed from B, C, and from + // `----]` of A, and from `[---` of D. // - // In contrast, `[--A-` and `--D----]` have not been processed, so, we must + // In contrast, `[---A` and `D----]` have not been processed, so, we must // keep that data. // - // The solution for now is to keep A and D completely. - // (layer_selection is what we'll remove from the layer map, so, - // retain what is _not_ fully covered by compaction_key_range). - layer_selection.retain(|x| { - let key_range = &x.layer_desc().key_range; - key_range.start >= compaction_key_range.start - && key_range.end <= compaction_key_range.end - }); + // The solution for now is to keep A and D completely if they are image layers. + // (layer_selection is what we'll remove from the layer map, so, retain what + // is _not_ fully covered by compaction_key_range). 
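+        // Enforce the rule above: every selected image layer must at least overlap the compaction
+        // key range, and any image layer that is not fully contained in it is kept in the layer map.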
+ for layer in &layer_selection { + if !layer.layer_desc().is_delta() { + if !overlaps_with(&layer.layer_desc().key_range, compaction_key_range) { + bail!("violated constraint: image layer outside of compaction key range"); + } + if !fully_contains(compaction_key_range, &layer.layer_desc().key_range) { + keep_layers.insert(layer.layer_desc().key()); + } + } + } } + layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); + info!( "gc-compaction statistics: {}", serde_json::to_string(&stat)? @@ -2192,6 +2286,7 @@ impl Timeline { // Step 3: Place back to the layer map. { + // TODO: sanity check if the layer map is valid (i.e., should not have overlaps) let mut guard = self.layers.write().await; guard .open_mut()?