Skip to content

Commit

Permalink
refactor(pageserver): consolidate partial and full gc-compaction code…
Browse files Browse the repository at this point in the history
… path (#9611)

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed Nov 11, 2024
1 parent 49d6aca commit eaa253c
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 134 deletions.
10 changes: 5 additions & 5 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9372,7 +9372,7 @@ mod tests {

// Do a partial compaction on key range 0..2
tline
.partial_compact_with_gc(Some(get_key(0)..get_key(2)), &cancel, EnumSet::new(), &ctx)
.partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9411,7 +9411,7 @@ mod tests {

// Do a partial compaction on key range 2..4
tline
.partial_compact_with_gc(Some(get_key(2)..get_key(4)), &cancel, EnumSet::new(), &ctx)
.partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9455,7 +9455,7 @@ mod tests {

// Do a partial compaction on key range 4..9
tline
.partial_compact_with_gc(Some(get_key(4)..get_key(9)), &cancel, EnumSet::new(), &ctx)
.partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9498,7 +9498,7 @@ mod tests {

// Do a partial compaction on key range 9..10
tline
.partial_compact_with_gc(Some(get_key(9)..get_key(10)), &cancel, EnumSet::new(), &ctx)
.partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down Expand Up @@ -9546,7 +9546,7 @@ mod tests {

// Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
tline
.partial_compact_with_gc(Some(get_key(0)..get_key(10)), &cancel, EnumSet::new(), &ctx)
.partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
Expand Down
246 changes: 117 additions & 129 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ pub struct GcCompactionJobDescription {
/// Maximum layer LSN processed in this compaction
max_layer_lsn: Lsn,
/// Only compact layers overlapping with this range
partial_key_range: Option<Range<Key>>,
/// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap
rewrite_layers: Vec<Layer>,
compaction_key_range: Range<Key>,
/// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
/// This field is here solely for debugging. The field will not be read once the compaction
/// description is generated.
rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
}

/// The result of bottom-most compaction for a single key at each LSN.
Expand Down Expand Up @@ -1737,7 +1739,8 @@ impl Timeline {
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.partial_compact_with_gc(None, cancel, flags, ctx).await
self.partial_compact_with_gc(Key::MIN..Key::MAX, cancel, flags, ctx)
.await
}

/// An experimental compaction building block that combines compaction with garbage collection.
Expand All @@ -1747,13 +1750,15 @@ impl Timeline {
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
/// and create delta layers with all deltas >= gc horizon.
///
/// If `key_range`, it will only compact the keys within the range, aka partial compaction. Partial compaction
/// will read and process all layers overlapping with the key range, even if it might contain extra keys.
/// After the gc-compaction phase completes, delta layers that are not fully contained within the key range
/// will be rewritten to ensure they do not overlap with the delta layers.
/// If `key_range` is provided, it will only compact the keys within the range, aka partial compaction.
/// Partial compaction will read and process all layers overlapping with the key range, even if it might
/// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
/// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
/// `Key::MIN..Key::MAX` to the function indicates a full compaction, though technically, `Key::MAX` is not
/// part of the range.
pub(crate) async fn partial_compact_with_gc(
self: &Arc<Self>,
compaction_key_range: Option<Range<Key>>,
compaction_key_range: Range<Key>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
Expand All @@ -1779,7 +1784,7 @@ impl Timeline {

let dry_run = flags.contains(CompactFlags::DryRun);

if let Some(ref compaction_key_range) = compaction_key_range {
if compaction_key_range == (Key::MIN..Key::MAX) {
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end);
} else {
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
Expand Down Expand Up @@ -1827,22 +1832,18 @@ impl Timeline {
// layers to compact.
let mut rewrite_layers = Vec::new();
for desc in layers.iter_historic_layers() {
if desc.get_lsn_range().end <= max_layer_lsn {
if let Some(compaction_key_range) = compaction_key_range.as_ref() {
if overlaps_with(&desc.get_key_range(), compaction_key_range) {
// If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
// even if it might contain extra keys
selected_layers.push(guard.get_from_desc(&desc));
// If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
// to overlap image layers)
if desc.is_delta()
&& !fully_contains(compaction_key_range, &desc.get_key_range())
{
rewrite_layers.push(guard.get_from_desc(&desc));
}
}
} else {
selected_layers.push(guard.get_from_desc(&desc));
if desc.get_lsn_range().end <= max_layer_lsn
&& overlaps_with(&desc.get_key_range(), &compaction_key_range)
{
// If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
// even if it might contain extra keys
selected_layers.push(guard.get_from_desc(&desc));
// If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
// to overlap image layers)
if desc.is_delta()
&& !fully_contains(&compaction_key_range, &desc.get_key_range())
{
rewrite_layers.push(desc);
}
}
}
Expand All @@ -1856,15 +1857,11 @@ impl Timeline {
gc_cutoff,
retain_lsns_below_horizon,
max_layer_lsn,
partial_key_range: compaction_key_range,
compaction_key_range,
rewrite_layers,
}
};
let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
if job_desc.partial_key_range.is_some() {
warn!("partial compaction cannot run on child branches (for now)");
return Ok(());
}
Lsn(self.ancestor_lsn.0 + 1)
} else {
let res = job_desc
Expand All @@ -1886,20 +1883,21 @@ impl Timeline {
res
};
info!(
"picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, partial_compaction={}",
"picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}",
job_desc.selected_layers.len(),
job_desc.rewrite_layers.len(),
job_desc.max_layer_lsn,
job_desc.gc_cutoff,
lowest_retain_lsn,
job_desc.partial_key_range.as_ref().map(|x| format!("{}..{}", x.start, x.end)).unwrap_or_default()
job_desc.compaction_key_range.start,
job_desc.compaction_key_range.end
);

for layer in &job_desc.selected_layers {
debug!("read layer: {}", layer.layer_desc().key());
}
for layer in &job_desc.rewrite_layers {
debug!("rewrite layer: {}", layer.layer_desc().key());
debug!("rewrite layer: {}", layer.key());
}

self.check_compaction_space(&job_desc.selected_layers)
Expand Down Expand Up @@ -1967,11 +1965,7 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
job_desc
.partial_key_range
.as_ref()
.map(|x| x.start)
.unwrap_or(Key::MIN),
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
ctx,
Expand Down Expand Up @@ -2027,50 +2021,46 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
}
if let Some(compaction_key_range) = &job_desc.partial_key_range {
if !compaction_key_range.contains(&key) {
if !desc.is_delta {
continue;
}
let rewriter = delta_layer_rewriters
.entry(desc.clone())
.or_insert_with_default();
let rewriter = if key < compaction_key_range.start {
if rewriter.before.is_none() {
rewriter.before = Some(
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
ctx,
)
.await?,
);
}
rewriter.before.as_mut().unwrap()
} else if key >= compaction_key_range.end {
if rewriter.after.is_none() {
rewriter.after = Some(
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
compaction_key_range.end,
desc.lsn_range.clone(),
ctx,
)
.await?,
);
}
rewriter.after.as_mut().unwrap()
} else {
unreachable!()
};
rewriter.put_value(key, lsn, val, ctx).await?;
if !job_desc.compaction_key_range.contains(&key) {
if !desc.is_delta {
continue;
}
let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
let rewriter = if key < job_desc.compaction_key_range.start {
if rewriter.before.is_none() {
rewriter.before = Some(
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
ctx,
)
.await?,
);
}
rewriter.before.as_mut().unwrap()
} else if key >= job_desc.compaction_key_range.end {
if rewriter.after.is_none() {
rewriter.after = Some(
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
job_desc.compaction_key_range.end,
desc.lsn_range.clone(),
ctx,
)
.await?,
);
}
rewriter.after.as_mut().unwrap()
} else {
unreachable!()
};
rewriter.put_value(key, lsn, val, ctx).await?;
continue;
}
match val {
Value::Image(_) => stat.visit_image_key(&val),
Expand Down Expand Up @@ -2135,20 +2125,18 @@ impl Timeline {
// end: move the above part to the loop body

let mut rewrote_delta_layers = Vec::new();
if let Some(compaction_key_range) = &job_desc.partial_key_range {
for (key, writers) in delta_layer_rewriters {
if let Some(delta_writer_before) = writers.before {
let (desc, path) = delta_writer_before
.finish(compaction_key_range.start, ctx)
.await?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
rewrote_delta_layers.push(layer);
}
if let Some(delta_writer_after) = writers.after {
let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
rewrote_delta_layers.push(layer);
}
for (key, writers) in delta_layer_rewriters {
if let Some(delta_writer_before) = writers.before {
let (desc, path) = delta_writer_before
.finish(job_desc.compaction_key_range.start, ctx)
.await?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
rewrote_delta_layers.push(layer);
}
if let Some(delta_writer_after) = writers.after {
let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
rewrote_delta_layers.push(layer);
}
}

Expand All @@ -2159,11 +2147,7 @@ impl Timeline {

let produced_image_layers = if let Some(writer) = image_layer_writer {
if !dry_run {
let end_key = job_desc
.partial_key_range
.as_ref()
.map(|x| x.end)
.unwrap_or(Key::MAX);
let end_key = job_desc.compaction_key_range.end;
writer
.finish_with_discard_fn(self, ctx, end_key, discard)
.await?
Expand Down Expand Up @@ -2232,35 +2216,39 @@ impl Timeline {

let mut layer_selection = job_desc.selected_layers;

if let Some(ref compaction_key_range) = job_desc.partial_key_range {
// Partial compaction might select more data than it processes, e.g., if
// the compaction_key_range only partially overlaps:
//
// [---compaction_key_range---]
// [---A----][----B----][----C----][----D----]
//
// For delta layers, we will rewrite the layers so that it is cut exactly at
// the compaction key range, so we can always discard them. However, for image
// layers, as we do not rewrite them for now, we need to handle them differently.
// Assume image layers A, B, C, D are all in the `layer_selection`.
//
// The created image layers contain whatever is needed from B, C, and from
// `----]` of A, and from `[---` of D.
//
// In contrast, `[---A` and `D----]` have not been processed, so, we must
// keep that data.
//
// The solution for now is to keep A and D completely if they are image layers.
// (layer_selection is what we'll remove from the layer map, so, retain what
// is _not_ fully covered by compaction_key_range).
for layer in &layer_selection {
if !layer.layer_desc().is_delta() {
if !overlaps_with(&layer.layer_desc().key_range, compaction_key_range) {
bail!("violated constraint: image layer outside of compaction key range");
}
if !fully_contains(compaction_key_range, &layer.layer_desc().key_range) {
keep_layers.insert(layer.layer_desc().key());
}
// Partial compaction might select more data than it processes, e.g., if
// the compaction_key_range only partially overlaps:
//
// [---compaction_key_range---]
// [---A----][----B----][----C----][----D----]
//
// For delta layers, we will rewrite the layers so that they are cut exactly at
// the compaction key range, so we can always discard them. However, for image
// layers, as we do not rewrite them for now, we need to handle them differently.
// Assume image layers A, B, C, D are all in the `layer_selection`.
//
// The created image layers contain whatever is needed from B, C, and from
// `----]` of A, and from `[---` of D.
//
// In contrast, `[---A` and `D----]` have not been processed, so, we must
// keep that data.
//
// The solution for now is to keep A and D completely if they are image layers.
// (layer_selection is what we'll remove from the layer map, so, retain what
// is _not_ fully covered by compaction_key_range).
for layer in &layer_selection {
if !layer.layer_desc().is_delta() {
if !overlaps_with(
&layer.layer_desc().key_range,
&job_desc.compaction_key_range,
) {
bail!("violated constraint: image layer outside of compaction key range");
}
if !fully_contains(
&job_desc.compaction_key_range,
&layer.layer_desc().key_range,
) {
keep_layers.insert(layer.layer_desc().key());
}
}
}
Expand Down

0 comments on commit eaa253c

Please sign in to comment.