Skip to content

Commit

Permalink
pageserver: limit number of upload queue tasks (#10384)
Browse files Browse the repository at this point in the history
## Problem

The upload queue can currently schedule an arbitrary number of tasks.
This can spawn an unbounded number of Tokio tasks, and can also
significantly slow down upload queue scheduling, since scheduling is
quadratic in the number of queued operations.

Touches #10096.

## Summary of changes

Limit the number of inprogress tasks to the remote storage upload
concurrency. While this concurrency limit is shared across all tenants,
there's certainly no point in scheduling more than this -- we could even
consider setting the limit lower, but don't for now to avoid
artificially constraining tenants.
  • Loading branch information
erikgrinaker authored Jan 14, 2025
1 parent d36112d commit e58e29e
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 16 deletions.
11 changes: 11 additions & 0 deletions libs/remote_storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ impl RemoteStorageKind {
}
}

impl RemoteStorageConfig {
    /// Helper to fetch the configured concurrency limit.
    ///
    /// Returns `None` for local filesystem storage (it has no configured
    /// concurrency limit); for S3 and Azure backends, returns the backend's
    /// configured `concurrency_limit` widened to `usize`.
    pub fn concurrency_limit(&self) -> Option<usize> {
        match &self.storage {
            RemoteStorageKind::LocalFs { .. } => None,
            RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
            RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
        }
    }
}

fn default_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_TIMEOUT
}
Expand Down
2 changes: 1 addition & 1 deletion pageserver/benches/upload_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {

// Construct the queue.
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&index)?;
let queue = queue.initialize_with_current_remote_index_part(&index, 0)?;

// Populate inprogress_tasks with a bunch of layer1 deletions.
let delete = UploadOp::Delete(Delete {
Expand Down
29 changes: 26 additions & 3 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,16 @@ impl RemoteTimelineClient {
/// an index file upload, i.e., it's not empty.
/// The given `index_part` must be the one on the remote.
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
// Set the maximum number of inprogress tasks to the remote storage concurrency. There's
// certainly no point in starting more upload tasks than this.
let inprogress_limit = self
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
info!(
"initialized upload queue from remote index with {} layer files",
Expand All @@ -441,8 +449,16 @@ impl RemoteTimelineClient {
&self,
local_metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
// Set the maximum number of inprogress tasks to the remote storage concurrency. There's
// certainly no point in starting more upload tasks than this.
let inprogress_limit = self
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata)?;
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
self.update_remote_physical_size_gauge(None);
info!("initialized upload queue as empty");
Ok(())
Expand All @@ -458,9 +474,15 @@ impl RemoteTimelineClient {
let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
"bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
))?;
let inprogress_limit = self
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);

let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
self.stop_impl(&mut upload_queue);

Expand Down Expand Up @@ -2355,6 +2377,7 @@ impl RemoteTimelineClient {
// but for this use case it doesnt really makes sense to bring unsafe code only for this usage point.
// Deletion is not really perf sensitive so there shouldnt be any problems with cloning a fraction of it.
let upload_queue_for_deletion = UploadQueueInitialized {
inprogress_limit: initialized.inprogress_limit,
task_counter: 0,
dirty: initialized.dirty.clone(),
clean: initialized.clean.clone(),
Expand Down
73 changes: 61 additions & 12 deletions pageserver/src/tenant/upload_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub enum OpType {

/// This keeps track of queued and in-progress tasks.
pub struct UploadQueueInitialized {
/// Maximum number of inprogress tasks to schedule. 0 is no limit.
pub(crate) inprogress_limit: usize,

/// Counter to assign task IDs
pub(crate) task_counter: u64,

Expand Down Expand Up @@ -128,8 +131,14 @@ impl UploadQueueInitialized {
/// the queue if it doesn't conflict with operations ahead of it.
///
/// None may be returned even if the queue isn't empty, if no operations are ready yet.
///
/// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
pub fn next_ready(&mut self) -> Option<UploadOp> {
// NB: this is quadratic, but queues are expected to be small.
// If inprogress_tasks is already at limit, don't schedule anything more.
if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
return None;
}

for (i, candidate) in self.queued_operations.iter().enumerate() {
// If this candidate is ready, go for it. Otherwise, try the next one.
if self.is_ready(i) {
Expand Down Expand Up @@ -289,6 +298,7 @@ impl UploadQueue {
pub fn initialize_empty_remote(
&mut self,
metadata: &TimelineMetadata,
inprogress_limit: usize,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
Expand All @@ -302,6 +312,7 @@ impl UploadQueue {
let index_part = IndexPart::empty(metadata.clone());

let state = UploadQueueInitialized {
inprogress_limit,
dirty: index_part.clone(),
clean: (index_part, None),
latest_files_changes_since_metadata_upload_scheduled: 0,
Expand All @@ -325,6 +336,7 @@ impl UploadQueue {
pub fn initialize_with_current_remote_index_part(
&mut self,
index_part: &IndexPart,
inprogress_limit: usize,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
Expand All @@ -339,6 +351,7 @@ impl UploadQueue {
);

let state = UploadQueueInitialized {
inprogress_limit,
dirty: index_part.clone(),
clean: (index_part.clone(), None),
latest_files_changes_since_metadata_upload_scheduled: 0,
Expand Down Expand Up @@ -633,7 +646,7 @@ mod tests {
#[test]
fn schedule_barrier() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?;
let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
let tli = make_timeline();

let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
Expand Down Expand Up @@ -700,7 +713,7 @@ mod tests {
#[test]
fn schedule_delete_parallel() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?;
let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
let tli = make_timeline();

// Enqueue a bunch of deletes, some with conflicting names.
Expand Down Expand Up @@ -745,7 +758,7 @@ mod tests {
#[test]
fn schedule_upload_conflicts() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Enqueue three versions of the same layer, with different file sizes.
Expand Down Expand Up @@ -778,7 +791,7 @@ mod tests {
#[test]
fn schedule_upload_delete_conflicts() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Enqueue two layer uploads, with a delete of both layers in between them. These should be
Expand Down Expand Up @@ -817,7 +830,7 @@ mod tests {
#[test]
fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Enqueue two layer uploads, with a delete of both layers in between them. These should be
Expand Down Expand Up @@ -859,7 +872,7 @@ mod tests {
#[test]
fn schedule_upload_parallel() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Enqueue three different layer uploads.
Expand Down Expand Up @@ -888,7 +901,7 @@ mod tests {
#[test]
fn schedule_index_serial() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;

// Enqueue three uploads of the current empty index.
let index = Box::new(queue.clean.0.clone());
Expand Down Expand Up @@ -925,7 +938,7 @@ mod tests {
#[test]
fn schedule_index_upload_chain() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Enqueue three uploads of the current empty index.
Expand Down Expand Up @@ -994,7 +1007,7 @@ mod tests {
#[test]
fn schedule_index_delete_dereferenced() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Create a layer to upload.
Expand Down Expand Up @@ -1038,7 +1051,7 @@ mod tests {
#[test]
fn schedule_index_upload_dereferenced() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
let tli = make_timeline();

// Create a layer to upload.
Expand Down Expand Up @@ -1085,7 +1098,7 @@ mod tests {
#[test]
fn schedule_shutdown() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_empty_remote(&TimelineMetadata::example())?;
let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
let tli = make_timeline();

let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
Expand Down Expand Up @@ -1139,6 +1152,42 @@ mod tests {
Ok(())
}

/// Scheduling respects inprogress_limit.
#[test]
fn schedule_inprogress_limit() -> anyhow::Result<()> {
    // Initialize a queue whose inprogress_limit is 2.
    let mut queue = UploadQueue::Uninitialized;
    let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?;
    let tli = make_timeline();

    // Enqueue four non-conflicting layer uploads.
    let layers = [
        make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"),
        make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"),
        make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"),
        make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51"),
    ];
    let ops: Vec<_> = layers
        .iter()
        .map(|layer| UploadOp::UploadLayer(layer.clone(), layer.metadata(), None))
        .collect();

    queue.queued_operations.extend(ops.clone());

    // Only the first two operations are scheduled, because of the limit.
    let tasks = queue.schedule_ready();
    assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]);
    assert!(queue.next_ready().is_none());

    // When one task completes, a slot frees up and one more op is scheduled.
    queue.complete(tasks[0].task_id);
    let tasks = queue.schedule_ready();
    assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]);

    Ok(())
}

/// Tests that can_bypass takes name, generation and shard index into account for all operations.
#[test]
fn can_bypass_path() -> anyhow::Result<()> {
Expand Down

1 comment on commit e58e29e

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7304 tests run: 6933 passed, 1 failed, 370 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_scrubber_tenant_snapshot[debug-pg17-4]"
Flaky tests (1)

Postgres 14

Test coverage report is not available

The comment gets automatically updated with the latest test results
e58e29e at 2025-01-14T20:03:35.317Z :recycle:

Please sign in to comment.