pageserver: coalesce index uploads when possible (#10248)
## Problem

With upload queue reordering in #10218, we can easily get into a
situation where multiple index uploads are queued back to back, and
these can't be parallelized. This happens e.g. when multiple layer
flushes enqueue layer/index/layer/index/... and the layers skip the
queue and upload in parallel, leaving the indexes stacked behind each
other.

These index uploads then incur serial S3 round-trip latencies, and may
block later operations.

Touches #10096.

## Summary of changes

When multiple back-to-back index uploads are ready to upload, upload
only the most recent index and drop the rest, recording the dropped ops
on the task so their metrics are still closed out.
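
As a rough sketch of the rule (illustrative only — the `Op` enum and free-standing `next_ready` below are stand-ins, not the pageserver's real `UploadOp`/`UploadQueue` API): when the next ready operation is an index upload, drain any index uploads queued directly behind it and submit only the newest one, keeping the superseded ops around so their completion can still be accounted for.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
enum Op {
    UploadLayer(&'static str),
    UploadIndex(u64), // stand-in for UploadOp::UploadMetadata
}

/// Pop the next op; if it's an index upload, coalesce any index uploads
/// queued directly behind it and return them alongside the winner.
fn next_ready(queue: &mut VecDeque<Op>) -> Option<(Op, Vec<Op>)> {
    let mut op = queue.pop_front()?;
    let mut coalesced = Vec::new();
    if matches!(op, Op::UploadIndex(_)) {
        while matches!(queue.front(), Some(Op::UploadIndex(_))) {
            // The index behind `op` is newer and supersedes it: stash the
            // superseded op and promote the newer index.
            coalesced.push(op);
            op = queue.pop_front().unwrap();
        }
    }
    Some((op, coalesced))
}

fn main() {
    // Three back-to-back index uploads, then a layer upload.
    let mut queue = VecDeque::from([
        Op::UploadIndex(1),
        Op::UploadIndex(2),
        Op::UploadIndex(3),
        Op::UploadLayer("layer-a"),
    ]);
    let (op, coalesced) = next_ready(&mut queue).unwrap();
    assert_eq!(op, Op::UploadIndex(3)); // only the newest index is uploaded
    assert_eq!(coalesced, [Op::UploadIndex(1), Op::UploadIndex(2)]);
    assert_eq!(queue.front(), Some(&Op::UploadLayer("layer-a")));
}
```

The real `UploadQueueInitialized::next_ready()` additionally checks `is_ready()` at every step, so an index never coalesces past an unready or conflicting operation, and it honors the `DISABLE_UPLOAD_QUEUE_INDEX_COALESCING` kill switch.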
erikgrinaker authored Jan 14, 2025
1 parent e58e29e commit 6debb49
Showing 3 changed files with 67 additions and 26 deletions.
1 change: 1 addition & 0 deletions pageserver/benches/upload_queue.rs
@@ -67,6 +67,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
task_id,
retries: AtomicU32::new(0),
op: delete.clone(),
coalesced_ops: Vec::new(),
}),
);
}
6 changes: 5 additions & 1 deletion pageserver/src/tenant/remote_timeline_client.rs
@@ -1880,7 +1880,7 @@ impl RemoteTimelineClient {
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some(mut next_op) = upload_queue.next_ready() {
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");

// Prepare upload.
@@ -1918,6 +1918,7 @@
let task = Arc::new(UploadTask {
task_id: upload_task_id,
op: next_op,
coalesced_ops,
retries: AtomicU32::new(0),
});
upload_queue
@@ -2285,6 +2286,9 @@ impl RemoteTimelineClient {
}

self.metric_end(&task.op);
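// Also close out metrics for any ops that were coalesced into this
// task, since they never ran (or reached this path) on their own.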
for coalesced_op in &task.coalesced_ops {
self.metric_end(coalesced_op);
}
}

fn metric_impl(
86 changes: 61 additions & 25 deletions pageserver/src/tenant/upload_queue.rs
@@ -22,6 +22,11 @@ use tracing::info;
static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));

/// Kill switch for index upload coalescing in case it causes problems.
/// TODO: remove this once we have confidence in it.
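/// NB: `Lazy` reads the env var once, on first use, so toggling the switch requires a restart.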
static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));

// clippy warns that Uninitialized is much smaller than Initialized, which wastes
// memory for Uninitialized variants. Doesn't matter in practice, there are not
// that many upload queues in a running pageserver, and most of them are initialized
@@ -130,10 +135,12 @@ impl UploadQueueInitialized {
/// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
/// the queue if it doesn't conflict with operations ahead of it.
///
/// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
///
/// None may be returned even if the queue isn't empty, if no operations are ready yet.
///
/// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
pub fn next_ready(&mut self) -> Option<UploadOp> {
pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
// If inprogress_tasks is already at limit, don't schedule anything more.
if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
return None;
@@ -151,7 +158,36 @@
return None;
}

return self.queued_operations.remove(i);
let mut op = self.queued_operations.remove(i).expect("i can't disappear");

// Coalesce any back-to-back index uploads by only uploading the newest one that's
// ready. This typically happens with layer/index/layer/index/... sequences, where
// the layers bypass the indexes, leaving the indexes queued.
//
// If other operations are interleaved between index uploads we don't try to
// coalesce them, since we may as well update the index concurrently with them.
// This keeps the index fresh and avoids starvation.
//
// NB: we assume that all uploaded indexes have the same remote path. This
// is true at the time of writing: the path only depends on the tenant,
// timeline and generation, all of which are static for a timeline instance.
// Otherwise, we must be careful not to coalesce different paths.
let mut coalesced_ops = Vec::new();
if matches!(op, UploadOp::UploadMetadata { .. }) {
while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
{
if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
break;
}
if !self.is_ready(i) {
break;
}
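// `op` is superseded by the newer index at slot i: stash it and
// promote the newer index. After remove(i), any op behind it
// shifts into slot i for the next iteration.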
coalesced_ops.push(op);
op = self.queued_operations.remove(i).expect("i can't disappear");
}
}

return Some((op, coalesced_ops));
}

// Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
@@ -225,11 +261,12 @@ impl UploadQueueInitialized {
fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
let mut tasks = Vec::new();
// NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
while let Some(op) = self.next_ready() {
while let Some((op, coalesced_ops)) = self.next_ready() {
self.task_counter += 1;
let task = Arc::new(UploadTask {
task_id: self.task_counter,
op,
coalesced_ops,
retries: 0.into(),
});
self.inprogress_tasks.insert(task.task_id, task.clone());
@@ -407,9 +444,13 @@ impl UploadQueue {
pub struct UploadTask {
/// Unique ID of this task. Used as the key in `inprogress_tasks` above.
pub task_id: u64,
/// Number of task retries.
pub retries: AtomicU32,

/// The upload operation.
pub op: UploadOp,
/// Any upload operations that were coalesced into this operation. This typically happens with
/// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
pub coalesced_ops: Vec<UploadOp>,
}

/// A deletion of some layers within the lifetime of a timeline. This is not used
@@ -512,9 +553,8 @@ impl UploadOp {
})
}

// Indexes can never bypass each other.
// TODO: we could coalesce them though, by only uploading the newest ready index. This
// is left for later, out of caution.
// Indexes can never bypass each other. They can coalesce though, and
// `UploadQueue::next_ready()` currently does this when possible.
(UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
}
}
@@ -897,9 +937,9 @@ mod tests {
Ok(())
}

/// Index uploads are serialized.
/// Index uploads are coalesced.
#[test]
fn schedule_index_serial() -> anyhow::Result<()> {
fn schedule_index_coalesce() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;

@@ -920,13 +960,11 @@

queue.queued_operations.extend(ops.clone());

// The uploads should run serially.
for op in ops {
let tasks = queue.schedule_ready();
assert_eq!(tasks.len(), 1);
assert_same_op(&tasks[0].op, &op);
queue.complete(tasks[0].task_id);
}
// The index uploads are coalesced into a single operation.
let tasks = queue.schedule_ready();
assert_eq!(tasks.len(), 1);
assert_same_op(&tasks[0].op, &ops[2]);
assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);

assert!(queue.queued_operations.is_empty());

@@ -985,18 +1023,14 @@
assert_same_op(&index_tasks[0].op, &ops[1]);
queue.complete(index_tasks[0].task_id);

// layer 1 completes. This unblocks index 1 then index 2.
// layer 1 completes. This unblocks index 1 and 2, which coalesce into
// a single upload for index 2.
queue.complete(upload_tasks[1].task_id);

let index_tasks = queue.schedule_ready();
assert_eq!(index_tasks.len(), 1);
assert_same_op(&index_tasks[0].op, &ops[3]);
queue.complete(index_tasks[0].task_id);

let index_tasks = queue.schedule_ready();
assert_eq!(index_tasks.len(), 1);
assert_same_op(&index_tasks[0].op, &ops[5]);
queue.complete(index_tasks[0].task_id);
assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);

assert!(queue.queued_operations.is_empty());

@@ -1018,11 +1052,12 @@
let index_deref = index_without(&index_upload, &layer);

let ops = [
// Initial upload.
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
UploadOp::Barrier(tokio::sync::watch::channel(()).0),
// Dereference the layer and delete it.
UploadOp::UploadMetadata {
uploaded: index_deref.clone(),
@@ -1063,11 +1098,12 @@
let index_ref = index_with(&index_deref, &layer);

let ops = [
// Initial upload.
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
UploadOp::Barrier(tokio::sync::watch::channel(()).0),
// Dereference the layer.
UploadOp::UploadMetadata {
uploaded: index_deref.clone(),

1 comment on commit 6debb49

@github-actions

7455 tests run: 7073 passed, 1 failed, 381 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_layer_map[release-pg16-github-actions-selfhosted]"
Flaky tests (1)

Postgres 16

Code coverage* (full report)

  • functions: 32.9% (8144 of 24779 functions)
  • lines: 48.1% (68254 of 141935 lines)

* collected from Rust tests only


6debb49 at 2025-01-14T23:28:15.974Z :recycle:
